From 3d938836c9054024f833f6d1a781fe1c95bae445 Mon Sep 17 00:00:00 2001 From: Artur Krysiak Date: Mon, 12 Mar 2018 20:52:17 +0100 Subject: [PATCH] flatmap operator --- observable/flatmap.go | 62 +++++++++++++++++++ observable/flatmap_test.go | 124 +++++++++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 observable/flatmap.go create mode 100644 observable/flatmap_test.go diff --git a/observable/flatmap.go b/observable/flatmap.go new file mode 100644 index 00000000..954c38fc --- /dev/null +++ b/observable/flatmap.go @@ -0,0 +1,62 @@ +package observable + +import ( + "github.com/reactivex/rxgo/observer" + "github.com/reactivex/rxgo/handlers" + "sync" +) + +func (o Observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable { + return o.flatMap(apply, maxInParallel, flatObservedSequence) +} + +func (o Observable) flatMap( + apply func(interface{}) Observable, + maxInParallel uint, + flatteningFunc func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint)) Observable { + + out := make(chan interface{}) + + if maxInParallel < 0 { + maxInParallel = 1 + } + + go flatObservedSequence(out, o, apply, maxInParallel) + + return Observable(out) +} + +func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) { + var ( + sequence Observable + wg sync.WaitGroup + count uint + ) + + defer close(out) + emissionObserver := newFlattenEmissionObserver(out) + + count = 0 + for element := range o { + sequence = apply(element) + count++ + wg.Add(1) + go func() { + defer wg.Done() + <-(sequence.Subscribe(*emissionObserver)) + }() + + if count%maxInParallel == 0 { + wg.Wait() + } + } + + wg.Wait() +} + +func newFlattenEmissionObserver(out chan interface{}) *observer.Observer { + ob := observer.New(handlers.NextFunc(func(element interface{}) { + out <- element + })) + return &ob +} diff --git a/observable/flatmap_test.go b/observable/flatmap_test.go new file mode 100644 index 00000000..57052d1a --- /dev/null +++ b/observable/flatmap_test.go @@ -0,0 +1,124 @@ +package observable + +import ( + "testing" + "github.com/reactivex/rxgo/observer" + "github.com/reactivex/rxgo/handlers" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/assert" +) + +func TestFlatMapCompletesWhenSequenceIsEmpty(t *testing.T) { + // given + emissionObserver := newObserverMock() + + // and empty sequence + sequence := Empty() + + // and flattens the sequence with identity + sequence = sequence.FlatMap(identity, 1) + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes without any emission + emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything) + emissionObserver.AssertNotCalled(t, "OnError", mock.Anything) + emissionObserver.AssertCalled(t, "OnDone") +} + +func TestFlatMapReturnsSameElementBecauseIdentifyApplied(t *testing.T) { + // given + emissionObserver := newObserverMock() + + // and sequence containing one element + element := 1 + sequence := Just(element) + + // and flattens the sequence with identity + sequence = sequence.FlatMap(identity, 1) + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes with emission of the same element + emissionObserver.AssertNotCalled(t, "OnError", mock.Anything) + emissionObserver.AssertCalled(t, "OnNext", element) + emissionObserver.AssertCalled(t, "OnDone") +} + +func TestFlatMapUsesForParallelProcessingAtLeast1Process(t *testing.T) { + // given + emissionObserver := newObserverMock() + + // and + var maxInParallel uint = 0 + + // and + var requestedMaxInParallel uint = 0 + flatteningFuncMock := func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) { + requestedMaxInParallel = maxInParallel + flatObservedSequence(out, o, apply, maxInParallel) + } + + // and flattens the sequence with identity + sequence := someSequence.flatMap(identity, maxInParallel, flatteningFuncMock) + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes with emission of the same element + assert.Equal(t, 1, requestedMaxInParallel) + +} + +func newObserverMock() *ObserverMock { + obMock := new(ObserverMock) + obMock.On("OnDone").Return() + obMock.On("OnNext", mock.Anything).Return() + obMock.On("OnError", mock.Anything).Return() + return obMock +} + +var ( + someElement = "some element" + someSequence = Just(someElement) +) + +type ObserverMock struct { + mock.Mock +} + +// OnDone provides a mock function with given fields: +func (m *ObserverMock) OnDone() { + m.Called() +} + +// OnError provides a mock function with given fields: err +func (m *ObserverMock) OnError(err error) { + m.Called(err) +} + +// OnNext provides a mock function with given fields: item +func (m *ObserverMock) OnNext(item interface{}) { + m.Called(item) +} + +func (m *ObserverMock) Capture() observer.Observer { + ob := observer.New( + handlers.NextFunc(func(el interface{}) { + m.OnNext(el) + }), + handlers.ErrFunc(func(err error) { + m.OnError(err) + }), + handlers.DoneFunc(func() { + m.OnDone() + }), + ) + return ob +} + +func identity(el interface{}) Observable { + return Just(el) +}