Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions flatmap.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package rxgo

import (
"sync"
"context"

"github.com/reactivex/rxgo/handlers"
"golang.org/x/sync/semaphore"
)

// transforms emitted items into observables and flattens them into single observable.
Expand Down Expand Up @@ -32,36 +33,26 @@ func (o *observable) flatMap(
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
var (
sequence Observable
wg sync.WaitGroup
count uint
)
ctx := context.TODO()
sem := semaphore.NewWeighted(int64(maxInParallel))

defer close(out)
emissionObserver := newFlattenEmissionObserver(out)

count = 0

for {
element, err := o.Next()
if err != nil {
break
}
sequence = apply(element)
count++
wg.Add(1)
sequence := apply(element)
sem.Acquire(ctx, 1)
go func() {
defer wg.Done()
defer sem.Release(1)
sequence.Subscribe(emissionObserver).Block()
}()

if count%maxInParallel == 0 {
wg.Wait()
}
}

wg.Wait()
sem.Acquire(ctx, int64(maxInParallel))
}

func newFlattenEmissionObserver(out chan interface{}) Observer {
Expand Down