From 788a46ba6f943ff7424c0b7ba01a72c4b758d9da Mon Sep 17 00:00:00 2001 From: teivah Date: Tue, 25 Jun 2019 00:32:44 +0100 Subject: [PATCH 1/7] Mocking first try --- observable_mock_test.go | 411 ++++++++++++++++++++++++++++++++++++++++ observable_test.go | 27 ++- 2 files changed, 430 insertions(+), 8 deletions(-) create mode 100644 observable_mock_test.go diff --git a/observable_mock_test.go b/observable_mock_test.go new file mode 100644 index 00000000..d1098dbd --- /dev/null +++ b/observable_mock_test.go @@ -0,0 +1,411 @@ +package rxgo + +import ( + "bufio" + "context" + "github.com/reactivex/rxgo/handlers" + "github.com/reactivex/rxgo/options" + "github.com/stretchr/testify/mock" + "strconv" + "strings" + "testing" +) + +type MockObservable struct { + mock.Mock +} + +type MockIterator struct { + mock.Mock +} + +type task struct { + observable int + item int + close bool +} + +func TestMockIterators(t *testing.T) { + mockIterators("1:1,1:2,2:0,1:3,1:4,2:0,1:5,1:6,2:0,1:close,2:close") +} + +func countTab(line string) int { + i := 0 + for _, runeValue := range line { + if runeValue == '\t' { + i++ + } else { + break + } + } + return i +} + +func mockIterators(in string) ([]*MockIterator, error) { + scanner := bufio.NewScanner(strings.NewReader(in)) + m := make(map[int]int) + tasks := make([]task, 0) + count := 0 + for scanner.Scan() { + s := scanner.Text() + if s == "" { + continue + } + observable := countTab(s) + v := strings.TrimSpace(s) + if v == "x" { + tasks = append(tasks, task{ + observable: observable, + close: true, + }) + } else { + n, err := strconv.Atoi(v) + if err != nil { + return nil, err + } + tasks = append(tasks, task{ + observable: observable, + item: n, + }) + } + if _, contains := m[observable]; !contains { + m[observable] = count + count++ + } + } + + observables := make([]*MockIterator, 0, len(m)) + calls := make([]*mock.Call, len(m)) + for i := 0; i < len(m); i++ { + observables = append(observables, new(MockIterator)) + } + + item, err := args(tasks[0]) + call := observables[0].On("Next", mock.Anything).Once().Return(item, err) + calls[0] = call + + var lastCh chan struct{} + lastObs := tasks[0].observable + for i := 1; i < len(tasks); i++ { + t := tasks[i] + index := m[t.observable] + obs := observables[index] + item, err := args(t) + if lastObs == t.observable { + if calls[index] == nil { + calls[index] = obs.On("Next", mock.Anything).Once().Arguments.Return(item, err) + } else { + calls[index].On("Next", mock.Anything).Once().Return(item, err) + } + } else { + lastObs = t.observable + if lastCh == nil { + ch := make(chan struct{}) + lastCh = ch + if calls[index] == nil { + calls[index] = obs.On("Next", mock.Anything).Once().Return(item, err). + Run(func(args mock.Arguments) { + run(ch, nil) + }) + } else { + calls[index].On("Next", mock.Anything).Once().Return(item, err). + Run(func(args mock.Arguments) { + run(ch, nil) + }) + } + } else { + ch := make(chan struct{}) + previous := lastCh + if calls[index] == nil { + calls[index] = obs.On("Next", mock.Anything).Once().Return(item, err). + Run(func(args mock.Arguments) { + run(ch, previous) + }) + } else { + calls[index].On("Next", mock.Anything).Once().Return(item, err). + Run(func(args mock.Arguments) { + run(ch, previous) + }) + } + lastCh = ch + } + } + } + return observables, nil +} + +func args(t task) (interface{}, error) { + if t.close { + return nil, &NoSuchElementError{} + } + return t.item, nil +} + +func run(wait chan struct{}, send chan struct{}) { + if send != nil { + send <- struct{}{} + } + if wait != nil { + <-wait + } +} + +func (m *MockIterator) Next(ctx context.Context) (interface{}, error) { + args := m.Called(ctx) + return args.Get(0), args.Error(1) +} + +func (m *MockObservable) Iterator(ctx context.Context) Iterator { + args := m.Called(ctx) + return args.Get(0).(Iterator) +} + +func (m *MockObservable) All(predicate Predicate) Single { + args := m.Called(predicate) + return args.Get(0).(Single) +} + +func (m *MockObservable) AverageFloat32() Single { + panic("implement me") +} + +func (m *MockObservable) AverageFloat64() Single { + panic("implement me") +} + +func (m *MockObservable) AverageInt() Single { + panic("implement me") +} + +func (m *MockObservable) AverageInt8() Single { + panic("implement me") +} + +func (m *MockObservable) AverageInt16() Single { + panic("implement me") +} + +func (m *MockObservable) AverageInt32() Single { + panic("implement me") +} + +func (m *MockObservable) AverageInt64() Single { + panic("implement me") +} + +func (m *MockObservable) BufferWithCount(count, skip int) Observable { + panic("implement me") +} + +func (m *MockObservable) BufferWithTime(timespan, timeshift Duration) Observable { + panic("implement me") +} + +func (m *MockObservable) BufferWithTimeOrCount(timespan Duration, count int) Observable { + panic("implement me") +} + +func (m *MockObservable) Contains(equal Predicate) Single { + panic("implement me") +} + +func (m *MockObservable) Count() Single { + panic("implement me") +} + +func (m *MockObservable) DefaultIfEmpty(defaultValue interface{}) Observable { + panic("implement me") +} + +func (m *MockObservable) Distinct(apply Function) Observable { + panic("implement me") +} + +func (m *MockObservable) DistinctUntilChanged(apply Function) Observable { + panic("implement me") +} + +func (m *MockObservable) DoOnEach(onNotification Consumer) Observable { + panic("implement me") +} + +func (m *MockObservable) ElementAt(index uint) Single { + panic("implement me") +} + +func (m *MockObservable) Filter(apply Predicate) Observable { + panic("implement me") +} + +func (m *MockObservable) First() Observable { + panic("implement me") +} + +func (m *MockObservable) FirstOrDefault(defaultValue interface{}) Single { + panic("implement me") +} + +func (m *MockObservable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable { + panic("implement me") +} + +func (m *MockObservable) ForEach(nextFunc handlers.NextFunc, errFunc handlers.ErrFunc, + doneFunc handlers.DoneFunc, opts ...options.Option) Observer { + panic("implement me") +} + +func (m *MockObservable) IgnoreElements() Observable { + panic("implement me") +} + +func (m *MockObservable) Last() Observable { + panic("implement me") +} + +func (m *MockObservable) LastOrDefault(defaultValue interface{}) Single { + panic("implement me") +} + +func (m *MockObservable) Map(apply Function) Observable { + panic("implement me") +} + +func (m *MockObservable) Max(comparator Comparator) OptionalSingle { + panic("implement me") +} + +func (m *MockObservable) Min(comparator Comparator) OptionalSingle { + panic("implement me") +} + +func (m *MockObservable) OnErrorResumeNext(resumeSequence ErrorToObservableFunction) Observable { + panic("implement me") +} + +func (m *MockObservable) OnErrorReturn(resumeFunc ErrorFunction) Observable { + panic("implement me") +} + +func (m *MockObservable) OnErrorReturnItem(item interface{}) Observable { + panic("implement me") +} + +func (m *MockObservable) Publish() ConnectableObservable { + panic("implement me") +} + +func (m *MockObservable) Reduce(apply Function2) OptionalSingle { + panic("implement me") +} + +func (m *MockObservable) Repeat(count int64, frequency Duration) Observable { + panic("implement me") +} + +func (m *MockObservable) Sample(obs Observable) Observable { + panic("implement me") +} + +func (m *MockObservable) Scan(apply Function2) Observable { + panic("implement me") +} + +func (m *MockObservable) SequenceEqual(obs Observable) Single { + panic("implement me") +} + +func (m *MockObservable) Skip(nth uint) Observable { + panic("implement me") +} + +func (m *MockObservable) SkipLast(nth uint) Observable { + panic("implement me") +} + +func (m *MockObservable) SkipWhile(apply Predicate) Observable { + panic("implement me") +} + +func (m *MockObservable) StartWithItems(item interface{}, items ...interface{}) Observable { + panic("implement me") +} + +func (m *MockObservable) StartWithIterable(iterable Iterable) Observable { + panic("implement me") +} + +func (m *MockObservable) StartWithObservable(observable Observable) Observable { + panic("implement me") +} + +func (m *MockObservable) Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer { + panic("implement me") +} + +func (m *MockObservable) SumFloat32() Single { + panic("implement me") +} + +func (m *MockObservable) SumFloat64() Single { + panic("implement me") +} + +func (m *MockObservable) SumInt64() Single { + panic("implement me") +} + +func (m *MockObservable) Take(nth uint) Observable { + panic("implement me") +} + +func (m *MockObservable) TakeLast(nth uint) Observable { + panic("implement me") +} + +func (m *MockObservable) TakeUntil(apply Predicate) Observable { + panic("implement me") +} + +func (m *MockObservable) TakeWhile(apply Predicate) Observable { + panic("implement me") +} + +func (m *MockObservable) Timeout(duration Duration) Observable { + panic("implement me") +} + +func (m *MockObservable) ToChannel(opts ...options.Option) Channel { + panic("implement me") +} + +func (m *MockObservable) ToMap(keySelector Function) Single { + panic("implement me") +} + +func (m *MockObservable) ToMapWithValueSelector(keySelector, valueSelector Function) Single { + panic("implement me") +} + +func (m *MockObservable) ToSlice() Single { + panic("implement me") +} + +func (m *MockObservable) ZipFromObservable(publisher Observable, zipper Function2) Observable { + panic("implement me") +} + +func (m *MockObservable) getIgnoreElements() bool { + panic("implement me") +} + +func (m *MockObservable) getOnErrorResumeNext() ErrorToObservableFunction { + panic("implement me") +} + +func (m *MockObservable) getOnErrorReturn() ErrorFunction { + panic("implement me") +} + +func (m *MockObservable) getOnErrorReturnItem() interface{} { + panic("implement me") +} diff --git a/observable_test.go b/observable_test.go index 5ca8c351..d85d2f4c 100644 --- a/observable_test.go +++ b/observable_test.go @@ -3,6 +3,7 @@ package rxgo import ( "context" "errors" + "github.com/stretchr/testify/mock" "net/http" "strconv" "sync" @@ -1642,14 +1643,24 @@ func TestSample(t *testing.T) { } func TestSample_NotRepeatedItems(t *testing.T) { - ch := make(chan interface{}) - obs := FromChannel(ch).Sample(Interval(make(chan struct{}), 50*time.Millisecond)) - go func() { - ch <- 1 - time.Sleep(200 * time.Millisecond) - close(ch) - }() - AssertObservable(t, obs, HasItems(1)) + //iterators, err := mockIterators("1:1,1:2,2:0,1:3,1:4,1:5,2:0,1:close,2:close") + iterators, err := mockIterators(` +1 +2 + 0 +x + x +`) + if err != nil { + assert.FailNow(t, err.Error()) + } + mockIterator1 := iterators[0] + mockIterator2 := iterators[1] + mockObservable1 := new(MockObservable) + mockObservable1.On("Iterator", mock.Anything).Return(mockIterator2) + obs := FromIterator(mockIterator1).Sample(mockObservable1) + + AssertObservable(t, obs, HasItems(2)) } func TestSample_SourceObsClosedBeforeIntervalFired(t *testing.T) { From 4bba843ae30879305a43ca33e5b4fe6c499d9b39 Mon Sep 17 00:00:00 2001 From: teivah Date: Tue, 25 Jun 2019 01:32:28 +0100 Subject: [PATCH 2/7] New synchronous iterable --- observable_mock_test.go | 19 ++++++++++++++++++- observable_test.go | 13 +++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/observable_mock_test.go b/observable_mock_test.go index d1098dbd..06fa6d85 100644 --- a/observable_mock_test.go +++ b/observable_mock_test.go @@ -11,6 +11,23 @@ import ( "testing" ) +func newSyncObservable(iterator Iterator) Observable { + return &observable{ + observableType: cold, + iterable: &syncIterable{ + iterator: iterator, + }, + } +} + +type syncIterable struct { + iterator Iterator +} + +func (s *syncIterable) Iterator(ctx context.Context) Iterator { + return s.iterator +} + type MockObservable struct { mock.Mock } @@ -93,7 +110,7 @@ func mockIterators(in string) ([]*MockIterator, error) { item, err := args(t) if lastObs == t.observable { if calls[index] == nil { - calls[index] = obs.On("Next", mock.Anything).Once().Arguments.Return(item, err) + calls[index] = obs.On("Next", mock.Anything).Once().Return(item, err) } else { calls[index].On("Next", mock.Anything).Once().Return(item, err) } diff --git a/observable_test.go b/observable_test.go index d85d2f4c..3c58a850 100644 --- a/observable_test.go +++ b/observable_test.go @@ -1648,6 +1648,15 @@ func TestSample_NotRepeatedItems(t *testing.T) { 1 2 0 +3 +4 +5 + 0 +6 + 0 +7 +8 + 0 x x `) @@ -1658,9 +1667,9 @@ x mockIterator2 := iterators[1] mockObservable1 := new(MockObservable) mockObservable1.On("Iterator", mock.Anything).Return(mockIterator2) - obs := FromIterator(mockIterator1).Sample(mockObservable1) + obs := newSyncObservable(mockIterator1).Sample(mockObservable1) - AssertObservable(t, obs, HasItems(2)) + AssertObservable(t, obs, HasItems(2, 5, 6, 8)) } func TestSample_SourceObsClosedBeforeIntervalFired(t *testing.T) { From cf039718390c0b1d9589cb9c43fc3485d33b284e Mon Sep 17 00:00:00 2001 From: teivah Date: Tue, 25 Jun 2019 01:32:45 +0100 Subject: [PATCH 3/7] Cleaning --- observable_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/observable_test.go b/observable_test.go index 3c58a850..6757d02f 100644 --- a/observable_test.go +++ b/observable_test.go @@ -1643,7 +1643,6 @@ func TestSample(t *testing.T) { } func TestSample_NotRepeatedItems(t *testing.T) { - //iterators, err := mockIterators("1:1,1:2,2:0,1:3,1:4,1:5,2:0,1:close,2:close") iterators, err := mockIterators(` 1 2 From c325ab6343e376bc9bd86a6506b9117eab369168 Mon Sep 17 00:00:00 2001 From: teivah Date: Tue, 25 Jun 2019 21:22:01 +0100 Subject: [PATCH 4/7] Mock observable simplification --- connectableobservable.go | 4 +- observable.go | 36 +++-- observable_mock_test.go | 306 +++------------------------------------ observable_test.go | 65 +++++---- observablecreate.go | 5 +- observablecreate_test.go | 30 ++-- 6 files changed, 98 insertions(+), 348 deletions(-) diff --git a/connectableobservable.go b/connectableobservable.go index 0905f582..1c04be96 100644 --- a/connectableobservable.go +++ b/connectableobservable.go @@ -289,8 +289,8 @@ func (c *connectableObservable) TakeWhile(apply Predicate) Observable { return c.observable.TakeWhile(apply) } -func (c *connectableObservable) Timeout(duration Duration) Observable { - return c.observable.Timeout(duration) +func (c *connectableObservable) Timeout(observable Observable) Observable { + return c.observable.Timeout(observable) } func (c *connectableObservable) ToChannel(opts ...options.Option) Channel { diff --git a/observable.go b/observable.go index 51613fcc..f8968084 100644 --- a/observable.go +++ b/observable.go @@ -75,7 +75,7 @@ type Observable interface { TakeLast(nth uint) Observable TakeUntil(apply Predicate) Observable TakeWhile(apply Predicate) Observable - Timeout(duration Duration) Observable + Timeout(observable Observable) Observable ToChannel(opts ...options.Option) Channel ToMap(keySelector Function) Single ToMapWithValueSelector(keySelector, valueSelector Function) Single @@ -1669,22 +1669,30 @@ func (o *observable) TakeWhile(apply Predicate) Observable { return newColdObservableFromFunction(f) } -func (o *observable) Timeout(duration Duration) Observable { +func (o *observable) Timeout(observable Observable) Observable { f := func(out chan interface{}) { - it := o.Iterator(context.Background()) - // TODO Handle cancel - ctx, _ := context.WithTimeout(context.Background(), duration.duration()) - for { - if item, err := it.Next(ctx); err == nil { - out <- item - } else { - out <- err - break + fmt.Printf("%v\n", "f") + ctx, cancel := context.WithCancel(context.Background()) + go func() { + it := o.Iterator(ctx) + for { + if item, err := it.Next(ctx); err == nil { + fmt.Printf("%v\n", item) + out <- item + } else { + fmt.Printf("cancel\n") + out <- err + break + } } - } - close(out) + }() + go func() { + it := observable.Iterator(context.Background()) + next, e := it.Next(context.Background()) + fmt.Printf("%v %v\n", next, e) + cancel() + }() } - return newColdObservableFromFunction(f) } diff --git a/observable_mock_test.go b/observable_mock_test.go index 06fa6d85..f916e460 100644 --- a/observable_mock_test.go +++ b/observable_mock_test.go @@ -3,36 +3,16 @@ package rxgo import ( "bufio" "context" - "github.com/reactivex/rxgo/handlers" - "github.com/reactivex/rxgo/options" "github.com/stretchr/testify/mock" "strconv" "strings" - "testing" ) -func newSyncObservable(iterator Iterator) Observable { - return &observable{ - observableType: cold, - iterable: &syncIterable{ - iterator: iterator, - }, - } -} - -type syncIterable struct { +type mockIterable struct { iterator Iterator } -func (s *syncIterable) Iterator(ctx context.Context) Iterator { - return s.iterator -} - -type MockObservable struct { - mock.Mock -} - -type MockIterator struct { +type mockIterator struct { mock.Mock } @@ -42,8 +22,17 @@ type task struct { close bool } -func TestMockIterators(t *testing.T) { - mockIterators("1:1,1:2,2:0,1:3,1:4,2:0,1:5,1:6,2:0,1:close,2:close") +func (s *mockIterable) Iterator(ctx context.Context) Iterator { + return s.iterator +} + +func mockObservable(iterator Iterator) Observable { + return &observable{ + observableType: cold, + iterable: &mockIterable{ + iterator: iterator, + }, + } } func countTab(line string) int { @@ -58,7 +47,7 @@ func countTab(line string) int { return i } -func mockIterators(in string) ([]*MockIterator, error) { +func mockIterators(in string) ([]*mockIterator, error) { scanner := bufio.NewScanner(strings.NewReader(in)) m := make(map[int]int) tasks := make([]task, 0) @@ -91,10 +80,10 @@ func mockIterators(in string) ([]*MockIterator, error) { } } - observables := make([]*MockIterator, 0, len(m)) + observables := make([]*mockIterator, 0, len(m)) calls := make([]*mock.Call, len(m)) for i := 0; i < len(m); i++ { - observables = append(observables, new(MockIterator)) + observables = append(observables, new(mockIterator)) } item, err := args(tasks[0]) @@ -102,20 +91,20 @@ func mockIterators(in string) ([]*MockIterator, error) { calls[0] = call var lastCh chan struct{} - lastObs := tasks[0].observable + lastObservableType := tasks[0].observable for i := 1; i < len(tasks); i++ { t := tasks[i] index := m[t.observable] obs := observables[index] item, err := args(t) - if lastObs == t.observable { + if lastObservableType == t.observable { if calls[index] == nil { calls[index] = obs.On("Next", mock.Anything).Once().Return(item, err) } else { calls[index].On("Next", mock.Anything).Once().Return(item, err) } } else { - lastObs = t.observable + lastObservableType = t.observable if lastCh == nil { ch := make(chan struct{}) lastCh = ch @@ -167,262 +156,7 @@ func run(wait chan struct{}, send chan struct{}) { } } -func (m *MockIterator) Next(ctx context.Context) (interface{}, error) { +func (m *mockIterator) Next(ctx context.Context) (interface{}, error) { args := m.Called(ctx) return args.Get(0), args.Error(1) } - -func (m *MockObservable) Iterator(ctx context.Context) Iterator { - args := m.Called(ctx) - return args.Get(0).(Iterator) -} - -func (m *MockObservable) All(predicate Predicate) Single { - args := m.Called(predicate) - return args.Get(0).(Single) -} - -func (m *MockObservable) AverageFloat32() Single { - panic("implement me") -} - -func (m *MockObservable) AverageFloat64() Single { - panic("implement me") -} - -func (m *MockObservable) AverageInt() Single { - panic("implement me") -} - -func (m *MockObservable) AverageInt8() Single { - panic("implement me") -} - -func (m *MockObservable) AverageInt16() Single { - panic("implement me") -} - -func (m *MockObservable) AverageInt32() Single { - panic("implement me") -} - -func (m *MockObservable) AverageInt64() Single { - panic("implement me") -} - -func (m *MockObservable) BufferWithCount(count, skip int) Observable { - panic("implement me") -} - -func (m *MockObservable) BufferWithTime(timespan, timeshift Duration) Observable { - panic("implement me") -} - -func (m *MockObservable) BufferWithTimeOrCount(timespan Duration, count int) Observable { - panic("implement me") -} - -func (m *MockObservable) Contains(equal Predicate) Single { - panic("implement me") -} - -func (m *MockObservable) Count() Single { - panic("implement me") -} - -func (m *MockObservable) DefaultIfEmpty(defaultValue interface{}) Observable { - panic("implement me") -} - -func (m *MockObservable) Distinct(apply Function) Observable { - panic("implement me") -} - -func (m *MockObservable) DistinctUntilChanged(apply Function) Observable { - panic("implement me") -} - -func (m *MockObservable) DoOnEach(onNotification Consumer) Observable { - panic("implement me") -} - -func (m *MockObservable) ElementAt(index uint) Single { - panic("implement me") -} - -func (m *MockObservable) Filter(apply Predicate) Observable { - panic("implement me") -} - -func (m *MockObservable) First() Observable { - panic("implement me") -} - -func (m *MockObservable) FirstOrDefault(defaultValue interface{}) Single { - panic("implement me") -} - -func (m *MockObservable) FlatMap(apply func(interface{}) Observable, maxInParallel uint) Observable { - panic("implement me") -} - -func (m *MockObservable) ForEach(nextFunc handlers.NextFunc, errFunc handlers.ErrFunc, - doneFunc handlers.DoneFunc, opts ...options.Option) Observer { - panic("implement me") -} - -func (m *MockObservable) IgnoreElements() Observable { - panic("implement me") -} - -func (m *MockObservable) Last() Observable { - panic("implement me") -} - -func (m *MockObservable) LastOrDefault(defaultValue interface{}) Single { - panic("implement me") -} - -func (m *MockObservable) Map(apply Function) Observable { - panic("implement me") -} - -func (m *MockObservable) Max(comparator Comparator) OptionalSingle { - panic("implement me") -} - -func (m *MockObservable) Min(comparator Comparator) OptionalSingle { - panic("implement me") -} - -func (m *MockObservable) OnErrorResumeNext(resumeSequence ErrorToObservableFunction) Observable { - panic("implement me") -} - -func (m *MockObservable) OnErrorReturn(resumeFunc ErrorFunction) Observable { - panic("implement me") -} - -func (m *MockObservable) OnErrorReturnItem(item interface{}) Observable { - panic("implement me") -} - -func (m *MockObservable) Publish() ConnectableObservable { - panic("implement me") -} - -func (m *MockObservable) Reduce(apply Function2) OptionalSingle { - panic("implement me") -} - -func (m *MockObservable) Repeat(count int64, frequency Duration) Observable { - panic("implement me") -} - -func (m *MockObservable) Sample(obs Observable) Observable { - panic("implement me") -} - -func (m *MockObservable) Scan(apply Function2) Observable { - panic("implement me") -} - -func (m *MockObservable) SequenceEqual(obs Observable) Single { - panic("implement me") -} - -func (m *MockObservable) Skip(nth uint) Observable { - panic("implement me") -} - -func (m *MockObservable) SkipLast(nth uint) Observable { - panic("implement me") -} - -func (m *MockObservable) SkipWhile(apply Predicate) Observable { - panic("implement me") -} - -func (m *MockObservable) StartWithItems(item interface{}, items ...interface{}) Observable { - panic("implement me") -} - -func (m *MockObservable) StartWithIterable(iterable Iterable) Observable { - panic("implement me") -} - -func (m *MockObservable) StartWithObservable(observable Observable) Observable { - panic("implement me") -} - -func (m *MockObservable) Subscribe(handler handlers.EventHandler, opts ...options.Option) Observer { - panic("implement me") -} - -func (m *MockObservable) SumFloat32() Single { - panic("implement me") -} - -func (m *MockObservable) SumFloat64() Single { - panic("implement me") -} - -func (m *MockObservable) SumInt64() Single { - panic("implement me") -} - -func (m *MockObservable) Take(nth uint) Observable { - panic("implement me") -} - -func (m *MockObservable) TakeLast(nth uint) Observable { - panic("implement me") -} - -func (m *MockObservable) TakeUntil(apply Predicate) Observable { - panic("implement me") -} - -func (m *MockObservable) TakeWhile(apply Predicate) Observable { - panic("implement me") -} - -func (m *MockObservable) Timeout(duration Duration) Observable { - panic("implement me") -} - -func (m *MockObservable) ToChannel(opts ...options.Option) Channel { - panic("implement me") -} - -func (m *MockObservable) ToMap(keySelector Function) Single { - panic("implement me") -} - -func (m *MockObservable) ToMapWithValueSelector(keySelector, valueSelector Function) Single { - panic("implement me") -} - -func (m *MockObservable) ToSlice() Single { - panic("implement me") -} - -func (m *MockObservable) ZipFromObservable(publisher Observable, zipper Function2) Observable { - panic("implement me") -} - -func (m *MockObservable) getIgnoreElements() bool { - panic("implement me") -} - -func (m *MockObservable) getOnErrorResumeNext() ErrorToObservableFunction { - panic("implement me") -} - -func (m *MockObservable) getOnErrorReturn() ErrorFunction { - panic("implement me") -} - -func (m *MockObservable) getOnErrorReturnItem() interface{} { - panic("implement me") -} diff --git a/observable_test.go b/observable_test.go index 6757d02f..5f57eed0 100644 --- a/observable_test.go +++ b/observable_test.go @@ -3,7 +3,6 @@ package rxgo import ( "context" "errors" - "github.com/stretchr/testify/mock" "net/http" "strconv" "sync" @@ -1283,7 +1282,7 @@ func TestBufferWithTimeWithMockedTime(t *testing.T) { timeshift.AssertNotCalled(t, "duration") } -func TestBufferWithTimeWithMinorMockedTime(t *testing.T) { +func TestBufferWithTime_MinorMockedTime(t *testing.T) { ch := make(chan interface{}) from := FromIterator(newIteratorFromChannel(ch)) @@ -1295,7 +1294,6 @@ func TestBufferWithTimeWithMinorMockedTime(t *testing.T) { obs := from.BufferWithTime(timespan, timeshift) - time.Sleep(10 * time.Millisecond) ch <- 1 close(ch) @@ -1656,6 +1654,9 @@ func TestSample_NotRepeatedItems(t *testing.T) { 7 8 0 + 0 +9 + 0 x x `) @@ -1664,11 +1665,9 @@ x } mockIterator1 := iterators[0] mockIterator2 := iterators[1] - mockObservable1 := new(MockObservable) - mockObservable1.On("Iterator", mock.Anything).Return(mockIterator2) - obs := newSyncObservable(mockIterator1).Sample(mockObservable1) + obs := mockObservable(mockIterator1).Sample(mockObservable(mockIterator2)) - AssertObservable(t, obs, HasItems(2, 5, 6, 8)) + AssertObservable(t, obs, HasItems(2, 5, 6, 8, 9)) } func TestSample_SourceObsClosedBeforeIntervalFired(t *testing.T) { @@ -1773,26 +1772,32 @@ func TestStartWithObservable_Empty2(t *testing.T) { AssertObservable(t, obs, HasItems(1, 2, 3)) } -//var _ = Describe("Timeout operator", func() { -// FIXME -//Context("when creating an observable with timeout operator", func() { -// ch := make(chan interface{}, 10) -// duration := WithDuration(pollingInterval) -// o := FromChannel(ch).Timeout(duration) -// Context("after a given period without items", func() { -// outNext, outErr, _ := subscribe(o) -// -// ch <- 1 -// ch <- 2 -// ch <- 3 -// time.Sleep(time.Second) -// ch <- 4 -// It("should receive the elements before the timeout", func() { -// Expect(pollItems(outNext, timeout)).Should(Equal([]interface{}{1, 2, 3})) -// }) -// It("should receive a TimeoutError", func() { -// Expect(pollItem(outErr, timeout)).Should(Equal(&TimeoutError{})) -// }) -// }) -//}) -//}) +func TestTimeout(t *testing.T) { + iterators, err := mockIterators(` +1 +2 +3 + 0 +4 +5 +x + x +`) + if err != nil { + assert.FailNow(t, err.Error()) + } + mockIterator1 := iterators[0] + mockIterator2 := iterators[1] + obs := mockObservable(mockIterator1).Timeout(mockObservable(mockIterator2)) + + // + // + //ch := make(chan interface{}, 10) + //obs := FromChannel(ch).Timeout(Timer(WithDuration(2000 * time.Millisecond))) + //ch <- 1 + //ch <- 2 + //ch <- 3 + //time.Sleep(time.Second) + //ch <- 4 + AssertObservable(t, obs, HasItems(1, 2, 3)) +} diff --git a/observablecreate.go b/observablecreate.go index b609f9cb..f278e12b 100644 --- a/observablecreate.go +++ b/observablecreate.go @@ -409,8 +409,7 @@ func Start(f Supplier, fs ...Supplier) Observable { return newColdObservableFromChannel(out) } -// Timer returns an Observable that emits the zeroed value of a float64 after a -// specified delay, and then completes. +// Timer returns an Observable that emits an empty structure after a specified delay, and then completes. func Timer(d Duration) Observable { out := make(chan interface{}) go func() { @@ -419,7 +418,7 @@ func Timer(d Duration) Observable { } else { time.Sleep(d.duration()) } - out <- 0. + out <- struct{}{} close(out) }() return newColdObservableFromChannel(out) diff --git a/observablecreate_test.go b/observablecreate_test.go index 43dea7b5..22553fb4 100644 --- a/observablecreate_test.go +++ b/observablecreate_test.go @@ -298,19 +298,23 @@ func TestMerge(t *testing.T) { } func TestAmb(t *testing.T) { - ch1 := make(chan interface{}, 3) - ch2 := make(chan interface{}, 3) - obs := Amb(FromChannel(ch1), FromChannel(ch2)) - ch1 <- 1 - ch1 <- 2 - ch1 <- 3 - close(ch1) - time.Sleep(wait) - ch2 <- 10 - ch2 <- 20 - ch2 <- 30 - close(ch2) - AssertObservable(t, obs, HasItems(1, 2, 3)) + iterators, err := mockIterators(` +1 +2 +x + 3 + 4 + x + 5 + x +`) + if err != nil { + assert.FailNow(t, err.Error()) + } + mockIterator1 := iterators[0] + mockIterator2 := iterators[1] + obs := Amb(mockObservable(mockIterator1), mockObservable(mockIterator2)) + AssertObservable(t, obs, HasItems(1, 2)) } // FIXME Not stable From 92d6959dcb17f55d5de80736eacb3407c50e5683 Mon Sep 17 00:00:00 2001 From: teivah Date: Tue, 25 Jun 2019 22:05:32 +0100 Subject: [PATCH 5/7] Improvement of the mock testing API (context management) --- observable.go | 6 +--- observable_mock_test.go | 62 +++++++++++++++++++++++++++++----------- observable_test.go | 41 +++++++++++--------------- observablecreate_test.go | 13 +++------ 4 files changed, 67 insertions(+), 55 deletions(-) diff --git a/observable.go b/observable.go index f8968084..e47e9103 100644 --- a/observable.go +++ b/observable.go @@ -1671,16 +1671,13 @@ func (o *observable) TakeWhile(apply Predicate) Observable { func (o *observable) Timeout(observable Observable) Observable { f := func(out chan interface{}) { - fmt.Printf("%v\n", "f") ctx, cancel := context.WithCancel(context.Background()) go func() { it := o.Iterator(ctx) for { if item, err := it.Next(ctx); err == nil { - fmt.Printf("%v\n", item) out <- item } else { - fmt.Printf("cancel\n") out <- err break } @@ -1688,8 +1685,7 @@ func (o *observable) Timeout(observable Observable) Observable { }() go func() { it := observable.Iterator(context.Background()) - next, e := it.Next(context.Background()) - fmt.Printf("%v %v\n", next, e) + it.Next(context.Background()) cancel() }() } diff --git a/observable_mock_test.go b/observable_mock_test.go index f916e460..d4dcad62 100644 --- a/observable_mock_test.go +++ b/observable_mock_test.go @@ -3,11 +3,15 @@ package rxgo import ( "bufio" "context" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "strconv" "strings" + "testing" ) +const signalCh = byte(0) + type mockIterable struct { iterator Iterator } @@ -26,7 +30,7 @@ func (s *mockIterable) Iterator(ctx context.Context) Iterator { return s.iterator } -func mockObservable(iterator Iterator) Observable { +func newMockObservable(iterator Iterator) Observable { return &observable{ observableType: cold, iterable: &mockIterable{ @@ -47,7 +51,7 @@ func countTab(line string) int { return i } -func mockIterators(in string) ([]*mockIterator, error) { +func mockObservables(t *testing.T, in string) []Observable { scanner := bufio.NewScanner(strings.NewReader(in)) m := make(map[int]int) tasks := make([]task, 0) @@ -67,7 +71,7 @@ func mockIterators(in string) ([]*mockIterator, error) { } else { n, err := strconv.Atoi(v) if err != nil { - return nil, err + assert.FailNow(t, err.Error()) } tasks = append(tasks, task{ observable: observable, @@ -80,14 +84,14 @@ func mockIterators(in string) ([]*mockIterator, error) { } } - observables := make([]*mockIterator, 0, len(m)) + iterators := make([]*mockIterator, 0, len(m)) calls := make([]*mock.Call, len(m)) for i := 0; i < len(m); i++ { - observables = append(observables, new(mockIterator)) + iterators = append(iterators, new(mockIterator)) } item, err := args(tasks[0]) - call := observables[0].On("Next", mock.Anything).Once().Return(item, err) + call := iterators[0].On("Next", mock.Anything).Once().Return(item, err) calls[0] = call var lastCh chan struct{} @@ -95,7 +99,7 @@ func mockIterators(in string) ([]*mockIterator, error) { for i := 1; i < len(tasks); i++ { t := tasks[i] index := m[t.observable] - obs := observables[index] + obs := iterators[index] item, err := args(t) if lastObservableType == t.observable { if calls[index] == nil { @@ -111,12 +115,12 @@ func mockIterators(in string) ([]*mockIterator, error) { if calls[index] == nil { calls[index] = obs.On("Next", mock.Anything).Once().Return(item, err). Run(func(args mock.Arguments) { - run(ch, nil) + run(args, ch, nil) }) } else { calls[index].On("Next", mock.Anything).Once().Return(item, err). Run(func(args mock.Arguments) { - run(ch, nil) + run(args, ch, nil) }) } } else { @@ -125,19 +129,24 @@ func mockIterators(in string) ([]*mockIterator, error) { if calls[index] == nil { calls[index] = obs.On("Next", mock.Anything).Once().Return(item, err). Run(func(args mock.Arguments) { - run(ch, previous) + run(args, ch, previous) }) } else { calls[index].On("Next", mock.Anything).Once().Return(item, err). Run(func(args mock.Arguments) { - run(ch, previous) + run(args, ch, previous) }) } lastCh = ch } } } - return observables, nil + + observables := make([]Observable, 0, len(iterators)) + for _, iterator := range iterators { + observables = append(observables, newMockObservable(iterator)) + } + return observables } func args(t task) (interface{}, error) { @@ -147,16 +156,35 @@ func args(t task) (interface{}, error) { return t.item, nil } -func run(wait chan struct{}, send chan struct{}) { +func run(args mock.Arguments, wait chan struct{}, send chan struct{}) { if send != nil { send <- struct{}{} } - if wait != nil { - <-wait + if wait == nil { + return + } + if len(args) == 1 { + if ctx, ok := args[0].(context.Context); ok { + if sig, ok := ctx.Value(signalCh).(chan struct{}); ok { + select { + case <-wait: + case <-ctx.Done(): + sig <- struct{}{} + } + return + } + } } + <-wait } func (m *mockIterator) Next(ctx context.Context) (interface{}, error) { - args := m.Called(ctx) - return args.Get(0), args.Error(1) + sig := make(chan struct{}, 1) + outputs := m.Called(context.WithValue(ctx, signalCh, sig)) + select { + case <-sig: + return nil, &CancelledIteratorError{} + default: + return outputs.Get(0), outputs.Error(1) + } } diff --git a/observable_test.go b/observable_test.go index 5f57eed0..ee5e8ab2 100644 --- a/observable_test.go +++ b/observable_test.go @@ -1641,7 +1641,7 @@ func TestSample(t *testing.T) { } func TestSample_NotRepeatedItems(t *testing.T) { - iterators, err := mockIterators(` + observables := mockObservables(t, ` 1 2 0 @@ -1660,12 +1660,7 @@ func TestSample_NotRepeatedItems(t *testing.T) { x x `) - if err != nil { - assert.FailNow(t, err.Error()) - } - mockIterator1 := iterators[0] - mockIterator2 := iterators[1] - obs := mockObservable(mockIterator1).Sample(mockObservable(mockIterator2)) + obs := observables[0].Sample(observables[1]) AssertObservable(t, obs, HasItems(2, 5, 6, 8, 9)) } @@ -1773,7 +1768,7 @@ func TestStartWithObservable_Empty2(t *testing.T) { } func TestTimeout(t *testing.T) { - iterators, err := mockIterators(` + observables := mockObservables(t, ` 1 2 3 @@ -1783,21 +1778,19 @@ func TestTimeout(t *testing.T) { x x `) - if err != nil { - assert.FailNow(t, err.Error()) - } - mockIterator1 := iterators[0] - mockIterator2 := iterators[1] - obs := mockObservable(mockIterator1).Timeout(mockObservable(mockIterator2)) - - // - // - //ch := make(chan interface{}, 10) - //obs := FromChannel(ch).Timeout(Timer(WithDuration(2000 * time.Millisecond))) - //ch <- 1 - //ch <- 2 - //ch <- 3 - //time.Sleep(time.Second) - //ch <- 4 + obs := observables[0].Timeout(observables[1]) + AssertObservable(t, obs, HasItems(1, 2, 3)) +} + +func TestTimeout_ClosedChannel(t *testing.T) { + observables := mockObservables(t, ` +1 +2 +3 +x + 0 + x +`) + obs := observables[0].Timeout(observables[1]) AssertObservable(t, obs, HasItems(1, 2, 3)) } diff --git a/observablecreate_test.go b/observablecreate_test.go index 22553fb4..b1eb44ed 100644 --- a/observablecreate_test.go +++ b/observablecreate_test.go @@ -267,14 +267,14 @@ func TestTimer(t *testing.T) { obs := Timer(d) - AssertObservable(t, obs, HasItems(float64(0))) + AssertObservable(t, obs, HasItems(struct{}{})) d.AssertCalled(t, "duration") } func TestTimerWithNilDuration(t *testing.T) { obs := Timer(nil) - AssertObservable(t, obs, HasItems(float64(0))) + AssertObservable(t, obs, HasItems(struct{}{})) } func TestMerge(t *testing.T) { @@ -298,7 +298,7 @@ func TestMerge(t *testing.T) { } func TestAmb(t *testing.T) { - iterators, err := mockIterators(` + observables := mockObservables(t, ` 1 2 x @@ -308,12 +308,7 @@ x 5 x `) - if err != nil { - assert.FailNow(t, err.Error()) - } - mockIterator1 := iterators[0] - mockIterator2 := iterators[1] - obs := Amb(mockObservable(mockIterator1), mockObservable(mockIterator2)) + obs := Amb(observables[0], observables[1]) AssertObservable(t, obs, HasItems(1, 2)) } From bbcb260490261693accd74b7008de779d7e254c2 Mon Sep 17 00:00:00 2001 From: teivah Date: Tue, 25 Jun 2019 23:27:17 +0100 Subject: [PATCH 6/7] Improvement of the mock testing API (errors management) --- observable_mock_test.go | 25 ++++++++++++-- observablecreate_test.go | 74 +++++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 39 deletions(-) diff --git a/observable_mock_test.go b/observable_mock_test.go index d4dcad62..fdb0fa5d 100644 --- a/observable_mock_test.go +++ b/observable_mock_test.go @@ -3,6 +3,7 @@ package rxgo import ( "bufio" "context" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "strconv" @@ -12,6 +13,8 @@ import ( const signalCh = byte(0) +var mockError = errors.New("") + type mockIterable struct { iterator Iterator } @@ -23,6 +26,7 @@ type mockIterator struct { type task struct { observable int item int + error error close bool } @@ -51,6 +55,7 @@ func countTab(line string) int { return i } +// TODO Causality with more than two observables func mockObservables(t *testing.T, in string) []Observable { scanner := bufio.NewScanner(strings.NewReader(in)) m := make(map[int]int) @@ -63,12 +68,18 @@ func mockObservables(t *testing.T, in string) []Observable { } observable := countTab(s) v := strings.TrimSpace(s) - if v == "x" { + switch v { + case "x": tasks = append(tasks, task{ observable: observable, close: true, }) - } else { + case "e": + tasks = append(tasks, task{ + observable: observable, + error: mockError, + }) + default: n, err := strconv.Atoi(v) if err != nil { assert.FailNow(t, err.Error()) @@ -124,7 +135,11 @@ func mockObservables(t *testing.T, in string) []Observable { }) } } else { - ch := make(chan struct{}) + var ch chan struct{} + // If this is the latest task we do not set any wait channel + if i != len(tasks)-1 { + ch = make(chan struct{}) + } previous := lastCh if calls[index] == nil { calls[index] = obs.On("Next", mock.Anything).Once().Return(item, err). @@ -153,6 +168,9 @@ func args(t task) (interface{}, error) { if t.close { return nil, &NoSuchElementError{} } + if t.error != nil { + return t.error, nil + } return t.item, nil } @@ -180,6 +198,7 @@ func run(args mock.Arguments, wait chan struct{}, send chan struct{}) { func (m *mockIterator) Next(ctx context.Context) (interface{}, error) { sig := make(chan struct{}, 1) + defer close(sig) outputs := m.Called(context.WithValue(ctx, signalCh, sig)) select { case <-sig: diff --git a/observablecreate_test.go b/observablecreate_test.go index b1eb44ed..ff21e185 100644 --- a/observablecreate_test.go +++ b/observablecreate_test.go @@ -228,18 +228,6 @@ func TestFromStatefulIterable(t *testing.T) { AssertObservable(t, obs, IsEmpty()) } -type statelessIterable struct { - count int -} - -func (it *statelessIterable) Next(ctx context.Context) (interface{}, error) { - it.count++ - if it.count < 3 { - return it.count, nil - } - return nil, &NoSuchElementError{} -} - func TestRange(t *testing.T) { obs, err := Range(5, 3) if err != nil { @@ -312,30 +300,44 @@ x AssertObservable(t, obs, HasItems(1, 2)) } -// FIXME Not stable -//func TestCombineLatest(t *testing.T) { -// ch1 := make(chan interface{}, 10) -// ch2 := make(chan interface{}, 10) -// ch3 := make(chan interface{}, 10) -// -// obs := CombineLatest(func(ii ...interface{}) interface{} { -// sum := 0 -// for _, v := range ii { -// sum += v.(int) -// } -// return sum -// }, FromChannel(ch1), FromChannel(ch2), FromChannel(ch3)) -// -// //TODO AssertObservableEventually(t, obs, wait, IsEmpty()) -// ch1 <- 1 -// close(ch1) -// ch2 <- 2 -// close(ch2) -// ch3 <- 3 -// close(ch3) -// AssertObservable(t, obs, HasItems(6), HasNotRaisedAnyError()) -// //TODO AssertObservableEventually(t, obs, wait, 6, 13 etc.) -//} +func TestCombineLatest(t *testing.T) { + observables := mockObservables(t, ` +1 + 10 +2 + 11 +102 +x + x +`) + obs := CombineLatest(func(ii ...interface{}) interface{} { + sum := 0 + for _, v := range ii { + sum += v.(int) + } + return sum + }, observables[0], observables[1:]...) + AssertObservable(t, obs, HasItems(11, 12, 13, 113), HasNotRaisedAnyError()) +} + +func TestCombineLatest_Error(t *testing.T) { + observables := mockObservables(t, ` +1 + 10 +2 + e +102 +x +`) + obs := CombineLatest(func(ii ...interface{}) interface{} { + sum := 0 + for _, v := range ii { + sum += v.(int) + } + return sum + }, observables[0], observables[1:]...) + AssertObservable(t, obs, HasItems(11, 12), HasRaisedAnError()) +} // FIXME //Context("when creating a hot observable with FromEventSource operator without back-pressure strategy", func() { From cb0461beb1e12701d09718aa01cc14c9228165a0 Mon Sep 17 00:00:00 2001 From: teivah Date: Tue, 25 Jun 2019 23:29:37 +0100 Subject: [PATCH 7/7] gofmt --- observable_mock_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/observable_mock_test.go b/observable_mock_test.go index fdb0fa5d..963e5ac8 100644 --- a/observable_mock_test.go +++ b/observable_mock_test.go @@ -3,12 +3,13 @@ package rxgo import ( "bufio" "context" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "strconv" "strings" "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) const signalCh = byte(0)