Skip to content

Commit

Permalink
Merge f0669f3 into 613168f
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Apr 22, 2019
2 parents 613168f + f0669f3 commit ea0233d
Show file tree
Hide file tree
Showing 27 changed files with 337 additions and 743 deletions.
3 changes: 1 addition & 2 deletions assert.go
Expand Up @@ -6,7 +6,6 @@ import (
"errors"

"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/optional"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -215,7 +214,7 @@ func AssertThatOptionalSingle(t *testing.T, optionalSingle OptionalSingle, asser
assert.Fail(t, "error while retrieving OptionalSingle results")
}

if optional, ok := v.(optional.Optional); ok {
if optional, ok := v.(Optional); ok {
checkIsEmpty, empty := ass.isEmptyFunc()
if checkIsEmpty {
if empty {
Expand Down
12 changes: 5 additions & 7 deletions assert_test.go
Expand Up @@ -3,8 +3,6 @@ package rxgo
import (
"errors"
"testing"

"github.com/reactivex/rxgo/optional"
)

func TestAssertThatObservableHasItems(t *testing.T) {
Expand Down Expand Up @@ -37,21 +35,21 @@ func TestAssertThatSingleNotError(t *testing.T) {
}

func TestAssertThatOptionalSingleIsEmpty(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Empty()), IsEmpty())
AssertThatOptionalSingle(t, newOptionalSingleFrom(EmptyOptional()), IsEmpty())
}

func TestAssertThatOptionalSingleIsNotEmpty(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(1)), IsNotEmpty())
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(1)), IsNotEmpty())
}

func TestAssertThatOptionalSingleHasValue(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(1)), HasValue(1))
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(1)), HasValue(1))
}

func TestAssertThatOptionalSingleHasRaisedAnError(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(errors.New("foo"))), HasRaisedAnError())
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(errors.New("foo"))), HasRaisedAnError())
}

func TestAssertThatOptionalSingleHasRaisedError(t *testing.T) {
AssertThatOptionalSingle(t, newOptionalSingleFrom(optional.Of(errors.New("foo"))), HasRaisedError(errors.New("foo")))
AssertThatOptionalSingle(t, newOptionalSingleFrom(Of(errors.New("foo"))), HasRaisedError(errors.New("foo")))
}
13 changes: 9 additions & 4 deletions connectableobservable.go
@@ -1,6 +1,7 @@
package rxgo

import (
"context"
"sync"

"github.com/reactivex/rxgo/handlers"
Expand All @@ -24,8 +25,8 @@ func newConnectableObservableFromObservable(observable Observable) ConnectableOb
}
}

func (c *connectableObservable) Iterator() Iterator {
return c.observable.Iterator()
func (c *connectableObservable) Iterator(ctx context.Context) Iterator {
return c.observable.Iterator(context.Background())
}

func (c *connectableObservable) All(predicate Predicate) Single {
Expand Down Expand Up @@ -75,9 +76,9 @@ func (c *connectableObservable) BufferWithTimeOrCount(timespan Duration, count i
func (c *connectableObservable) Connect() Observer {
out := NewObserver()
go func() {
it := c.observable.Iterator()
it := c.observable.Iterator(context.Background())
for {
if item, err := it.Next(); err == nil {
if item, err := it.Next(context.Background()); err == nil {
c.observersMutex.Lock()
for _, observer := range c.observers {
c.observersMutex.Unlock()
Expand Down Expand Up @@ -284,6 +285,10 @@ func (c *connectableObservable) TakeWhile(apply Predicate) Observable {
return c.observable.TakeWhile(apply)
}

func (c *connectableObservable) Timeout(duration Duration) Observable {
return c.observable.Timeout(duration)
}

func (c *connectableObservable) ToChannel(opts ...options.Option) Channel {
return c.observable.ToChannel(opts...)
}
Expand Down
50 changes: 50 additions & 0 deletions errors.go
@@ -0,0 +1,50 @@
package rxgo

// CancelledIteratorError is triggered when an iterator is canceled
type CancelledIteratorError struct {
}

// EndOfIteratorError is triggered when an iterator is complete
type EndOfIteratorError struct {
}

// IllegalInputError is triggered when the observable receives an illegal input
type IllegalInputError struct {
reason string
}

// IndexOutOfBoundError is triggered when the observable cannot access to the specified index
type IndexOutOfBoundError struct {
}

// NoSuchElementError is triggered when an optional does not contain any element
type NoSuchElementError struct {
}

// TimeoutError is triggered when a timeout occurs
type TimeoutError struct {
}

func (e *CancelledIteratorError) Error() string {
return "CancelledIteratorError"
}

func (e *EndOfIteratorError) Error() string {
return "EndOfIteratorError"
}

func (e *IllegalInputError) Error() string {
return e.reason
}

func (e *IndexOutOfBoundError) Error() string {
return "IndexOutOfBoundError"
}

func (e *NoSuchElementError) Error() string {
return "NoSuchElementError"
}

func (e *TimeoutError) Error() string {
return "TimeoutError"
}
17 changes: 0 additions & 17 deletions errors/errorcode_string.go

This file was deleted.

47 changes: 0 additions & 47 deletions errors/errors.go

This file was deleted.

60 changes: 0 additions & 60 deletions errors/errors_test.go

This file was deleted.

26 changes: 0 additions & 26 deletions examples/flatmap/flatmap_slice.go

This file was deleted.

31 changes: 0 additions & 31 deletions examples/flatmap/flatmap_slice_test.go

This file was deleted.

0 comments on commit ea0233d

Please sign in to comment.