Operator: Window #102

Closed
benjchristensen opened this Issue Jan 18, 2013 · 5 comments

Projects

None yet

3 participants

@daveray
Contributor
daveray commented Jan 18, 2013

That's enough issues I think. :)

@bcotton
bcotton commented Aug 18, 2013

Started work on window()

@benjchristensen
Member

Thanks for getting involved @bcotton !

@bcotton
bcotton commented Aug 31, 2013

Trying to figure out what the behavior should be, for windows when onComplete() is never called on the observer.

This test fails, as there are "hanging" values, and I only get back the first window.

        public void testNonOverlappingWindowsNoComplete() {
            Observable<Observable<String>> subject = Observable.create(new Func1<Observer<String>, Subscription>() {
                @Override
                public Subscription call(Observer<String> observer) {
                    observer.onNext("one");
                    observer.onNext("two");
                    observer.onNext("three");
                    observer.onNext("four");
                    observer.onNext("five");
                    return Subscriptions.empty();
                }
            }).window(3);

            List<List<String>> windows = toLists(subject);

            assertEquals(2, windows.size());
            assertEquals(list("one", "two", "three"), windows.get(0));
            assertEquals(list("four", "five"), windows.get(1));
        }

Perhaps I'm missing something about the internals of Observables and what happens at "the end".

@benjchristensen
Member

Each window Observable you emit will need to invoke onCompleted as that will not come from the origin.

As for the origin, it is possible for it to never emit an onCompleted if it is an infinite stream, in which case the Observable would continue emitting events and windows indefinitely.

However, in your example above, since you're not trying to do an infinite stream, you need to call observer.onCompleted() after observer.onNext("five") to comply with the Observable contract.

You can see some examples here: https://github.com/Netflix/RxJava/wiki/How-To-Use#implementing-an-observable

The contract for an Observer is that onNext can be called 0 or more times and then onError or onCompleted called once with nothing following. (Javadoc) If it's an infinite stream then onCompleted may never be called.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment