Skip to content

Commit

Permalink
Merge pull request #44 from Primetalk/use-obtain-result-in-fold
Browse files Browse the repository at this point in the history
Use obtain result in fold
  • Loading branch information
Primetalk committed Aug 16, 2022
2 parents 73bff84 + 180fed3 commit 29cbed2
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 20 deletions.
3 changes: 3 additions & 0 deletions io/continuation.go
Expand Up @@ -3,6 +3,8 @@ package io
import (
"errors"
"fmt"

"github.com/primetalk/goio/fun"
)

// Continuation represents some multistep computation.
Expand All @@ -21,6 +23,7 @@ var MaxContinuationDepth = 1000000000000

// ObtainResult executes continuation until final result is obtained.
func ObtainResult[A any](c Continuation[A]) (res A, err error) {
defer fun.RecoverToErrorVar("ObtainResult", &err)
if c == nil {
err = errors.New("nil continuation is being enforced")
} else {
Expand Down
31 changes: 14 additions & 17 deletions io/io.go
Expand Up @@ -161,24 +161,21 @@ func Fail[A any](err error) IO[A] {
// Fold performs different calculations based on whether IO[A] failed or succeeded.
func Fold[A any, B any](ioA IO[A], f func(a A) IO[B], recover func(error) IO[B]) IO[B] {
return func() ResultOrContinuation[B] {
rA := ioA()
if rA.Continuation == nil {
if rA.Error == nil {
cont := Continuation[B](func() ResultOrContinuation[B] {
ioB := f(rA.Value)
return ioB()
})
return ResultOrContinuation[B]{
Continuation: &cont,
}
} else {
return recover(rA.Error)()
}
a, err := ObtainResult(Continuation[A](ioA))
var cont Continuation[B]
if err == nil {
cont = Continuation[B](func() ResultOrContinuation[B] {
ioB := f(a)
return ioB()
})
} else {
cont := Continuation[B](Fold(IO[A](*rA.Continuation), f, recover))
return ResultOrContinuation[B]{
Continuation: &cont,
}
cont = Continuation[B](func() ResultOrContinuation[B] {
ioB := recover(err)
return ioB()
})
}
return ResultOrContinuation[B]{
Continuation: &cont,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions stream/pool.go
Expand Up @@ -42,10 +42,10 @@ func NewPool[A any](size int) io.IO[Pipe[io.IO[A], io.GoResult[A]]] {
}

// NewPoolFromExecutionContext creates an execution pool that will execute tasks concurrently.
// After the execution context a buffer is created to allow as many as `capacity`
// After the execution context a buffer is created to allow as many as `capacity`
// parallel tasks to be executed.
// This pool won't change the order of elements.
// NB! As work starts in parallel, in case of failure
// NB! As work starts in parallel, in case of failure
// some future elements could be evaluated even after the failed element.
// Hence we use GoResult to represent evaluation results.
func NewPoolFromExecutionContext[A any](ec io.ExecutionContext, capacity int) io.IO[Pipe[io.IO[A], io.GoResult[A]]] {
Expand Down
2 changes: 1 addition & 1 deletion stream/pool_test.go
Expand Up @@ -26,7 +26,7 @@ func TestPool(t *testing.T) {
sleepResults := stream.ThroughPipeEval(sleepTasks100, poolIO)
resultStream := stream.MapEval(sleepResults, io.FromConstantGoResult[int])
resultsIO := stream.ToSlice(resultStream)

start := time.Now()
results, err := io.UnsafeRunSync(resultsIO)
assert.NoError(t, err)
Expand Down

0 comments on commit 29cbed2

Please sign in to comment.