From 538a04e65dfbd18782bf7f35742893167a393894 Mon Sep 17 00:00:00 2001 From: Arseniy Zhizhelev Date: Fri, 19 Aug 2022 09:48:56 +0300 Subject: [PATCH] Add Collector type --- stream/execution.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/stream/execution.go b/stream/execution.go index 2acbd52..03687d8 100644 --- a/stream/execution.go +++ b/stream/execution.go @@ -8,6 +8,9 @@ import ( "github.com/primetalk/goio/io" ) +// Collector reads the stream and produces some value. +type Collector[A any, B any] func(Stream[A]) io.IO[B] + // Collect collects all element from the stream and for each element invokes // the provided function func Collect[A any](stm Stream[A], collector func(A) error) io.IO[fun.Unit] { @@ -87,8 +90,8 @@ func Head[A any](stm Stream[A]) io.IO[A] { // Partition divides the stream into two that are handled independently. func Partition[A any, C any, D any](stm Stream[A], predicate func(A) bool, - trueHandler func(Stream[A]) io.IO[C], - falseHandler func(Stream[A]) io.IO[D], + trueHandler Collector[A, C], + falseHandler Collector[A, D], ) io.IO[fun.Pair[C, D]] { eithersIO := FanOut(stm, func(stm Stream[A]) io.IO[either.Either[C, D]] {