Skip to content

Commit

Permalink
Add stream.Partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Primetalk committed Aug 15, 2022
1 parent 894659b commit 6939be6
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
31 changes: 28 additions & 3 deletions stream/execution.go
Expand Up @@ -3,14 +3,15 @@ package stream
import (
"fmt"

"github.com/primetalk/goio/either"
"github.com/primetalk/goio/fun"
"github.com/primetalk/goio/io"
)

// Collect collects all element from the stream and for each element invokes
// the provided function
func Collect[A any](stm Stream[A], collector func(A) error) io.IO[fun.Unit] {
return io.FlatMap[StepResult[A]](
return io.FlatMap(
io.IO[StepResult[A]](stm),
func(sra StepResult[A]) io.IO[fun.Unit] {
if sra.IsFinished {
Expand Down Expand Up @@ -38,7 +39,7 @@ func ForEach[A any](stm Stream[A], collector func(A)) io.IO[fun.Unit] {

// DrainAll executes the stream and throws away all values.
func DrainAll[A any](stm Stream[A]) io.IO[fun.Unit] {
return io.FlatMap[StepResult[A]](
return io.FlatMap(
io.IO[StepResult[A]](stm),
func(sra StepResult[A]) io.IO[fun.Unit] {
if sra.IsFinished {
Expand All @@ -51,7 +52,7 @@ func DrainAll[A any](stm Stream[A]) io.IO[fun.Unit] {

// AppendToSlice executes the stream and appends it's results to the slice.
func AppendToSlice[A any](stm Stream[A], start []A) io.IO[[]A] {
return io.FlatMap[StepResult[A]](
return io.FlatMap(
io.IO[StepResult[A]](stm),
func(sra StepResult[A]) io.IO[[]A] {
if sra.IsFinished {
Expand Down Expand Up @@ -82,3 +83,27 @@ func Head[A any](stm Stream[A]) io.IO[A] {
return
})
}

// Partition divides the stream into two that are handled independently.
func Partition[A any, C any, D any](stm Stream[A],
predicate func(A) bool,
trueHandler func(Stream[A]) io.IO[C],
falseHandler func(Stream[A]) io.IO[D],
) io.IO[fun.Pair[C, D]] {
eithersIO := FanOut(stm,
func(stm Stream[A]) io.IO[either.Either[C, D]] {
return io.Map(trueHandler(Filter(stm, predicate)), either.Left[C, D])
},
func(stm Stream[A]) io.IO[either.Either[C, D]] {
return io.Map(falseHandler(FilterNot(stm, predicate)), either.Right[C, D])
},
)
return io.Map(eithersIO, func(eithers []either.Either[C, D]) (p fun.Pair[C, D]) {
if either.IsLeft(eithers[0]) {
p = fun.NewPair(eithers[0].Left, eithers[1].Right)
} else {
p = fun.NewPair(eithers[1].Left, eithers[0].Right)
}
return
})
}
15 changes: 15 additions & 0 deletions stream/execution_test.go
Expand Up @@ -3,6 +3,7 @@ package stream_test
import (
"testing"

"github.com/primetalk/goio/fun"
"github.com/primetalk/goio/io"
"github.com/primetalk/goio/stream"
"github.com/stretchr/testify/assert"
Expand All @@ -20,3 +21,17 @@ func TestForEach(t *testing.T) {
assert.NoError(t, err)
assert.ElementsMatch(t, []int{2, 4, 8, 16, 32}, is)
}

func TestPartition(t *testing.T) {
cdIO := stream.Partition(nats10, isEven,
func(even stream.Stream[int]) io.IO[int] {
return stream.Head(stream.Sum(even))
},
func(odd stream.Stream[int]) io.IO[string] {
return stream.Head(stream.Map(stream.Sum(odd), fun.ToString[int]))
},
)
res, err := io.UnsafeRunSync(cdIO)
assert.NoError(t, err)
assert.Equal(t, fun.NewPair(30, "25"), res)
}

0 comments on commit 6939be6

Please sign in to comment.