Skip to content

Commit

Permalink
Fixing merge
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Dec 10, 2018
1 parent 14aec26 commit 6358645
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 48 deletions.
Empty file removed iterable/iterable_test.go
Empty file.
54 changes: 26 additions & 28 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/reactivex/rxgo/errors"
"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/iterable"
"github.com/reactivex/rxgo/optional"
"github.com/reactivex/rxgo/options"
)
Expand Down Expand Up @@ -55,7 +54,7 @@ type Observable interface {
SkipLast(nth uint) Observable
SkipWhile(apply Predicate) Observable
StartWithItems(items ...interface{}) Observable
StartWithIterable(iterable iterable.Iterable) Observable
StartWithIterable(iterable Iterable) Observable
StartWithObservable(observable Observable) Observable
Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer
SumFloat32() Single
Expand Down Expand Up @@ -1286,61 +1285,60 @@ func (o *observable) SumFloat64() Single {
// StartWithItems returns an Observable that emits the specified items before it begins to emit items emitted
// by the source Observable.
func (o *observable) StartWithItems(items ...interface{}) Observable {
out := make(chan interface{})
go func() {
f := func(out chan interface{}) {
for _, item := range items {
out <- item
}

for item := range o.ch {
it := o.iterable.Iterator()
for it.Next() {
item := it.Value()
out <- item
}

close(out)
}()
return &observable{ch: out}
}
return newColdObservable(f)
}

// StartWithIterable returns an Observable that emits the items in a specified Iterable before it begins to
// emit items emitted by the source Observable.
func (o *observable) StartWithIterable(iterable iterable.Iterable) Observable {
out := make(chan interface{})
go func() {
for {
item, err := iterable.Next()
if err != nil {
break
}
func (o *observable) StartWithIterable(iterable Iterable) Observable {
f := func(out chan interface{}) {
it := iterable.Iterator()
for it.Next() {
item := it.Value()
out <- item
}

for item := range o.ch {
it = o.iterable.Iterator()
for it.Next() {
item := it.Value()
out <- item
}

close(out)
}()
return &observable{ch: out}
}
return newColdObservable(f)
}

// StartWithObservable returns an Observable that emits the items in a specified Observable before it begins to
// emit items emitted by the source Observable.
func (o *observable) StartWithObservable(obs Observable) Observable {
out := make(chan interface{})
go func() {
for {
item, err := obs.Next()
if err != nil {
break
}
f := func(out chan interface{}) {
it := obs.Iterator()
for it.Next() {
item := it.Value()
out <- item
}

for item := range o.ch {
it = o.iterable.Iterator()
for it.Next() {
item := it.Value()
out <- item
}

close(out)
}()
return &observable{ch: out}
}
return newColdObservable(f)
}
28 changes: 8 additions & 20 deletions observable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,47 +1595,35 @@ func TestStartWithItemsWithoutItems(t *testing.T) {
}

func TestStartWithIterable(t *testing.T) {
ch := make(chan interface{})
it, err := iterable.New(ch)
if err != nil {
t.Fail()
}
ch := make(chan interface{}, 1)
it := newIterableFromChannel(ch)
obs := Just(1, 2, 3).StartWithIterable(it)
ch <- 10
close(ch)
AssertThatObservable(t, obs, HasItems(10, 1, 2, 3))
}

func TestStartWithIterableWithError(t *testing.T) {
ch := make(chan interface{})
it, err := iterable.New(ch)
if err != nil {
t.Fail()
}
ch := make(chan interface{}, 1)
it := newIterableFromChannel(ch)
obs := Just(1, 2, 3).StartWithIterable(it)
ch <- errors.New("")
close(ch)
AssertThatObservable(t, obs, IsEmpty(), HasRaisedError(errors.New("")))
}

func TestStartWithIterableFromEmpty(t *testing.T) {
ch := make(chan interface{})
it, err := iterable.New(ch)
if err != nil {
t.Fail()
}
ch := make(chan interface{}, 1)
it := newIterableFromChannel(ch)
obs := Empty().StartWithIterable(it)
ch <- 1
close(ch)
AssertThatObservable(t, obs, HasItems(1))
}

func TestStartWithIterableWithoutItems(t *testing.T) {
ch := make(chan interface{})
it, err := iterable.New(ch)
if err != nil {
t.Fail()
}
ch := make(chan interface{}, 1)
it := newIterableFromChannel(ch)
obs := Just(1, 2, 3).StartWithIterable(it)
close(ch)
AssertThatObservable(t, obs, HasItems(1, 2, 3))
Expand Down

0 comments on commit 6358645

Please sign in to comment.