Skip to content

Commit

Permalink
Merge 06bb4e9 into f5d04df
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Dec 10, 2018
2 parents f5d04df + 06bb4e9 commit 2f1fa55
Show file tree
Hide file tree
Showing 13 changed files with 871 additions and 947 deletions.
17 changes: 11 additions & 6 deletions connectableobservable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ import (
)

type ConnectableObservable interface {
Iterable
Connect() Observer
Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer
}

type connectableObservable struct {
iterator Iterator
observable Observable
observers []Observer
}

func NewConnectableObservable(observable Observable) ConnectableObservable {
func newConnectableObservableFromObservable(observable Observable) ConnectableObservable {
return &connectableObservable{
observable: observable,
iterator: observable.Iterator(),
}
}

func (c *connectableObservable) Iterator() Iterator {
return c.iterator
}

func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer {
ob := CheckEventHandler(handler)
c.observers = append(c.observers, ob)
Expand All @@ -32,11 +39,9 @@ func (c *connectableObservable) Subscribe(handler handlers.EventHandler, opts ..
func (c *connectableObservable) Connect() Observer {
source := make([]interface{}, 0)

for {
item, err := c.observable.Next()
if err != nil {
break
}
it := c.iterator
for it.Next() {
item := it.Value()
source = append(source, item)
}

Expand Down
14 changes: 5 additions & 9 deletions flatmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func (o *observable) flatMap(

go flatteningFunc(out, o, apply, maxInParallel)

return &observable{
ch: out,
}
return newObservableFromChannel(out)
}

func flatObservedSequence(out chan interface{}, o Observable, apply func(interface{}) Observable, maxInParallel uint) {
Expand All @@ -43,12 +41,10 @@ func flatObservedSequence(out chan interface{}, o Observable, apply func(interfa

count = 0

for {
element, err := o.Next()
if err != nil {
break
}
sequence = apply(element)
it := o.Iterator()
for it.Next() {
item := it.Value()
sequence = apply(item)
count++
wg.Add(1)
go func() {
Expand Down
65 changes: 65 additions & 0 deletions iterable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rxgo

type Iterable interface {
Iterator() Iterator
}

type iterableFromChannel struct {
ch chan interface{}
}

type iterableFromSlice struct {
s []interface{}
}

type iterableFromRange struct {
start int
count int
}

type iterableFromFunc struct {
f func(chan interface{})
}

func (it *iterableFromFunc) Iterator() Iterator {
out := make(chan interface{})
go it.f(out)
return newIteratorFromChannel(out)
}

func (it *iterableFromChannel) Iterator() Iterator {
return newIteratorFromChannel(it.ch)
}

func (it *iterableFromSlice) Iterator() Iterator {
return newIteratorFromSlice(it.s)
}

func (it *iterableFromRange) Iterator() Iterator {
return newIteratorFromRange(it.start-1, it.start+it.count)
}

func newIterableFromChannel(ch chan interface{}) Iterable {
return &iterableFromChannel{
ch: ch,
}
}

func newIterableFromSlice(s []interface{}) Iterable {
return &iterableFromSlice{
s: s,
}
}

func newIterableFromRange(start, count int) Iterable {
return &iterableFromRange{
start: start,
count: count,
}
}

func newIterableFromFunc(f func(chan interface{})) Iterable {
return &iterableFromFunc{
f: f,
}
}
38 changes: 0 additions & 38 deletions iterable/iterable.go

This file was deleted.

99 changes: 0 additions & 99 deletions iterable/iterable_test.go

This file was deleted.

74 changes: 72 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,76 @@
package rxgo

// Iterator type is implemented by Iterable.
type Iterator interface {
Next() (interface{}, error)
Next() bool
Value() interface{}
}

type iteratorFromChannel struct {
item interface{}
ch chan interface{}
}

type iteratorFromSlice struct {
index int
s []interface{}
}

type iteratorFromRange struct {
current int
end int // Included
}

func (it *iteratorFromChannel) Next() bool {
if v, ok := <-it.ch; ok {
it.item = v
return true
}

return false
}

func (it *iteratorFromChannel) Value() interface{} {
return it.item
}

func (it *iteratorFromSlice) Next() bool {
it.index = it.index + 1
if it.index >= len(it.s) {
return false
} else {
return true
}
}

func (it *iteratorFromSlice) Value() interface{} {
return it.s[it.index]
}

func (it *iteratorFromRange) Next() bool {
it.current = it.current + 1
return it.current <= it.end
}

func (it *iteratorFromRange) Value() interface{} {
return it.current
}

func newIteratorFromChannel(ch chan interface{}) Iterator {
return &iteratorFromChannel{
ch: ch,
}
}

func newIteratorFromSlice(s []interface{}) Iterator {
return &iteratorFromSlice{
index: -1,
s: s,
}
}

func newIteratorFromRange(start, end int) Iterator {
return &iteratorFromRange{
current: start,
end: end,
}
}
Loading

0 comments on commit 2f1fa55

Please sign in to comment.