diff --git a/README.md b/README.md index 4772c04..bc1c568 100644 --- a/README.md +++ b/README.md @@ -298,6 +298,7 @@ The following functions could be used to create a new stream: - `stream.FromStepResult[A any](iosr io.IO[StepResult[A]]) Stream[A]` - basic definition of a stream - IO that returns value and continuation. - `stream.Eval[A any](ioa io.IO[A]) Stream[A]` - Eval returns a stream of one value that is the result of IO. - `stream.Fail[A any](err error) Stream[A]` - Fail returns a stream that fails immediately. +- `stream.Wrapf[A any](stm Stream[A], format string, args ...interface{}) Stream[A]` - Wrapf wraps errors produced by this stream with additional context info. ### Manipulation diff --git a/stream/common_test.go b/stream/common_test.go index 92b3b03..5c26576 100644 --- a/stream/common_test.go +++ b/stream/common_test.go @@ -23,6 +23,8 @@ var printInt = stream.NewSink(func(i int) { fmt.Printf("%d", i) }) var errExpected = errors.New("expected error") var failedStream = stream.Eval(io.Fail[int](errExpected)) +var natsAndThenFail = stream.AndThen(nats10, failedStream) + func UnsafeStreamToSlice[A any](t *testing.T, stm stream.Stream[A]) []A { return UnsafeIO(t, stream.ToSlice(stm)) } diff --git a/stream/stream.go b/stream/stream.go index 62af741..65fd0d1 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -467,3 +467,27 @@ func FoldLeft[A any, B any](stm Stream[A], zero B, combine func(B, A) B) io.IO[B }) }) } + +// Wrapf wraps errors produced by this stream with additional context info. +func Wrapf[A any](stm Stream[A], format string, args ...interface{}) Stream[A] { + iosra := io.IO[StepResult[A]](stm) + w := io.Wrapf(iosra, format, args...) + m := io.FlatMap(w, func(sra StepResult[A]) (res io.IO[StepResult[A]]) { + if sra.IsFinished { + res = io.Lift(sra) + } else { + cont := Stream[A]( + io.Delay(func() io.IO[StepResult[A]] { + return io.IO[StepResult[A]](Wrapf(sra.Continuation, format, args...)) + }), + ) + if sra.HasValue { + res = io.Lift(NewStepResult(sra.Value, cont)) + } else { + res = io.Lift(NewStepResultEmpty(cont)) + } + } + return + }) + return Stream[A](m) +} diff --git a/stream/stream_test.go b/stream/stream_test.go index 46f0fad..f7c4263 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -148,8 +148,6 @@ func TestFoldLeftEval(t *testing.T) { } func TestStateFlatMapWithFinishAndFailureHandling(t *testing.T) { - failedStream := stream.Eval(io.Fail[int](errExpected)) - natsAndThenFail := stream.AndThen(nats10, failedStream) sumStream := stream.StateFlatMapWithFinishAndFailureHandling(natsAndThenFail, 0, func(i, j int) io.IO[fun.Pair[int, stream.Stream[int]]] { return io.Lift(fun.NewPair(i+j, stream.Empty[int]())) @@ -190,3 +188,12 @@ func TestStateFlatMapWithFinishAndFailureHandling2(t *testing.T) { assert.NoError(t, err1) assert.Equal(t, -45, sum) } + +func TestWrapf(t *testing.T) { + wrappedNatsAndThenFail := stream.Wrapf(natsAndThenFail, "wrapped") + wrLastIO := stream.Last(wrappedNatsAndThenFail) + _, err1 := io.UnsafeRunSync(wrLastIO) + if assert.Error(t, err1) { + assert.Contains(t, err1.Error(), "wrapped") + } +}