From f0216a07f5892354c250bb447e3894d7adab7047 Mon Sep 17 00:00:00 2001 From: teivah Date: Fri, 7 Jun 2019 21:41:23 +0100 Subject: [PATCH 1/2] Errors management improvement --- errors.go | 27 +++++++-------------- go.mod | 2 +- go.sum | 4 ++-- iterator.go | 10 ++++---- observable.go | 52 ++++++++++++++++++++++------------------ observablecreate.go | 10 ++++---- observablecreate_test.go | 4 ++-- optional.go | 4 +++- optional_test.go | 4 +++- 9 files changed, 60 insertions(+), 57 deletions(-) diff --git a/errors.go b/errors.go index 387f61a9..c720ebe8 100644 --- a/errors.go +++ b/errors.go @@ -4,47 +4,38 @@ package rxgo type CancelledIteratorError struct { } -// EndOfIteratorError is triggered when an iterator is complete -type EndOfIteratorError struct { -} - // IllegalInputError is triggered when the observable receives an illegal input type IllegalInputError struct { - reason string } // IndexOutOfBoundError is triggered when the observable cannot access to the specified index type IndexOutOfBoundError struct { } -// NoSuchElementError is triggered when an optional does not contain any element +// NoSuchElementError is triggered when an element does not exist type NoSuchElementError struct { } -// TimeoutError is triggered when a timeout occurs -type TimeoutError struct { +// CancelledSubscriptionError is triggered when a subscription was cancelled manually by an end user +type CancelledSubscriptionError struct { } func (e *CancelledIteratorError) Error() string { - return "CancelledIteratorError" -} - -func (e *EndOfIteratorError) Error() string { - return "EndOfIteratorError" + return "cancelled iterator" } func (e *IllegalInputError) Error() string { - return e.reason + return "illegal input" } func (e *IndexOutOfBoundError) Error() string { - return "IndexOutOfBoundError" + return "index out of bound" } func (e *NoSuchElementError) Error() string { - return "NoSuchElementError" + return "no such element" } -func (e *TimeoutError) Error() string { - return "TimeoutError" +func (e *CancelledSubscriptionError) Error() string { + return "timeout" } diff --git a/go.mod b/go.mod index 22dcfd8e..4c66d2db 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/reactivex/rxgo require ( - github.com/gorilla/websocket v1.4.0 github.com/onsi/ginkgo v1.8.0 github.com/onsi/gomega v1.5.0 + github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.3.0 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f ) diff --git a/go.sum b/go.sum index b06817b8..875f7a69 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,6 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -13,6 +11,8 @@ github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= diff --git a/iterator.go b/iterator.go index 1367894a..6cf62ee0 100644 --- a/iterator.go +++ b/iterator.go @@ -2,6 +2,8 @@ package rxgo import ( "context" + + "github.com/pkg/errors" ) type Iterator interface { @@ -27,14 +29,14 @@ type iteratorFromSlice struct { func (it *iteratorFromChannel) Next(ctx context.Context) (interface{}, error) { select { case <-ctx.Done(): - return nil, &TimeoutError{} + return nil, &CancelledSubscriptionError{} case <-it.ctx.Done(): return nil, &CancelledIteratorError{} case next, ok := <-it.ch: if ok { return next, nil } - return nil, &EndOfIteratorError{} + return nil, &NoSuchElementError{} } } @@ -43,7 +45,7 @@ func (it *iteratorFromRange) Next(ctx context.Context) (interface{}, error) { if it.current <= it.end { return it.current, nil } - return nil, &EndOfIteratorError{} + return nil, errors.Wrap(&NoSuchElementError{}, "range does not contain anymore elements") } func (it *iteratorFromSlice) Next(ctx context.Context) (interface{}, error) { @@ -51,7 +53,7 @@ func (it *iteratorFromSlice) Next(ctx context.Context) (interface{}, error) { if it.index < len(it.s) { return it.s[it.index], nil } - return nil, &EndOfIteratorError{} + return nil, errors.Wrap(&NoSuchElementError{}, "slice does not contain anymore elements") } func newIteratorFromChannel(ch chan interface{}) Iterator { diff --git a/observable.go b/observable.go index 6ab218c7..4b3932cf 100644 --- a/observable.go +++ b/observable.go @@ -3,11 +3,11 @@ package rxgo import ( "container/ring" "context" + "fmt" "sync" "time" - "fmt" - + "github.com/pkg/errors" "github.com/reactivex/rxgo/handlers" "github.com/reactivex/rxgo/options" ) @@ -233,7 +233,8 @@ func (o *observable) AverageFloat32() Single { sum += v count++ } else { - out <- &IllegalInputError{fmt.Sprintf("type: %t", item)} + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: float32, got: %t", item)) close(out) return } @@ -263,7 +264,8 @@ func (o *observable) AverageInt() Single { sum += v count++ } else { - out <- &IllegalInputError{fmt.Sprintf("type: %t", item)} + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: int, got: %t", item)) close(out) return } @@ -293,7 +295,8 @@ func (o *observable) AverageInt8() Single { sum += v count++ } else { - out <- &IllegalInputError{fmt.Sprintf("type: %t", item)} + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: int8, got: %t", item)) close(out) return } @@ -323,7 +326,8 @@ func (o *observable) AverageFloat64() Single { sum += v count++ } else { - out <- &IllegalInputError{fmt.Sprintf("type: %t", item)} + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: float64, got: %t", item)) close(out) return } @@ -353,7 +357,8 @@ func (o *observable) AverageInt16() Single { sum += v count++ } else { - out <- &IllegalInputError{fmt.Sprintf("type: %t", item)} + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: int16, got: %t", item)) close(out) return } @@ -383,7 +388,8 @@ func (o *observable) AverageInt32() Single { sum += v count++ } else { - out <- &IllegalInputError{fmt.Sprintf("type: %t", item)} + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: int32, got: %t", item)) close(out) return } @@ -413,7 +419,8 @@ func (o *observable) AverageInt64() Single { sum += v count++ } else { - out <- &IllegalInputError{fmt.Sprintf("type: %t", item)} + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: int64, got: %t", item)) close(out) return } @@ -440,13 +447,13 @@ func (o *observable) AverageInt64() Single { func (o *observable) BufferWithCount(count, skip int) Observable { f := func(out chan interface{}) { if count <= 0 { - out <- &IllegalInputError{"count must be positive"} + out <- errors.Wrap(&IllegalInputError{}, "count must be positive") close(out) return } if skip <= 0 { - out <- &IllegalInputError{"skip must be positive"} + out <- errors.Wrap(&IllegalInputError{}, "skip must be positive") close(out) return } @@ -502,7 +509,7 @@ func (o *observable) BufferWithCount(count, skip int) Observable { func (o *observable) BufferWithTime(timespan, timeshift Duration) Observable { f := func(out chan interface{}) { if timespan == nil || timespan.duration() == 0 { - out <- &IllegalInputError{"timespan must not be nil"} + out <- errors.Wrap(&IllegalInputError{}, "timespan must no be nil") close(out) return } @@ -596,13 +603,13 @@ func (o *observable) BufferWithTime(timespan, timeshift Duration) Observable { func (o *observable) BufferWithTimeOrCount(timespan Duration, count int) Observable { f := func(out chan interface{}) { if timespan == nil || timespan.duration() == 0 { - out <- &IllegalInputError{"timespan must not be nil"} + out <- errors.Wrap(&IllegalInputError{}, "timespan must not be nil") close(out) return } if count <= 0 { - out <- &IllegalInputError{"count must be positive"} + out <- errors.Wrap(&IllegalInputError{}, "count must be positive") close(out) return } @@ -815,6 +822,7 @@ func (o *observable) ElementAt(index uint) Single { } } out <- &IndexOutOfBoundError{} + out <- &IndexOutOfBoundError{} close(out) } return newColdSingle(f) @@ -1410,9 +1418,8 @@ func (o *observable) SumInt64() Single { case int64: sum += item default: - out <- &IllegalInputError{ - fmt.Sprintf("expected type: int, int8, int16, int32 or int64, got %t", item), - } + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: (int|int8|int16|int32|int64), got: %t", item)) close(out) return } @@ -1447,9 +1454,8 @@ func (o *observable) SumFloat32() Single { case float32: sum += item default: - out <- &IllegalInputError{ - fmt.Sprintf("expected type: float32, int, int8, int16, int32 or int64, got %t", item), - } + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: (float32|int|int8|int16|int32|int64), got: %t", item)) close(out) return } @@ -1486,9 +1492,8 @@ func (o *observable) SumFloat64() Single { case float64: sum += item default: - out <- &IllegalInputError{ - fmt.Sprintf("expected type: float32, float64, int, int8, int16, int32 or int64, got %t", item), - } + out <- errors.Wrap(&IllegalInputError{}, + fmt.Sprintf("expected type: (float32|float64|int|int8|int16|int32|int64), got: %t", item)) close(out) return } @@ -1604,6 +1609,7 @@ func (o *observable) TakeWhile(apply Predicate) Observable { func (o *observable) Timeout(duration Duration) Observable { f := func(out chan interface{}) { it := o.Iterator(context.Background()) + // TODO Handle cancel ctx, _ := context.WithTimeout(context.Background(), duration.duration()) for { if item, err := it.Next(ctx); err == nil { diff --git a/observablecreate.go b/observablecreate.go index a46fc308..b609f9cb 100644 --- a/observablecreate.go +++ b/observablecreate.go @@ -7,9 +7,9 @@ import ( "sync/atomic" "time" - "github.com/reactivex/rxgo/options" - + "github.com/pkg/errors" "github.com/reactivex/rxgo/handlers" + "github.com/reactivex/rxgo/options" ) func isClosed(ch <-chan interface{}) bool { @@ -341,8 +341,8 @@ func Merge(observable Observable, observables ...Observable) Observable { if item, err := it.Next(context.Background()); err == nil { out <- item } else { - break wg.Done() + break } } } @@ -371,10 +371,10 @@ func Never() Observable { // Range creates an Observable that emits a particular range of sequential integers. func Range(start, count int) (Observable, error) { if count < 0 { - return nil, &IllegalInputError{"count must be positive"} + return nil, errors.Wrap(&IllegalInputError{}, "count must be positive") } if start+count-1 > math.MaxInt32 { - return nil, &IllegalInputError{"max value is bigger than MaxInt32"} + return nil, errors.Wrap(&IllegalInputError{}, "max value is bigger than math.MaxInt32") } return newObservableFromRange(start, count), nil diff --git a/observablecreate_test.go b/observablecreate_test.go index 8f18b577..9eaad9e2 100644 --- a/observablecreate_test.go +++ b/observablecreate_test.go @@ -210,7 +210,7 @@ func (it *statefulIterable) Next(ctx context.Context) (interface{}, error) { if it.count < 3 { return it.count, nil } - return nil, &EndOfIteratorError{} + return nil, &NoSuchElementError{} } func (it *statefulIterable) Value() interface{} { @@ -239,7 +239,7 @@ func (it *statelessIterable) Next(ctx context.Context) (interface{}, error) { if it.count < 3 { return it.count, nil } - return nil, &EndOfIteratorError{} + return nil, &NoSuchElementError{} } func TestRange(t *testing.T) { diff --git a/optional.go b/optional.go index 8838520d..46220270 100644 --- a/optional.go +++ b/optional.go @@ -1,5 +1,7 @@ package rxgo +import "github.com/pkg/errors" + var emptyOptional = new(empty) // Optional defines a container for empty values @@ -29,7 +31,7 @@ func (s *some) IsEmpty() bool { // Get returns the content and an optional error is the optional is empty func (e *empty) Get() (interface{}, error) { - return nil, &NoSuchElementError{} + return nil, errors.Wrap(&NoSuchElementError{}, "empty does not contain any element") } // IsEmpty returns whether the optional is empty diff --git a/optional_test.go b/optional_test.go index d2184b40..67412e6e 100644 --- a/optional_test.go +++ b/optional_test.go @@ -3,6 +3,8 @@ package rxgo import ( "testing" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) @@ -30,7 +32,7 @@ func TestEmpty(t *testing.T) { got, err := empty.Get() assert.True(t, empty.IsEmpty()) if err != nil { - assert.Equal(t, &NoSuchElementError{}, err) + assert.IsType(t, &NoSuchElementError{}, errors.Cause(err)) } else { assert.Fail(t, "error is not nil") } From f77182365a51add3e516defd68f26b35a057efd5 Mon Sep 17 00:00:00 2001 From: teivah Date: Fri, 7 Jun 2019 21:45:20 +0100 Subject: [PATCH 2/2] Missing package --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 96b0d3dd..c3d0eb9e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,6 +20,7 @@ install: - go get github.com/onsi/ginkgo - go get github.com/onsi/gomega - go get golang.org/x/sync/semaphore + - go get github.com/pkg/errors script: - go test -v -race -cover -coverprofile=/tmp/coverage.out