Skip to content

Commit

Permalink
Merge f4a32c6 into e715dd8
Browse files Browse the repository at this point in the history
  • Loading branch information
venth committed Nov 3, 2018
2 parents e715dd8 + f4a32c6 commit c80bd5b
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 11 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ _testmain.go
# Dep directories
bower_components
node_modules

# IntelliJ
.idea
*.iml
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,37 @@ func main() {
}
```

FlatMap example:
```go
package main

import (
"fmt"

"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/observable"
"github.com/reactivex/rxgo/observer"
)

func main() {
primeSequence := observable.Just([]int{2, 3, 5, 7, 11, 13})

<-primeSequence.
FlatMap(func(primes interface{}) observable.Observable {
return observable.Create(func(emitter *observer.Observer) {
for _, prime := range primes.([]int) {
emitter.OnNext(prime)
}
emitter.OnDone()
})
}, 1).
Last().
Subscribe(handlers.NextFunc(func(prime interface{}) {
fmt.Println("Prime -> ", prime)
}))
}
```

Please check out the [examples](examples/) to see how it can be applied to reactive applications.

## Recap
Expand Down
31 changes: 31 additions & 0 deletions examples/flatmap/flatmap_slice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package flatmap

import (
"github.com/reactivex/rxgo/observable"
"github.com/reactivex/rxgo/observer"
"testing"
)

func TestFlatMapExample(t *testing.T) {
// given
observerMock := observer.NewObserverMock()

// and
primeSequence := observable.Just([]int{2, 3, 5, 7, 11, 13})

// when
<-primeSequence.
FlatMap(func(primes interface{}) observable.Observable {
return observable.Create(func(emitter *observer.Observer) {
for _, prime := range primes.([]int) {
emitter.OnNext(prime)
}
emitter.OnDone()
})
}, 1).
Last().
Subscribe(observerMock.Capture())

// then
observerMock.AssertCalled(t, "OnNext", 13)
}
34 changes: 34 additions & 0 deletions observable/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package observable

import "github.com/reactivex/rxgo/observer"

// Creates observable from based on source function. Keep it mind to call emitter.OnDone()
// to signal sequence's end.
// Example:
// - emitting none elements
// observable.Create(emitter *observer.Observer) { emitter.OnDone() })
// - emitting one element
// observable.Create(func(emitter chan interface{}) {
// emitter.OnNext("one element")
// emitter.OnDone()
// })
func Create(source func(emitter *observer.Observer)) Observable {
emitted := make(chan interface{})
emitter := &observer.Observer{
NextHandler: func(el interface{}) {
emitted <- el
},
ErrHandler: func(err error) {
// decide how to deal with errors
},
DoneHandler: func() {
close(emitted)
},
}

go func() {
source(emitter)
}()

return emitted
}
65 changes: 65 additions & 0 deletions observable/flatmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package observable

import (
"github.com/reactivex/rxgo/observer"
"github.com/reactivex/rxgo/handlers"
"sync"
)

// transforms emitted items into observables and flattens them into single observable.
// maxInParallel argument controls how many transformed observables are processed in parallel
// For an example please take a look at flatmap_slice_test.go file in the examples directory.
func (o Observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable {
return o.flatMap(apply, maxInParallel, flatObservedSequence)
}

func (o Observable) flatMap(
apply func(interface{}) Observable,
maxInParallel uint,
flatteningFunc func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint)) Observable {

out := make(chan interface{})

if maxInParallel < 1 {
maxInParallel = 1
}

go flatteningFunc(out, o, apply, maxInParallel)

return Observable(out)
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
var (
sequence Observable
wg sync.WaitGroup
count uint
)

defer close(out)
emissionObserver := newFlattenEmissionObserver(out)

count = 0
for element := range o {
sequence = apply(element)
count++
wg.Add(1)
go func() {
defer wg.Done()
<-(sequence.Subscribe(*emissionObserver))
}()

if count%maxInParallel == 0 {
wg.Wait()
}
}

wg.Wait()
}

func newFlattenEmissionObserver(out chan interface{}) *observer.Observer {
ob := observer.New(handlers.NextFunc(func(element interface{}) {
out <- element
}))
return &ob
}
112 changes: 112 additions & 0 deletions observable/flatmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package observable

import (
"testing"
"github.com/reactivex/rxgo/observer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/assert"
)

func TestFlatMapCompletesWhenSequenceIsEmpty(t *testing.T) {
// given
emissionObserver := observer.NewObserverMock()

// and empty sequence
sequence := Empty()

// and flattens the sequence with identity
sequence = sequence.FlatMap(identity, 1)

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes without any emission
emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything)
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertCalled(t, "OnDone")
}

func TestFlatMapReturnsSameElementBecauseIdentifyApplied(t *testing.T) {
// given
emissionObserver := observer.NewObserverMock()

// and sequence containing one element
element := 1
sequence := Just(element)

// and flattens the sequence with identity
sequence = sequence.FlatMap(identity, 1)

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes with emission of the same element
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertCalled(t, "OnNext", element)
emissionObserver.AssertCalled(t, "OnDone")
}

func TestFlatMapReturnsSliceElements(t *testing.T) {
// given
emissionObserver := observer.NewObserverMock()

// and sequence containing slice with few elements
element1 := "element1"
element2 := "element2"
element3 := "element3"
slice := &([]string{element1, element2, element3})
sequence := Just(slice)

// and flattens the sequence with identity
sequence = sequence.FlatMap(flattenThreeElementSlice, 1)

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes with emission of flatten elements
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertNotCalled(t, "OnNext", slice)
emissionObserver.AssertCalled(t, "OnNext", element1)
emissionObserver.AssertCalled(t, "OnNext", element2)
emissionObserver.AssertCalled(t, "OnNext", element3)
emissionObserver.AssertCalled(t, "OnDone")
}

func TestFlatMapUsesForParallelProcessingAtLeast1Process(t *testing.T) {
// given
emissionObserver := observer.NewObserverMock()

// and
var maxInParallel uint = 0

// and
var requestedMaxInParallel uint = 0
flatteningFuncMock := func(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
requestedMaxInParallel = maxInParallel
flatObservedSequence(out, o, apply, maxInParallel)
}

// and flattens the sequence with identity
sequence := someSequence.flatMap(identity, maxInParallel, flatteningFuncMock)

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes with emission of the same element
assert.Equal(t, uint(1), requestedMaxInParallel)

}

var (
someElement = "some element"
someSequence = Just(someElement)
)

func identity(el interface{}) Observable {
return Just(el)
}

func flattenThreeElementSlice(el interface{}) Observable {
slice := *(el.(*[]string))
return Just(slice[0], slice[1], slice[2])
}
11 changes: 5 additions & 6 deletions observable/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ func (o Observable) SkipLast(nth uint) Observable {
buf := make(chan interface{}, nth)
for item := range o {
select {
case buf <- item:
default:
out <- (<- buf)
buf <- item
case buf <- item:
default:
out <- (<-buf)
buf <- item
}
}
close(buf)
Expand All @@ -256,7 +256,6 @@ func (o Observable) SkipLast(nth uint) Observable {
return Observable(out)
}


// Scan applies ScannableFunc predicate to each item in the original
// Observable sequentially and emits each successive value on a new Observable.
func (o Observable) Scan(apply fx.ScannableFunc) Observable {
Expand Down Expand Up @@ -322,7 +321,7 @@ func Interval(term chan struct{}, interval time.Duration) Observable {
// Repeat creates an Observable emitting a given item repeatedly
func Repeat(item interface{}, ntimes ...int) Observable {
source := make(chan interface{})

// this is the infinity case no ntime parameter is given
if len(ntimes) == 0 {
go func() {
Expand Down
26 changes: 21 additions & 5 deletions observable/observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/reactivex/rxgo/observer"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestDefaultObservable(t *testing.T) {
Expand Down Expand Up @@ -614,7 +615,7 @@ func TestObservableSkip(t *testing.T) {
sub := stream2.Subscribe(onNext)
<-sub

assert.Exactly(t, []int{5,1,8}, nums)
assert.Exactly(t, []int{5, 1, 8}, nums)
}

func TestObservableSkipWithEmpty(t *testing.T) {
Expand All @@ -632,7 +633,7 @@ func TestObservableSkipWithEmpty(t *testing.T) {
sub := stream2.Subscribe(onNext)
<-sub

assert.Exactly(t, []int{}, nums)
assert.Exactly(t, []int{}, nums)
}

func TestObservableSkipLast(t *testing.T) {
Expand All @@ -656,7 +657,7 @@ func TestObservableSkipLast(t *testing.T) {
sub := stream2.Subscribe(onNext)
<-sub

assert.Exactly(t, []int{0, 1, 3}, nums)
assert.Exactly(t, []int{0, 1, 3}, nums)
}

func TestObservableSkipLastWithEmpty(t *testing.T) {
Expand All @@ -674,10 +675,9 @@ func TestObservableSkipLastWithEmpty(t *testing.T) {
sub := stream2.Subscribe(onNext)
<-sub

assert.Exactly(t, []int{}, nums)
assert.Exactly(t, []int{}, nums)
}


func TestObservableDistinct(t *testing.T) {
items := []interface{}{1, 2, 2, 1, 3}
it, err := iterable.New(items)
Expand Down Expand Up @@ -909,3 +909,19 @@ func TestRepeatWithNegativeTimesOperator(t *testing.T) {

assert.Exactly(t, []string{"end"}, stringarray)
}

func TestEmptyCompletesSequence(t *testing.T) {
// given
emissionObserver := observer.NewObserverMock()

// and empty sequence
sequence := Empty()

// when subscribes to the sequence
<-sequence.Subscribe(emissionObserver.Capture())

// then completes without any emission
emissionObserver.AssertNotCalled(t, "OnNext", mock.Anything)
emissionObserver.AssertNotCalled(t, "OnError", mock.Anything)
emissionObserver.AssertCalled(t, "OnDone")
}

0 comments on commit c80bd5b

Please sign in to comment.