Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ install:
- go get github.com/onsi/ginkgo
- go get github.com/onsi/gomega
- go get golang.org/x/sync/semaphore
- go get github.com/pkg/errors

script:
- go test -v -race -cover -coverprofile=/tmp/coverage.out
Expand Down
27 changes: 9 additions & 18 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,38 @@ package rxgo
type CancelledIteratorError struct {
}

// EndOfIteratorError is triggered when an iterator is complete
type EndOfIteratorError struct {
}

// IllegalInputError is triggered when the observable receives an illegal input
type IllegalInputError struct {
reason string
}

// IndexOutOfBoundError is triggered when the observable cannot access to the specified index
type IndexOutOfBoundError struct {
}

// NoSuchElementError is triggered when an optional does not contain any element
// NoSuchElementError is triggered when an element does not exist
type NoSuchElementError struct {
}

// TimeoutError is triggered when a timeout occurs
type TimeoutError struct {
// CancelledSubscriptionError is triggered when a subscription was cancelled manually by an end user
type CancelledSubscriptionError struct {
}

func (e *CancelledIteratorError) Error() string {
return "CancelledIteratorError"
}

func (e *EndOfIteratorError) Error() string {
return "EndOfIteratorError"
return "cancelled iterator"
}

func (e *IllegalInputError) Error() string {
return e.reason
return "illegal input"
}

func (e *IndexOutOfBoundError) Error() string {
return "IndexOutOfBoundError"
return "index out of bound"
}

func (e *NoSuchElementError) Error() string {
return "NoSuchElementError"
return "no such element"
}

func (e *TimeoutError) Error() string {
return "TimeoutError"
func (e *CancelledSubscriptionError) Error() string {
return "timeout"
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/reactivex/rxgo

require (
github.com/gorilla/websocket v1.4.0
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
Expand Down
10 changes: 6 additions & 4 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package rxgo

import (
"context"

"github.com/pkg/errors"
)

type Iterator interface {
Expand All @@ -27,14 +29,14 @@ type iteratorFromSlice struct {
func (it *iteratorFromChannel) Next(ctx context.Context) (interface{}, error) {
select {
case <-ctx.Done():
return nil, &TimeoutError{}
return nil, &CancelledSubscriptionError{}
case <-it.ctx.Done():
return nil, &CancelledIteratorError{}
case next, ok := <-it.ch:
if ok {
return next, nil
}
return nil, &EndOfIteratorError{}
return nil, &NoSuchElementError{}
}
}

Expand All @@ -43,15 +45,15 @@ func (it *iteratorFromRange) Next(ctx context.Context) (interface{}, error) {
if it.current <= it.end {
return it.current, nil
}
return nil, &EndOfIteratorError{}
return nil, errors.Wrap(&NoSuchElementError{}, "range does not contain anymore elements")
}

func (it *iteratorFromSlice) Next(ctx context.Context) (interface{}, error) {
it.index++
if it.index < len(it.s) {
return it.s[it.index], nil
}
return nil, &EndOfIteratorError{}
return nil, errors.Wrap(&NoSuchElementError{}, "slice does not contain anymore elements")
}

func newIteratorFromChannel(ch chan interface{}) Iterator {
Expand Down
52 changes: 29 additions & 23 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package rxgo
import (
"container/ring"
"context"
"fmt"
"sync"
"time"

"fmt"

"github.com/pkg/errors"
"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/options"
)
Expand Down Expand Up @@ -233,7 +233,8 @@ func (o *observable) AverageFloat32() Single {
sum += v
count++
} else {
out <- &IllegalInputError{fmt.Sprintf("type: %t", item)}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: float32, got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -263,7 +264,8 @@ func (o *observable) AverageInt() Single {
sum += v
count++
} else {
out <- &IllegalInputError{fmt.Sprintf("type: %t", item)}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: int, got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -293,7 +295,8 @@ func (o *observable) AverageInt8() Single {
sum += v
count++
} else {
out <- &IllegalInputError{fmt.Sprintf("type: %t", item)}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: int8, got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -323,7 +326,8 @@ func (o *observable) AverageFloat64() Single {
sum += v
count++
} else {
out <- &IllegalInputError{fmt.Sprintf("type: %t", item)}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: float64, got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -353,7 +357,8 @@ func (o *observable) AverageInt16() Single {
sum += v
count++
} else {
out <- &IllegalInputError{fmt.Sprintf("type: %t", item)}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: int16, got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -383,7 +388,8 @@ func (o *observable) AverageInt32() Single {
sum += v
count++
} else {
out <- &IllegalInputError{fmt.Sprintf("type: %t", item)}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: int32, got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -413,7 +419,8 @@ func (o *observable) AverageInt64() Single {
sum += v
count++
} else {
out <- &IllegalInputError{fmt.Sprintf("type: %t", item)}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: int64, got: %t", item))
close(out)
return
}
Expand All @@ -440,13 +447,13 @@ func (o *observable) AverageInt64() Single {
func (o *observable) BufferWithCount(count, skip int) Observable {
f := func(out chan interface{}) {
if count <= 0 {
out <- &IllegalInputError{"count must be positive"}
out <- errors.Wrap(&IllegalInputError{}, "count must be positive")
close(out)
return
}

if skip <= 0 {
out <- &IllegalInputError{"skip must be positive"}
out <- errors.Wrap(&IllegalInputError{}, "skip must be positive")
close(out)
return
}
Expand Down Expand Up @@ -502,7 +509,7 @@ func (o *observable) BufferWithCount(count, skip int) Observable {
func (o *observable) BufferWithTime(timespan, timeshift Duration) Observable {
f := func(out chan interface{}) {
if timespan == nil || timespan.duration() == 0 {
out <- &IllegalInputError{"timespan must not be nil"}
out <- errors.Wrap(&IllegalInputError{}, "timespan must no be nil")
close(out)
return
}
Expand Down Expand Up @@ -596,13 +603,13 @@ func (o *observable) BufferWithTime(timespan, timeshift Duration) Observable {
func (o *observable) BufferWithTimeOrCount(timespan Duration, count int) Observable {
f := func(out chan interface{}) {
if timespan == nil || timespan.duration() == 0 {
out <- &IllegalInputError{"timespan must not be nil"}
out <- errors.Wrap(&IllegalInputError{}, "timespan must not be nil")
close(out)
return
}

if count <= 0 {
out <- &IllegalInputError{"count must be positive"}
out <- errors.Wrap(&IllegalInputError{}, "count must be positive")
close(out)
return
}
Expand Down Expand Up @@ -815,6 +822,7 @@ func (o *observable) ElementAt(index uint) Single {
}
}
out <- &IndexOutOfBoundError{}
out <- &IndexOutOfBoundError{}
close(out)
}
return newColdSingle(f)
Expand Down Expand Up @@ -1410,9 +1418,8 @@ func (o *observable) SumInt64() Single {
case int64:
sum += item
default:
out <- &IllegalInputError{
fmt.Sprintf("expected type: int, int8, int16, int32 or int64, got %t", item),
}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: (int|int8|int16|int32|int64), got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -1447,9 +1454,8 @@ func (o *observable) SumFloat32() Single {
case float32:
sum += item
default:
out <- &IllegalInputError{
fmt.Sprintf("expected type: float32, int, int8, int16, int32 or int64, got %t", item),
}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: (float32|int|int8|int16|int32|int64), got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -1486,9 +1492,8 @@ func (o *observable) SumFloat64() Single {
case float64:
sum += item
default:
out <- &IllegalInputError{
fmt.Sprintf("expected type: float32, float64, int, int8, int16, int32 or int64, got %t", item),
}
out <- errors.Wrap(&IllegalInputError{},
fmt.Sprintf("expected type: (float32|float64|int|int8|int16|int32|int64), got: %t", item))
close(out)
return
}
Expand Down Expand Up @@ -1604,6 +1609,7 @@ func (o *observable) TakeWhile(apply Predicate) Observable {
func (o *observable) Timeout(duration Duration) Observable {
f := func(out chan interface{}) {
it := o.Iterator(context.Background())
// TODO Handle cancel
ctx, _ := context.WithTimeout(context.Background(), duration.duration())
for {
if item, err := it.Next(ctx); err == nil {
Expand Down
10 changes: 5 additions & 5 deletions observablecreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync/atomic"
"time"

"github.com/reactivex/rxgo/options"

"github.com/pkg/errors"
"github.com/reactivex/rxgo/handlers"
"github.com/reactivex/rxgo/options"
)

func isClosed(ch <-chan interface{}) bool {
Expand Down Expand Up @@ -341,8 +341,8 @@ func Merge(observable Observable, observables ...Observable) Observable {
if item, err := it.Next(context.Background()); err == nil {
out <- item
} else {
break
wg.Done()
break
}
}
}
Expand Down Expand Up @@ -371,10 +371,10 @@ func Never() Observable {
// Range creates an Observable that emits a particular range of sequential integers.
func Range(start, count int) (Observable, error) {
if count < 0 {
return nil, &IllegalInputError{"count must be positive"}
return nil, errors.Wrap(&IllegalInputError{}, "count must be positive")
}
if start+count-1 > math.MaxInt32 {
return nil, &IllegalInputError{"max value is bigger than MaxInt32"}
return nil, errors.Wrap(&IllegalInputError{}, "max value is bigger than math.MaxInt32")
}

return newObservableFromRange(start, count), nil
Expand Down
4 changes: 2 additions & 2 deletions observablecreate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (it *statefulIterable) Next(ctx context.Context) (interface{}, error) {
if it.count < 3 {
return it.count, nil
}
return nil, &EndOfIteratorError{}
return nil, &NoSuchElementError{}
}

func (it *statefulIterable) Value() interface{} {
Expand Down Expand Up @@ -239,7 +239,7 @@ func (it *statelessIterable) Next(ctx context.Context) (interface{}, error) {
if it.count < 3 {
return it.count, nil
}
return nil, &EndOfIteratorError{}
return nil, &NoSuchElementError{}
}

func TestRange(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion optional.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package rxgo

import "github.com/pkg/errors"

var emptyOptional = new(empty)

// Optional defines a container for empty values
Expand Down Expand Up @@ -29,7 +31,7 @@ func (s *some) IsEmpty() bool {

// Get returns the content and an optional error is the optional is empty
func (e *empty) Get() (interface{}, error) {
return nil, &NoSuchElementError{}
return nil, errors.Wrap(&NoSuchElementError{}, "empty does not contain any element")
}

// IsEmpty returns whether the optional is empty
Expand Down
Loading