Skip to content

Commit

Permalink
Merge pull request #2 from ReactiveX/v2
Browse files Browse the repository at this point in the history
V2
  • Loading branch information
MarekLabuz committed Feb 17, 2019
2 parents 6007666 + e93b104 commit 21c5135
Show file tree
Hide file tree
Showing 16 changed files with 1,322 additions and 1,218 deletions.
16 changes: 12 additions & 4 deletions connectableobservable.go
Expand Up @@ -8,21 +8,28 @@ import (
)

type ConnectableObservable interface {
Iterable
Connect() Observer
Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer
}

type connectableObservable struct {
iterator Iterator
observable Observable
observers []Observer
}

func NewConnectableObservable(observable Observable) ConnectableObservable {
func newConnectableObservableFromObservable(observable Observable) ConnectableObservable {
return &connectableObservable{
observable: observable,
iterator: observable.Iterator(),
}
}

func (c *connectableObservable) Iterator() Iterator {
return c.iterator
}

func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer {
ob := CheckEventHandler(handler)
c.observers = append(c.observers, ob)
Expand All @@ -32,12 +39,13 @@ func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ..
func (c *connectableObservable) Connect() Observer {
source := make([]interface{}, 0)

it := c.iterator
for {
item, err := c.observable.Next()
if err != nil {
if item, err := it.Next(); err == nil {
source = append(source, item)
} else {
break
}
source = append(source, item)
}

var wg sync.WaitGroup
Expand Down
37 changes: 18 additions & 19 deletions flatmap.go
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/reactivex/rxgo/handlers"
)

// transforms emitted items into observables and flattens them into single observable.
// maxInParallel argument controls how many transformed observables are processed in parallel
// For an example please take a look at flatmap_slice_test.go file in the examples directory.
// Transforms emitted items into Observables and flattens them into a single Observable.
// The maxInParallel argument controls how many transformed Observables are processed in parallel.
// For an example, please take a look at flatmap_slice_test.go in the examples directory.
func (o *observable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable {
return o.flatMap(apply, maxInParallel, flatObservedSequence)
}
Expand All @@ -26,9 +26,7 @@ func (o *observable) flatMap(

go flatteningFunc(out, o, apply, maxInParallel)

return &observable{
ch: out,
}
return newObservableFromChannel(out)
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
Expand All @@ -43,21 +41,22 @@ func flatObservedSequence(out chan interface{}, o Observable, apply func(interfa

count = 0

it := o.Iterator()
for {
element, err := o.Next()
if err != nil {
break
}
sequence = apply(element)
count++
wg.Add(1)
go func() {
defer wg.Done()
sequence.Subscribe(emissionObserver).Block()
}()
if item, err := it.Next(); err == nil {
sequence = apply(item)
count++
wg.Add(1)
go func() {
defer wg.Done()
sequence.Subscribe(emissionObserver).Block()
}()

if count%maxInParallel == 0 {
wg.Wait()
if count%maxInParallel == 0 {
wg.Wait()
}
} else {
break
}
}

Expand Down
6 changes: 6 additions & 0 deletions go.mod
@@ -0,0 +1,6 @@
module github.com/reactivex/rxgo

require (
github.com/gorilla/websocket v1.4.0
github.com/stretchr/testify v1.3.0
)
10 changes: 10 additions & 0 deletions go.sum
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
65 changes: 65 additions & 0 deletions iterable.go
@@ -0,0 +1,65 @@
package rxgo

type Iterable interface {
Iterator() Iterator
}

type iterableFromChannel struct {
ch chan interface{}
}

type iterableFromSlice struct {
s []interface{}
}

type iterableFromRange struct {
start int
count int
}

type iterableFromFunc struct {
f func(chan interface{})
}

func (it *iterableFromFunc) Iterator() Iterator {
out := make(chan interface{})
go it.f(out)
return newIteratorFromChannel(out)
}

func (it *iterableFromChannel) Iterator() Iterator {
return newIteratorFromChannel(it.ch)
}

func (it *iterableFromSlice) Iterator() Iterator {
return newIteratorFromSlice(it.s)
}

func (it *iterableFromRange) Iterator() Iterator {
return newIteratorFromRange(it.start-1, it.start+it.count)
}

func newIterableFromChannel(ch chan interface{}) Iterable {
return &iterableFromChannel{
ch: ch,
}
}

func newIterableFromSlice(s []interface{}) Iterable {
return &iterableFromSlice{
s: s,
}
}

func newIterableFromRange(start, count int) Iterable {
return &iterableFromRange{
start: start,
count: count,
}
}

func newIterableFromFunc(f func(chan interface{})) Iterable {
return &iterableFromFunc{
f: f,
}
}
38 changes: 0 additions & 38 deletions iterable/iterable.go

This file was deleted.

94 changes: 0 additions & 94 deletions iterable/iterable_test.go

This file was deleted.

63 changes: 62 additions & 1 deletion iterator.go
@@ -1,6 +1,67 @@
package rxgo

// Iterator type is implemented by Iterable.
import "github.com/reactivex/rxgo/errors"

type Iterator interface {
Next() (interface{}, error)
}

type iteratorFromChannel struct {
ch chan interface{}
}

type iteratorFromSlice struct {
index int
s []interface{}
}

type iteratorFromRange struct {
current int
end int // Included
}

func (it *iteratorFromChannel) Next() (interface{}, error) {
if next, ok := <-it.ch; ok {
return next, nil
}

return nil, errors.New(errors.EndOfIteratorError)
}

func (it *iteratorFromSlice) Next() (interface{}, error) {
it.index = it.index + 1
if it.index < len(it.s) {
return it.s[it.index], nil
} else {
return nil, errors.New(errors.EndOfIteratorError)
}
}

func (it *iteratorFromRange) Next() (interface{}, error) {
it.current = it.current + 1
if it.current <= it.end {
return it.current, nil
} else {
return nil, errors.New(errors.EndOfIteratorError)
}
}

func newIteratorFromChannel(ch chan interface{}) Iterator {
return &iteratorFromChannel{
ch: ch,
}
}

func newIteratorFromSlice(s []interface{}) Iterator {
return &iteratorFromSlice{
index: -1,
s: s,
}
}

func newIteratorFromRange(start, end int) Iterator {
return &iteratorFromRange{
current: start,
end: end,
}
}

0 comments on commit 21c5135

Please sign in to comment.