Skip to content

Commit

Permalink
flatmap operator
Browse files Browse the repository at this point in the history
  • Loading branch information
venth committed Mar 13, 2018
1 parent a0cc4da commit 3d93883
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 0 deletions.
62 changes: 62 additions & 0 deletions observable/flatmap.go
@@ -0,0 +1,62 @@
package observable

import (
"github.com/reactivex/rxgo/observer"
"github.com/reactivex/rxgo/handlers"
"sync"
)

func (o Observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable {
return o.flatMap(apply, maxInParallel, flatObservedSequence)
}

func (o Observable) flatMap(
apply func(interface{}) Observable,
maxInParallel uint,
flatteningFunc func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint)) Observable {

out := make(chan interface{})

if maxInParallel < 0 {
maxInParallel = 1
}

go flatObservedSequence(out, o, apply, maxInParallel)

return Observable(out)
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
var (
sequence Observable
wg sync.WaitGroup
count uint
)

defer close(out)
emissionObserver := newFlattenEmissionObserver(out)

count = 0
for element := range o {
sequence = apply(element)
count++
wg.Add(1)
go func() {
defer wg.Done()
<-(sequence.Subscribe(*emissionObserver))
}()

if count%maxInParallel == 0 {
wg.Wait()
}
}

wg.Wait()
}

func newFlattenEmissionObserver(out chan interface{}) *observer.Observer {
ob := observer.New(handlers.NextFunc(func(element interface{}) {
out <- element
}))
return &ob
}
124 changes: 124 additions & 0 deletions observable/flatmap_test.go
@@ -0,0 +1,124 @@
package observable

import (
"testing"
"github.com/reactivex/rxgo/observer"
"github.com/reactivex/rxgo/handlers"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/assert"
)

func TestFlatMapCompletesWhenSequenceIsEmpty(t *testing.T) {
// given
emissionObserver := newObserverMock()

// and empty sequence
sequence := Empty()

// and flattens the sequence with identity
sequence = sequence.FlatMap(identity, 1)

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes without any emission
emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything)
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertCalled(t, "OnDone")
}

func TestFlatMapReturnsSameElementBecauseIdentifyApplied(t *testing.T) {
// given
emissionObserver := newObserverMock()

// and sequence containing one element
element := 1
sequence := Just(element)

// and flattens the sequence with identity
sequence = sequence.FlatMap(identity, 1)

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes with emission of the same element
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertCalled(t, "OnNext", element)
emissionObserver.AssertCalled(t, "OnDone")
}

func TestFlatMapUsesForParallelProcessingAtLeast1Process(t *testing.T) {
// given
emissionObserver := newObserverMock()

// and
var maxInParallel uint = 0

// and
var requestedMaxInParallel uint = 0
flatteningFuncMock := func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
requestedMaxInParallel = maxInParallel
flatObservedSequence(out, o, apply, maxInParallel)
}

// and flattens the sequence with identity
sequence := someSequence.flatMap(identity, maxInParallel, flatteningFuncMock)

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes with emission of the same element
assert.Equal(t, 1, requestedMaxInParallel)

}

func newObserverMock() *ObserverMock {
obMock := new(ObserverMock)
obMock.On("OnDone").Return()
obMock.On("OnNext", mock.Anything).Return()
obMock.On("OnError", mock.Anything).Return()
return obMock
}

var (
someElement = "some element"
someSequence = Just(someElement)
)

type ObserverMock struct {
mock.Mock
}

// OnDone provides a mock function with given fields:
func (m *ObserverMock) OnDone() {
m.Called()
}

// OnError provides a mock function with given fields: err
func (m *ObserverMock) OnError(err error) {
m.Called(err)
}

// OnNext provides a mock function with given fields: item
func (m *ObserverMock) OnNext(item interface{}) {
m.Called(item)
}

func (m *ObserverMock) Capture() observer.Observer {
ob := observer.New(
handlers.NextFunc(func(el interface{}) {
m.OnNext(el)
}),
handlers.ErrFunc(func(err error) {
m.OnError(err)
}),
handlers.DoneFunc(func() {
m.OnDone()
}),
)
return ob
}

func identity(el interface{}) Observable {
return Just(el)
}

0 comments on commit 3d93883

Please sign in to comment.