Skip to content

Commit

Permalink
flatmap documentation and example; create operator to ease implementa…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
venth committed Nov 3, 2018
1 parent d78dde8 commit f4a32c6
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,37 @@ func main() {
}
```

FlatMap example:
```go
package main

import (
"fmt"

"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/observable"
"github.com/reactivex/rxgo/observer"
)

func main() {
primeSequence := observable.Just([]int{2, 3, 5, 7, 11, 13})

<-primeSequence.
FlatMap(func(primes interface{}) observable.Observable {
return observable.Create(func(emitter *observer.Observer) {
for _, prime := range primes.([]int) {
emitter.OnNext(prime)
}
emitter.OnDone()
})
}, 1).
Last().
Subscribe(handlers.NextFunc(func(prime interface{}) {
fmt.Println("Prime -> ", prime)
}))
}
```

Please check out the [examples](examples/) to see how it can be applied to reactive applications.

## Recap
Expand Down
31 changes: 31 additions & 0 deletions examples/flatmap/flatmap_slice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package flatmap

import (
"github.com/reactivex/rxgo/observable"
"github.com/reactivex/rxgo/observer"
"testing"
)

func TestFlatMapExample(t *testing.T) {
// given
observerMock := observer.NewObserverMock()

// and
primeSequence := observable.Just([]int{2, 3, 5, 7, 11, 13})

// when
<-primeSequence.
FlatMap(func(primes interface{}) observable.Observable {
return observable.Create(func(emitter *observer.Observer) {
for _, prime := range primes.([]int) {
emitter.OnNext(prime)
}
emitter.OnDone()
})
}, 1).
Last().
Subscribe(observerMock.Capture())

// then
observerMock.AssertCalled(t, "OnNext", 13)
}
34 changes: 34 additions & 0 deletions observable/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package observable

import "github.com/reactivex/rxgo/observer"

// Creates observable from based on source function. Keep it mind to call emitter.OnDone()
// to signal sequence's end.
// Example:
// - emitting none elements
// observable.Create(emitter *observer.Observer) { emitter.OnDone() })
// - emitting one element
// observable.Create(func(emitter chan interface{}) {
// emitter.OnNext("one element")
// emitter.OnDone()
// })
func Create(source func(emitter *observer.Observer)) Observable {
emitted := make(chan interface{})
emitter := &observer.Observer{
NextHandler: func(el interface{}) {
emitted <- el
},
ErrHandler: func(err error) {
// decide how to deal with errors
},
DoneHandler: func() {
close(emitted)
},
}

go func() {
source(emitter)
}()

return emitted
}
3 changes: 3 additions & 0 deletions observable/flatmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"sync"
)

// transforms emitted items into observables and flattens them into single observable.
// maxInParallel argument controls how many transformed observables are processed in parallel
// For an example please take a look at flatmap_slice_test.go file in the examples directory.
func (o Observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable {
return o.flatMap(apply, maxInParallel, flatObservedSequence)
}
Expand Down

0 comments on commit f4a32c6

Please sign in to comment.