diff --git a/README.md b/README.md index 2256764..b562afb 100644 --- a/README.md +++ b/README.md @@ -348,8 +348,8 @@ Sink is a Pipe that returns a stream of units. That stream could be drained afte - `stream.NewSink[A any](f func(a A)) Sink[A]` - `stream.Through[A any, B any](stm Stream[A], pipe Pipe[A, B]) Stream[B]` - `stream.ThroughPipeEval[A any, B any](stm Stream[A], pipeIO io.IO[Pipe[A, B]]) Stream[B]` - ThroughPipeEval runs the given stream through pipe that is returned by the provided pipeIO. - - `stream.ToSink[A any](stm Stream[A], sink Sink[A]) Stream[fun.Unit]` +- `stream.ConcatPipes[A any, B any, C any](pipe1 Pipe[A, B], pipe2 Pipe[B, C]) Pipe[A, C]` - ConcatPipes connects two pipes into one. ### Length manipulation diff --git a/stream/common_test.go b/stream/common_test.go index af65144..8816244 100644 --- a/stream/common_test.go +++ b/stream/common_test.go @@ -2,8 +2,11 @@ package stream_test import ( "fmt" + "testing" + "github.com/primetalk/goio/io" "github.com/primetalk/goio/stream" + "github.com/stretchr/testify/assert" ) var nats = stream.Unfold(0, func(s int) int { @@ -16,3 +19,9 @@ var Mul2 = stream.MapPipe(func(i int) int { return i * 2 }) var pipeMul2IO = stream.PipeToPairOfChannels(Mul2) var printInt = stream.NewSink(func(i int) { fmt.Printf("%d", i) }) + +func UnsafeStreamToSlice[A any](t *testing.T, stm stream.Stream[A]) []A { + res, err1 := io.UnsafeRunSync(stream.ToSlice(stm)) + assert.NoError(t, err1) + return res +} diff --git a/stream/pipe.go b/stream/pipe.go index 6d47d69..3996988 100644 --- a/stream/pipe.go +++ b/stream/pipe.go @@ -40,3 +40,10 @@ func NewSink[A any](f func(a A)) Sink[A] { func ToSink[A any](stm Stream[A], sink Sink[A]) Stream[fun.Unit] { return sink(stm) } + +// ConcatPipes connects two pipes into one. +func ConcatPipes[A any, B any, C any](pipe1 Pipe[A, B], pipe2 Pipe[B, C]) Pipe[A, C] { + return func(sa Stream[A]) Stream[C] { + return pipe2(pipe1(sa)) + } +} diff --git a/stream/pipe_test.go b/stream/pipe_test.go new file mode 100644 index 0000000..2047f32 --- /dev/null +++ b/stream/pipe_test.go @@ -0,0 +1,15 @@ +package stream_test + +import ( + "testing" + + "github.com/primetalk/goio/stream" + "github.com/stretchr/testify/assert" +) + +func TestConcatPipes(t *testing.T) { + inc := stream.MapPipe(func(i int) int { return i + 1 }) + dec := stream.MapPipe(func(i int) int { return i - 1 }) + nop := stream.ConcatPipes(inc, dec) + assert.ElementsMatch(t, nats10Values, UnsafeStreamToSlice(t, nop(nats10))) +}