From d1f71af6c880a42159e2077c7e522708267a6225 Mon Sep 17 00:00:00 2001 From: David Parrish Date: Wed, 16 Jan 2019 15:57:42 +1100 Subject: [PATCH] Use a semaphore instead of WaitGroup for FlatMap parallelism. The WaitGroup method used has an uneven work load distribution. It will create up to `maxInParallel` goroutines and then wait for them ALL to complete before starting another `maxInParallel` goroutines and so on. Using a semaphore instead, it will try to keep up to `maxInParallel` goroutines processing all at the same time. --- flatmap.go | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/flatmap.go b/flatmap.go index 5c3d1862..dfcb92f8 100644 --- a/flatmap.go +++ b/flatmap.go @@ -1,9 +1,10 @@ package rxgo import ( - "sync" + "context" "github.com/reactivex/rxgo/handlers" + "golang.org/x/sync/semaphore" ) // transforms emitted items into observables and flattens them into single observable. @@ -32,36 +33,26 @@ func (o *observable) flatMap( } func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) { - var ( - sequence Observable - wg sync.WaitGroup - count uint - ) + ctx := context.TODO() + sem := semaphore.NewWeighted(int64(maxInParallel)) defer close(out) emissionObserver := newFlattenEmissionObserver(out) - count = 0 - for { element, err := o.Next() if err != nil { break } - sequence = apply(element) - count++ - wg.Add(1) + sequence := apply(element) + sem.Acquire(ctx, 1) go func() { - defer wg.Done() + defer sem.Release(1) sequence.Subscribe(emissionObserver).Block() }() - - if count%maxInParallel == 0 { - wg.Wait() - } } - wg.Wait() + sem.Acquire(ctx, int64(maxInParallel)) } func newFlattenEmissionObserver(out chan interface{}) Observer {