diff --git a/flatmap.go b/flatmap.go index 5c3d1862..dfcb92f8 100644 --- a/flatmap.go +++ b/flatmap.go @@ -1,9 +1,10 @@ package rxgo import ( - "sync" + "context" "github.com/reactivex/rxgo/handlers" + "golang.org/x/sync/semaphore" ) // transforms emitted items into observables and flattens them into single observable. @@ -32,36 +33,26 @@ func (o *observable) flatMap( } func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) { - var ( - sequence Observable - wg sync.WaitGroup - count uint - ) + ctx := context.TODO() + sem := semaphore.NewWeighted(int64(maxInParallel)) defer close(out) emissionObserver := newFlattenEmissionObserver(out) - count = 0 - for { element, err := o.Next() if err != nil { break } - sequence = apply(element) - count++ - wg.Add(1) + sequence := apply(element) + sem.Acquire(ctx, 1) go func() { - defer wg.Done() + defer sem.Release(1) sequence.Subscribe(emissionObserver).Block() }() - - if count%maxInParallel == 0 { - wg.Wait() - } } - wg.Wait() + sem.Acquire(ctx, int64(maxInParallel)) } func newFlattenEmissionObserver(out chan interface{}) Observer {