Skip to content

Commit

Permalink
Merge 3f219cb into 6007666
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Dec 12, 2018
2 parents 6007666 + 3f219cb commit 094856b
Show file tree
Hide file tree
Showing 13 changed files with 1,279 additions and 1,211 deletions.
16 changes: 12 additions & 4 deletions connectableobservable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ import (
)

type ConnectableObservable interface {
Iterable
Connect() Observer
Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer
}

type connectableObservable struct {
iterator Iterator
observable Observable
observers []Observer
}

func NewConnectableObservable(observable Observable) ConnectableObservable {
func newConnectableObservableFromObservable(observable Observable) ConnectableObservable {
return &connectableObservable{
observable: observable,
iterator: observable.Iterator(),
}
}

func (c *connectableObservable) Iterator() Iterator {
return c.iterator
}

func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer {
ob := CheckEventHandler(handler)
c.observers = append(c.observers, ob)
Expand All @@ -32,12 +39,13 @@ func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ..
func (c *connectableObservable) Connect() Observer {
source := make([]interface{}, 0)

it := c.iterator
for {
item, err := c.observable.Next()
if err != nil {
if item, err := it.Next(); err == nil {
source = append(source, item)
} else {
break
}
source = append(source, item)
}

var wg sync.WaitGroup
Expand Down
31 changes: 15 additions & 16 deletions flatmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func (o *observable) flatMap(

go flatteningFunc(out, o, apply, maxInParallel)

return &observable{
ch: out,
}
return newObservableFromChannel(out)
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
Expand All @@ -43,21 +41,22 @@ func flatObservedSequence(out chan interface{}, o Observable, apply func(interfa

count = 0

it := o.Iterator()
for {
element, err := o.Next()
if err != nil {
break
}
sequence = apply(element)
count++
wg.Add(1)
go func() {
defer wg.Done()
sequence.Subscribe(emissionObserver).Block()
}()
if item, err := it.Next(); err == nil {
sequence = apply(item)
count++
wg.Add(1)
go func() {
defer wg.Done()
sequence.Subscribe(emissionObserver).Block()
}()

if count%maxInParallel == 0 {
wg.Wait()
if count%maxInParallel == 0 {
wg.Wait()
}
} else {
break
}
}

Expand Down
65 changes: 65 additions & 0 deletions iterable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rxgo

type Iterable interface {
Iterator() Iterator
}

type iterableFromChannel struct {
ch chan interface{}
}

type iterableFromSlice struct {
s []interface{}
}

type iterableFromRange struct {
start int
count int
}

type iterableFromFunc struct {
f func(chan interface{})
}

func (it *iterableFromFunc) Iterator() Iterator {
out := make(chan interface{})
go it.f(out)
return newIteratorFromChannel(out)
}

func (it *iterableFromChannel) Iterator() Iterator {
return newIteratorFromChannel(it.ch)
}

func (it *iterableFromSlice) Iterator() Iterator {
return newIteratorFromSlice(it.s)
}

func (it *iterableFromRange) Iterator() Iterator {
return newIteratorFromRange(it.start-1, it.start+it.count)
}

func newIterableFromChannel(ch chan interface{}) Iterable {
return &iterableFromChannel{
ch: ch,
}
}

func newIterableFromSlice(s []interface{}) Iterable {
return &iterableFromSlice{
s: s,
}
}

func newIterableFromRange(start, count int) Iterable {
return &iterableFromRange{
start: start,
count: count,
}
}

func newIterableFromFunc(f func(chan interface{})) Iterable {
return &iterableFromFunc{
f: f,
}
}
38 changes: 0 additions & 38 deletions iterable/iterable.go

This file was deleted.

94 changes: 0 additions & 94 deletions iterable/iterable_test.go

This file was deleted.

63 changes: 62 additions & 1 deletion iterator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,67 @@
package rxgo

// Iterator type is implemented by Iterable.
import "github.com/reactivex/rxgo/errors"

type Iterator interface {
Next() (interface{}, error)
}

type iteratorFromChannel struct {
ch chan interface{}
}

type iteratorFromSlice struct {
index int
s []interface{}
}

type iteratorFromRange struct {
current int
end int // Included
}

func (it *iteratorFromChannel) Next() (interface{}, error) {
if next, ok := <-it.ch; ok {
return next, nil
}

return nil, errors.New(errors.EndOfIteratorError)
}

func (it *iteratorFromSlice) Next() (interface{}, error) {
it.index = it.index + 1
if it.index < len(it.s) {
return it.s[it.index], nil
} else {
return nil, errors.New(errors.EndOfIteratorError)
}
}

func (it *iteratorFromRange) Next() (interface{}, error) {
it.current = it.current + 1
if it.current <= it.end {
return it.current, nil
} else {
return nil, errors.New(errors.EndOfIteratorError)
}
}

func newIteratorFromChannel(ch chan interface{}) Iterator {
return &iteratorFromChannel{
ch: ch,
}
}

func newIteratorFromSlice(s []interface{}) Iterator {
return &iteratorFromSlice{
index: -1,
s: s,
}
}

func newIteratorFromRange(start, end int) Iterator {
return &iteratorFromRange{
current: start,
end: end,
}
}

0 comments on commit 094856b

Please sign in to comment.