Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/new_observables' into new_observ…
Browse files Browse the repository at this point in the history
…ables
  • Loading branch information
teivah committed Jan 11, 2019
2 parents 2f969f1 + 3f219cb commit f49215f
Show file tree
Hide file tree
Showing 8 changed files with 627 additions and 473 deletions.
9 changes: 6 additions & 3 deletions connectableobservable.go
Expand Up @@ -40,9 +40,12 @@ func (c *connectableObservable) Connect() Observer {
source := make([]interface{}, 0)

it := c.iterator
for it.Next() {
item := it.Value()
source = append(source, item)
for {
if item, err := it.Next(); err == nil {
source = append(source, item)
} else {
break
}
}

var wg sync.WaitGroup
Expand Down
25 changes: 14 additions & 11 deletions flatmap.go
Expand Up @@ -42,18 +42,21 @@ func flatObservedSequence(out chan interface{}, o Observable, apply func(interfa
count = 0

it := o.Iterator()
for it.Next() {
item := it.Value()
sequence = apply(item)
count++
wg.Add(1)
go func() {
defer wg.Done()
sequence.Subscribe(emissionObserver).Block()
}()
for {
if item, err := it.Next(); err == nil {
sequence = apply(item)
count++
wg.Add(1)
go func() {
defer wg.Done()
sequence.Subscribe(emissionObserver).Block()
}()

if count%maxInParallel == 0 {
wg.Wait()
if count%maxInParallel == 0 {
wg.Wait()
}
} else {
break
}
}

Expand Down
45 changes: 18 additions & 27 deletions iterator.go
@@ -1,13 +1,13 @@
package rxgo

import "github.com/reactivex/rxgo/errors"

type Iterator interface {
Next() bool
Value() interface{}
Next() (interface{}, error)
}

type iteratorFromChannel struct {
item interface{}
ch chan interface{}
ch chan interface{}
}

type iteratorFromSlice struct {
Expand All @@ -20,39 +20,30 @@ type iteratorFromRange struct {
end int // Included
}

func (it *iteratorFromChannel) Next() bool {
if v, ok := <-it.ch; ok {
it.item = v
return true
func (it *iteratorFromChannel) Next() (interface{}, error) {
if next, ok := <-it.ch; ok {
return next, nil
}

return false
}

func (it *iteratorFromChannel) Value() interface{} {
return it.item
return nil, errors.New(errors.EndOfIteratorError)
}

func (it *iteratorFromSlice) Next() bool {
func (it *iteratorFromSlice) Next() (interface{}, error) {
it.index = it.index + 1
if it.index >= len(it.s) {
return false
if it.index < len(it.s) {
return it.s[it.index], nil
} else {
return true
return nil, errors.New(errors.EndOfIteratorError)
}
}

func (it *iteratorFromSlice) Value() interface{} {
return it.s[it.index]
}

func (it *iteratorFromRange) Next() bool {
func (it *iteratorFromRange) Next() (interface{}, error) {
it.current = it.current + 1
return it.current <= it.end
}

func (it *iteratorFromRange) Value() interface{} {
return it.current
if it.current <= it.end {
return it.current, nil
} else {
return nil, errors.New(errors.EndOfIteratorError)
}
}

func newIteratorFromChannel(ch chan interface{}) Iterator {
Expand Down
45 changes: 20 additions & 25 deletions iterator_test.go
@@ -1,49 +1,44 @@
package rxgo

import (
"testing"

"github.com/stretchr/testify/assert"
"testing"
)

func TestIteratorFromChannel(t *testing.T) {
ch := make(chan interface{}, 1)
it := newIteratorFromChannel(ch)

ch <- 1
assert.True(t, it.Next())
assert.Equal(t, 1, it.Value())
next, err := it.Next()
assert.Nil(t, err)
assert.Equal(t, 1, next)

ch <- 2
assert.True(t, it.Next())
assert.Equal(t, 2, it.Value())
next, err = it.Next()
assert.Nil(t, err)
assert.Equal(t, 2, next)

close(ch)
assert.False(t, it.Next())
_, err = it.Next()
assert.NotNil(t, err)
}

func TestIteratorFromSlice(t *testing.T) {
it := newIteratorFromSlice([]interface{}{1, 2, 3})

assert.True(t, it.Next())
assert.Equal(t, 1, it.Value())
next, err := it.Next()
assert.Nil(t, err)
assert.Equal(t, 1, next)

assert.True(t, it.Next())
assert.Equal(t, 2, it.Value())

assert.True(t, it.Next())
assert.Equal(t, 3, it.Value())

assert.False(t, it.Next())
}
next, err = it.Next()
assert.Nil(t, err)
assert.Equal(t, 2, next)

func TestName(t *testing.T) {
just := Just(1).Map(func(i interface{}) interface{} {
return 1 + i.(int)
}).Map(func(i interface{}) interface{} {
return 1 + i.(int)
})
next, err = it.Next()
assert.Nil(t, err)
assert.Equal(t, 3, next)

AssertThatObservable(t, just, HasItems(3))
AssertThatObservable(t, just, HasItems(3))
_, err = it.Next()
assert.NotNil(t, err)
}

0 comments on commit f49215f

Please sign in to comment.