Skip to content

Commit

Permalink
Merge 4db5779 into 69008a7
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Apr 18, 2019
2 parents 69008a7 + 4db5779 commit 2444799
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 17 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -19,6 +19,7 @@ install:
- go get github.com/gorilla/websocket
- go get github.com/onsi/ginkgo
- go get github.com/onsi/gomega
- go get golang.org/x/sync/semaphore

script:
- go test -v -race -cover -coverprofile=/tmp/coverage.out
Expand Down
25 changes: 8 additions & 17 deletions flatmap.go
@@ -1,9 +1,10 @@
package rxgo

import (
"sync"
"context"

"github.com/reactivex/rxgo/handlers"
"golang.org/x/sync/semaphore"
)

// Transforms emitted items into Observables and flattens them into a single Observable.
Expand All @@ -30,37 +31,27 @@ func (o *observable) flatMap(
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
var (
sequence Observable
wg sync.WaitGroup
count uint
)
ctx := context.TODO()
sem := semaphore.NewWeighted(int64(maxInParallel))

defer close(out)
emissionObserver := newFlattenEmissionObserver(out)

count = 0

it := o.Iterator()
for {
if item, err := it.Next(); err == nil {
sequence = apply(item)
count++
wg.Add(1)
sequence := apply(item)
sem.Acquire(ctx, 1)
go func() {
defer wg.Done()
defer sem.Release(1)
sequence.Subscribe(emissionObserver).Block()
}()

if count%maxInParallel == 0 {
wg.Wait()
}
} else {
break
}
}

wg.Wait()
sem.Acquire(ctx, int64(maxInParallel))
}

func newFlattenEmissionObserver(out chan interface{}) Observer {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -5,4 +5,5 @@ require (
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/stretchr/testify v1.3.0
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
)
1 change: 1 addition & 0 deletions iterator.go
Expand Up @@ -2,6 +2,7 @@ package rxgo

import (
"context"

"github.com/reactivex/rxgo/errors"
)

Expand Down

0 comments on commit 2444799

Please sign in to comment.