Skip to content

Commit

Permalink
Merge pull request #62 from Primetalk/pipe-concat
Browse files Browse the repository at this point in the history
ConcatPipes
  • Loading branch information
Primetalk committed Aug 18, 2022
2 parents d401758 + f109957 commit 295ffa5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -348,8 +348,8 @@ Sink is a Pipe that returns a stream of units. That stream could be drained afte
- `stream.NewSink[A any](f func(a A)) Sink[A]`
- `stream.Through[A any, B any](stm Stream[A], pipe Pipe[A, B]) Stream[B]`
- `stream.ThroughPipeEval[A any, B any](stm Stream[A], pipeIO io.IO[Pipe[A, B]]) Stream[B]` - ThroughPipeEval runs the given stream through pipe that is returned by the provided pipeIO.

- `stream.ToSink[A any](stm Stream[A], sink Sink[A]) Stream[fun.Unit]`
- `stream.ConcatPipes[A any, B any, C any](pipe1 Pipe[A, B], pipe2 Pipe[B, C]) Pipe[A, C]` - ConcatPipes connects two pipes into one.

### Length manipulation

Expand Down
9 changes: 9 additions & 0 deletions stream/common_test.go
Expand Up @@ -2,8 +2,11 @@ package stream_test

import (
"fmt"
"testing"

"github.com/primetalk/goio/io"
"github.com/primetalk/goio/stream"
"github.com/stretchr/testify/assert"
)

var nats = stream.Unfold(0, func(s int) int {
Expand All @@ -16,3 +19,9 @@ var Mul2 = stream.MapPipe(func(i int) int { return i * 2 })
var pipeMul2IO = stream.PipeToPairOfChannels(Mul2)

var printInt = stream.NewSink(func(i int) { fmt.Printf("%d", i) })

func UnsafeStreamToSlice[A any](t *testing.T, stm stream.Stream[A]) []A {
res, err1 := io.UnsafeRunSync(stream.ToSlice(stm))
assert.NoError(t, err1)
return res
}
7 changes: 7 additions & 0 deletions stream/pipe.go
Expand Up @@ -40,3 +40,10 @@ func NewSink[A any](f func(a A)) Sink[A] {
func ToSink[A any](stm Stream[A], sink Sink[A]) Stream[fun.Unit] {
return sink(stm)
}

// ConcatPipes connects two pipes into one.
func ConcatPipes[A any, B any, C any](pipe1 Pipe[A, B], pipe2 Pipe[B, C]) Pipe[A, C] {
return func(sa Stream[A]) Stream[C] {
return pipe2(pipe1(sa))
}
}
15 changes: 15 additions & 0 deletions stream/pipe_test.go
@@ -0,0 +1,15 @@
package stream_test

import (
"testing"

"github.com/primetalk/goio/stream"
"github.com/stretchr/testify/assert"
)

func TestConcatPipes(t *testing.T) {
inc := stream.MapPipe(func(i int) int { return i + 1 })
dec := stream.MapPipe(func(i int) int { return i - 1 })
nop := stream.ConcatPipes(inc, dec)
assert.ElementsMatch(t, nats10Values, UnsafeStreamToSlice(t, nop(nats10)))
}

0 comments on commit 295ffa5

Please sign in to comment.