From 97691defc3811198e14d1fb7a49a35575cacb75d Mon Sep 17 00:00:00 2001 From: Artur Krysiak Date: Mon, 12 Mar 2018 20:52:32 +0100 Subject: [PATCH 1/3] ignored intellij related files --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 6f957ffe..769728aa 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,7 @@ _testmain.go # Dep directories bower_components node_modules + +# IntelliJ +.idea +*.iml From d78dde86e89cf06a073bb304c5f3d067836dd45f Mon Sep 17 00:00:00 2001 From: Artur Krysiak Date: Mon, 12 Mar 2018 20:52:17 +0100 Subject: [PATCH 2/3] flatmap operator --- observable/flatmap.go | 62 +++++++++++++++++++ observable/flatmap_test.go | 112 ++++++++++++++++++++++++++++++++++ observable/observable.go | 11 ++-- observable/observable_test.go | 26 ++++++-- observer/observer_mock.go | 48 +++++++++++++++ 5 files changed, 248 insertions(+), 11 deletions(-) create mode 100644 observable/flatmap.go create mode 100644 observable/flatmap_test.go create mode 100644 observer/observer_mock.go diff --git a/observable/flatmap.go b/observable/flatmap.go new file mode 100644 index 00000000..f3f7f9ef --- /dev/null +++ b/observable/flatmap.go @@ -0,0 +1,62 @@ +package observable + +import ( + "github.com/reactivex/rxgo/observer" + "github.com/reactivex/rxgo/handlers" + "sync" +) + +func (o Observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable { + return o.flatMap(apply, maxInParallel, flatObservedSequence) +} + +func (o Observable) flatMap( + apply func(interface{}) Observable, + maxInParallel uint, + flatteningFunc func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint)) Observable { + + out := make(chan interface{}) + + if maxInParallel < 1 { + maxInParallel = 1 + } + + go flatteningFunc(out, o, apply, maxInParallel) + + return Observable(out) +} + +func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) { + var ( + sequence Observable + wg sync.WaitGroup + count uint + ) + + defer close(out) + emissionObserver := newFlattenEmissionObserver(out) + + count = 0 + for element := range o { + sequence = apply(element) + count++ + wg.Add(1) + go func() { + defer wg.Done() + <-(sequence.Subscribe(*emissionObserver)) + }() + + if count%maxInParallel == 0 { + wg.Wait() + } + } + + wg.Wait() +} + +func newFlattenEmissionObserver(out chan interface{}) *observer.Observer { + ob := observer.New(handlers.NextFunc(func(element interface{}) { + out <- element + })) + return &ob +} diff --git a/observable/flatmap_test.go b/observable/flatmap_test.go new file mode 100644 index 00000000..053d6a98 --- /dev/null +++ b/observable/flatmap_test.go @@ -0,0 +1,112 @@ +package observable + +import ( + "testing" + "github.com/reactivex/rxgo/observer" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/assert" +) + +func TestFlatMapCompletesWhenSequenceIsEmpty(t *testing.T) { + // given + emissionObserver := observer.NewObserverMock() + + // and empty sequence + sequence := Empty() + + // and flattens the sequence with identity + sequence = sequence.FlatMap(identity, 1) + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes without any emission + emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything) + emissionObserver.AssertNotCalled(t, "OnError", mock.Anything) + emissionObserver.AssertCalled(t, "OnDone") +} + +func TestFlatMapReturnsSameElementBecauseIdentifyApplied(t *testing.T) { + // given + emissionObserver := observer.NewObserverMock() + + // and sequence containing one element + element := 1 + sequence := Just(element) + + // and flattens the sequence with identity + sequence = sequence.FlatMap(identity, 1) + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes with emission of the same element + emissionObserver.AssertNotCalled(t, "OnError", mock.Anything) + emissionObserver.AssertCalled(t, "OnNext", element) + emissionObserver.AssertCalled(t, "OnDone") +} + +func TestFlatMapReturnsSliceElements(t *testing.T) { + // given + emissionObserver := observer.NewObserverMock() + + // and sequence containing slice with few elements + element1 := "element1" + element2 := "element2" + element3 := "element3" + slice := &([]string{element1, element2, element3}) + sequence := Just(slice) + + // and flattens the sequence with identity + sequence = sequence.FlatMap(flattenThreeElementSlice, 1) + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes with emission of flatten elements + emissionObserver.AssertNotCalled(t, "OnError", mock.Anything) + emissionObserver.AssertNotCalled(t, "OnNext", slice) + emissionObserver.AssertCalled(t, "OnNext", element1) + emissionObserver.AssertCalled(t, "OnNext", element2) + emissionObserver.AssertCalled(t, "OnNext", element3) + emissionObserver.AssertCalled(t, "OnDone") +} + +func TestFlatMapUsesForParallelProcessingAtLeast1Process(t *testing.T) { + // given + emissionObserver := observer.NewObserverMock() + + // and + var maxInParallel uint = 0 + + // and + var requestedMaxInParallel uint = 0 + flatteningFuncMock := func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) { + requestedMaxInParallel = maxInParallel + flatObservedSequence(out, o, apply, maxInParallel) + } + + // and flattens the sequence with identity + sequence := someSequence.flatMap(identity, maxInParallel, flatteningFuncMock) + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes with emission of the same element + assert.Equal(t, uint(1), requestedMaxInParallel) + +} + +var ( + someElement = "some element" + someSequence = Just(someElement) +) + +func identity(el interface{}) Observable { + return Just(el) +} + +func flattenThreeElementSlice(el interface{}) Observable { + slice := *(el.(*[]string)) + return Just(slice[0], slice[1], slice[2]) +} diff --git a/observable/observable.go b/observable/observable.go index 40d950c0..e84d7002 100644 --- a/observable/observable.go +++ b/observable/observable.go @@ -244,10 +244,10 @@ func (o Observable) SkipLast(nth uint) Observable { buf := make(chan interface{}, nth) for item := range o { select { - case buf <- item: - default: - out <- (<- buf) - buf <- item + case buf <- item: + default: + out <- (<-buf) + buf <- item } } close(buf) @@ -256,7 +256,6 @@ func (o Observable) SkipLast(nth uint) Observable { return Observable(out) } - // Scan applies ScannableFunc predicate to each item in the original // Observable sequentially and emits each successive value on a new Observable. func (o Observable) Scan(apply fx.ScannableFunc) Observable { @@ -322,7 +321,7 @@ func Interval(term chan struct{}, interval time.Duration) Observable { // Repeat creates an Observable emitting a given item repeatedly func Repeat(item interface{}, ntimes ...int) Observable { source := make(chan interface{}) - + // this is the infinity case no ntime parameter is given if len(ntimes) == 0 { go func() { diff --git a/observable/observable_test.go b/observable/observable_test.go index aed9f83a..0c5642aa 100644 --- a/observable/observable_test.go +++ b/observable/observable_test.go @@ -12,6 +12,7 @@ import ( "github.com/reactivex/rxgo/observer" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestDefaultObservable(t *testing.T) { @@ -614,7 +615,7 @@ func TestObservableSkip(t *testing.T) { sub := stream2.Subscribe(onNext) <-sub - assert.Exactly(t, []int{5,1,8}, nums) + assert.Exactly(t, []int{5, 1, 8}, nums) } func TestObservableSkipWithEmpty(t *testing.T) { @@ -632,7 +633,7 @@ func TestObservableSkipWithEmpty(t *testing.T) { sub := stream2.Subscribe(onNext) <-sub - assert.Exactly(t, []int{}, nums) + assert.Exactly(t, []int{}, nums) } func TestObservableSkipLast(t *testing.T) { @@ -656,7 +657,7 @@ func TestObservableSkipLast(t *testing.T) { sub := stream2.Subscribe(onNext) <-sub - assert.Exactly(t, []int{0, 1, 3}, nums) + assert.Exactly(t, []int{0, 1, 3}, nums) } func TestObservableSkipLastWithEmpty(t *testing.T) { @@ -674,10 +675,9 @@ func TestObservableSkipLastWithEmpty(t *testing.T) { sub := stream2.Subscribe(onNext) <-sub - assert.Exactly(t, []int{}, nums) + assert.Exactly(t, []int{}, nums) } - func TestObservableDistinct(t *testing.T) { items := []interface{}{1, 2, 2, 1, 3} it, err := iterable.New(items) @@ -909,3 +909,19 @@ func TestRepeatWithNegativeTimesOperator(t *testing.T) { assert.Exactly(t, []string{"end"}, stringarray) } + +func TestEmptyCompletesSequence(t *testing.T) { + // given + emissionObserver := observer.NewObserverMock() + + // and empty sequence + sequence := Empty() + + // when subscribes to the sequence + <-sequence.Subscribe(emissionObserver.Capture()) + + // then completes without any emission + emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything) + emissionObserver.AssertNotCalled(t, "OnError", mock.Anything) + emissionObserver.AssertCalled(t, "OnDone") +} diff --git a/observer/observer_mock.go b/observer/observer_mock.go new file mode 100644 index 00000000..56829bed --- /dev/null +++ b/observer/observer_mock.go @@ -0,0 +1,48 @@ +package observer + +import ( + "github.com/reactivex/rxgo/handlers" + "github.com/stretchr/testify/mock" +) + +func NewObserverMock() *ObserverMock { + obMock := new(ObserverMock) + obMock.On("OnDone").Return() + obMock.On("OnNext", mock.Anything).Return() + obMock.On("OnError", mock.Anything).Return() + return obMock +} + +type ObserverMock struct { + mock.Mock +} + +// OnDone provides a mock function with given fields: +func (m *ObserverMock) OnDone() { + m.Called() +} + +// OnError provides a mock function with given fields: err +func (m *ObserverMock) OnError(err error) { + m.Called(err) +} + +// OnNext provides a mock function with given fields: item +func (m *ObserverMock) OnNext(item interface{}) { + m.Called(item) +} + +func (m *ObserverMock) Capture() Observer { + ob := New( + handlers.NextFunc(func(el interface{}) { + m.OnNext(el) + }), + handlers.ErrFunc(func(err error) { + m.OnError(err) + }), + handlers.DoneFunc(func() { + m.OnDone() + }), + ) + return ob +} From f4a32c69c564a9c408aa0a82f749f735698ea06d Mon Sep 17 00:00:00 2001 From: Artur Krysiak Date: Sat, 3 Nov 2018 16:54:31 +0100 Subject: [PATCH 3/3] flatmap documentation and example; create operator to ease implementation --- README.md | 31 +++++++++++++++++++++++ examples/flatmap/flatmap_slice_test.go | 31 +++++++++++++++++++++++ observable/create.go | 34 ++++++++++++++++++++++++++ observable/flatmap.go | 3 +++ 4 files changed, 99 insertions(+) create mode 100644 examples/flatmap/flatmap_slice_test.go create mode 100644 observable/create.go diff --git a/README.md b/README.md index ee280809..8c83cdc8 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,37 @@ func main() { } ``` +FlatMap example: +```go +package main + +import ( + "fmt" + + "github.com/reactivex/rxgo/handlers" + "github.com/reactivex/rxgo/observable" + "github.com/reactivex/rxgo/observer" +) + +func main() { + primeSequence := observable.Just([]int{2, 3, 5, 7, 11, 13}) + + <-primeSequence. + FlatMap(func(primes interface{}) observable.Observable { + return observable.Create(func(emitter *observer.Observer) { + for _, prime := range primes.([]int) { + emitter.OnNext(prime) + } + emitter.OnDone() + }) + }, 1). + Last(). + Subscribe(handlers.NextFunc(func(prime interface{}) { + fmt.Println("Prime -> ", prime) + })) +} +``` + Please check out the [examples](examples/) to see how it can be applied to reactive applications. ## Recap diff --git a/examples/flatmap/flatmap_slice_test.go b/examples/flatmap/flatmap_slice_test.go new file mode 100644 index 00000000..5e3b7a12 --- /dev/null +++ b/examples/flatmap/flatmap_slice_test.go @@ -0,0 +1,31 @@ +package flatmap + +import ( + "github.com/reactivex/rxgo/observable" + "github.com/reactivex/rxgo/observer" + "testing" +) + +func TestFlatMapExample(t *testing.T) { + // given + observerMock := observer.NewObserverMock() + + // and + primeSequence := observable.Just([]int{2, 3, 5, 7, 11, 13}) + + // when + <-primeSequence. + FlatMap(func(primes interface{}) observable.Observable { + return observable.Create(func(emitter *observer.Observer) { + for _, prime := range primes.([]int) { + emitter.OnNext(prime) + } + emitter.OnDone() + }) + }, 1). + Last(). + Subscribe(observerMock.Capture()) + + // then + observerMock.AssertCalled(t, "OnNext", 13) +} diff --git a/observable/create.go b/observable/create.go new file mode 100644 index 00000000..c4fce5de --- /dev/null +++ b/observable/create.go @@ -0,0 +1,34 @@ +package observable + +import "github.com/reactivex/rxgo/observer" + +// Creates observable from based on source function. Keep it mind to call emitter.OnDone() +// to signal sequence's end. +// Example: +// - emitting none elements +// observable.Create(emitter *observer.Observer) { emitter.OnDone() }) +// - emitting one element +// observable.Create(func(emitter chan interface{}) { +// emitter.OnNext("one element") +// emitter.OnDone() +// }) +func Create(source func(emitter *observer.Observer)) Observable { + emitted := make(chan interface{}) + emitter := &observer.Observer{ + NextHandler: func(el interface{}) { + emitted <- el + }, + ErrHandler: func(err error) { + // decide how to deal with errors + }, + DoneHandler: func() { + close(emitted) + }, + } + + go func() { + source(emitter) + }() + + return emitted +} diff --git a/observable/flatmap.go b/observable/flatmap.go index f3f7f9ef..fa68c868 100644 --- a/observable/flatmap.go +++ b/observable/flatmap.go @@ -6,6 +6,9 @@ import ( "sync" ) +// transforms emitted items into observables and flattens them into single observable. +// maxInParallel argument controls how many transformed observables are processed in parallel +// For an example please take a look at flatmap_slice_test.go file in the examples directory. func (o Observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable { return o.flatMap(apply, maxInParallel, flatObservedSequence) }