Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stream.Wrapf #74

Merged
merged 1 commit into from Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -298,6 +298,7 @@ The following functions could be used to create a new stream:
- `stream.FromStepResult[A any](iosr io.IO[StepResult[A]]) Stream[A]` - basic definition of a stream - IO that returns value and continuation.
- `stream.Eval[A any](ioa io.IO[A]) Stream[A]` - Eval returns a stream of one value that is the result of IO.
- `stream.Fail[A any](err error) Stream[A]` - Fail returns a stream that fails immediately.
- `stream.Wrapf[A any](stm Stream[A], format string, args ...interface{}) Stream[A]` - Wrapf wraps errors produced by this stream with additional context info.

### Manipulation

Expand Down
2 changes: 2 additions & 0 deletions stream/common_test.go
Expand Up @@ -23,6 +23,8 @@ var printInt = stream.NewSink(func(i int) { fmt.Printf("%d", i) })
var errExpected = errors.New("expected error")
var failedStream = stream.Eval(io.Fail[int](errExpected))

var natsAndThenFail = stream.AndThen(nats10, failedStream)

func UnsafeStreamToSlice[A any](t *testing.T, stm stream.Stream[A]) []A {
return UnsafeIO(t, stream.ToSlice(stm))
}
Expand Down
24 changes: 24 additions & 0 deletions stream/stream.go
Expand Up @@ -467,3 +467,27 @@ func FoldLeft[A any, B any](stm Stream[A], zero B, combine func(B, A) B) io.IO[B
})
})
}

// Wrapf wraps errors produced by this stream with additional context info.
func Wrapf[A any](stm Stream[A], format string, args ...interface{}) Stream[A] {
iosra := io.IO[StepResult[A]](stm)
w := io.Wrapf(iosra, format, args...)
m := io.FlatMap(w, func(sra StepResult[A]) (res io.IO[StepResult[A]]) {
if sra.IsFinished {
res = io.Lift(sra)
} else {
cont := Stream[A](
io.Delay(func() io.IO[StepResult[A]] {
return io.IO[StepResult[A]](Wrapf(sra.Continuation, format, args...))
}),
)
if sra.HasValue {
res = io.Lift(NewStepResult(sra.Value, cont))
} else {
res = io.Lift(NewStepResultEmpty(cont))
}
}
return
})
return Stream[A](m)
}
11 changes: 9 additions & 2 deletions stream/stream_test.go
Expand Up @@ -148,8 +148,6 @@ func TestFoldLeftEval(t *testing.T) {
}

func TestStateFlatMapWithFinishAndFailureHandling(t *testing.T) {
failedStream := stream.Eval(io.Fail[int](errExpected))
natsAndThenFail := stream.AndThen(nats10, failedStream)
sumStream := stream.StateFlatMapWithFinishAndFailureHandling(natsAndThenFail, 0,
func(i, j int) io.IO[fun.Pair[int, stream.Stream[int]]] {
return io.Lift(fun.NewPair(i+j, stream.Empty[int]()))
Expand Down Expand Up @@ -190,3 +188,12 @@ func TestStateFlatMapWithFinishAndFailureHandling2(t *testing.T) {
assert.NoError(t, err1)
assert.Equal(t, -45, sum)
}

func TestWrapf(t *testing.T) {
wrappedNatsAndThenFail := stream.Wrapf(natsAndThenFail, "wrapped")
wrLastIO := stream.Last(wrappedNatsAndThenFail)
_, err1 := io.UnsafeRunSync(wrLastIO)
if assert.Error(t, err1) {
assert.Contains(t, err1.Error(), "wrapped")
}
}