Operator: CombineLatest #29

Closed
benjchristensen opened this issue Jan 18, 2013 · 10 comments

@benjchristensen
Member

http://msdn.microsoft.com/en-us/library/hh211991(v=vs.103).aspx

@jmhofer
Contributor

jmhofer commented Mar 25, 2013

What's the status on this operator? - There is some code, but the tests look a bit different from what I would expect this operator to do.

Shouldn't

            w1.Observer.onNext("1a");
            w1.Observer.onCompleted();

            w2.Observer.onNext("2a");
            w2.Observer.onNext("2b");
            w2.Observer.onCompleted();

            w3.Observer.onNext("3a");
            w3.Observer.onNext("3b");
            w3.Observer.onNext("3c");
            w3.Observer.onNext("3d");
            w3.Observer.onCompleted();

generate "1a2b3a", then "1a2b3b", then "1a2b3c", then "1a2b3d"? Because it starts with the latest values of w1 and w2 as soon as w3 fires its first string? - To me, this currently looks a bit like a mixture between zip and combineLatest. But maybe I'm understanding the semantics of combineLatest wrong.

The current code seems to still "wait" for an onNext from each combined observable before firing its own onNext? I'd love to try and fix that, if you can confirm that I understand combineLatest correctly.

@benjchristensen
Member Author

As I'm reading through this I'm using the following to research the expected behavior:

http://blogs.microsoft.co.il/blogs/bnaya/archive/2010/03/10/rx-for-beginners-part-8-combine-latest-expression.aspx
http://theburningmonk.com/2010/03/rx-framework-iobservable-combinelatest/

Like IObservable.Zip, IObservable.CombineLatest combines ‘pairs’ of values from the two observable collections, but unlike Zip when a new value becomes available on one collection it does not wait till a new value to be available on the other collection, instead it takes whatever the latest value is from the other collection (provided there is one):

The test case appears to be as follows:

x.onNext(1)
y.onNext("a")
x.onNext(2)
x.onNext(3)
y.onNext("b")
x.onNext(4)

This should result in:

a1
a2
a3
b3
b4

On each onNext from whatever sequence sends it the combination will be emitted using whatever the latest value is.
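The expected sequence above can be checked with a minimal, single-threaded sketch that uses no RxJava types. `CombineLatestSketch`, `Combiner`, `onNextA`, `onNextB` and `demo` are hypothetical names made up for this illustration. This is not the operator's actual implementation, and it ignores synchronization, completion and errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration of combineLatest semantics; not RxJava code.
class CombineLatestSketch {

    // Remembers the latest value of two sources and emits on every update
    // once both sources have produced at least one value.
    static final class Combiner<A, B, R> {
        private final BiFunction<A, B, R> combine;
        private final List<R> emitted = new ArrayList<>();
        private A latestA;
        private B latestB;
        private boolean hasA;
        private boolean hasB;

        Combiner(BiFunction<A, B, R> combine) {
            this.combine = combine;
        }

        void onNextA(A value) {
            latestA = value;
            hasA = true;
            emitIfReady();
        }

        void onNextB(B value) {
            latestB = value;
            hasB = true;
            emitIfReady();
        }

        // No queuing as in zip: only the most recent value per source is kept.
        private void emitIfReady() {
            if (hasA && hasB) {
                emitted.add(combine.apply(latestA, latestB));
            }
        }

        List<R> emitted() {
            return emitted;
        }
    }

    // Replays the x/y sequence from the comment above (A plays y, B plays x).
    static List<String> demo() {
        Combiner<String, Integer, String> c = new Combiner<>((y, x) -> y + x);
        c.onNextB(1);   // nothing emitted yet: y has no value
        c.onNextA("a"); // a1
        c.onNextB(2);   // a2
        c.onNextB(3);   // a3
        c.onNextA("b"); // b3
        c.onNextB(4);   // b4
        return c.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, a2, a3, b3, b4]
    }
}
```

The first `onNextB(1)` emits nothing because the other source has no value yet, which is the only point at which combineLatest "waits".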

Extrapolating that into a unit test I get something like this:

        @Test
        public void testCombineLatestDifferentLengthObservableSequencesWithInterleaving1() {
            Observer<String> w = mock(Observer.class);

            TestObservable x = new TestObservable();
            TestObservable y = new TestObservable();

            Observable<String> combineLatestW = Observable.create(combineLatest(y, x, getConcat2StringsCombineLatestFunction()));
            combineLatestW.subscribe(w);

            /* simulate sending data */
            x.Observer.onNext("1");
            y.Observer.onNext("a");
            x.Observer.onNext("2");
            x.Observer.onNext("3");
            y.Observer.onNext("b");
            x.Observer.onNext("4");

            x.Observer.onCompleted();
            y.Observer.onCompleted();

            /* we should have been called 5 times on the Observer */
            InOrder inOrder = inOrder(w);
            inOrder.verify(w).onNext("a1");
            inOrder.verify(w).onNext("a2");
            inOrder.verify(w).onNext("a3");
            inOrder.verify(w).onNext("b3");
            inOrder.verify(w).onNext("b4");

            inOrder.verify(w, times(1)).onCompleted();
        }

This test fails with current code.

So yes, the current implementation is wrong - and it's crazy complicated.

I'll take a look at your pull request for this fix (hopefully tomorrow, though I have a lot of meetings so ...)

Thank you @jmhofer

@jmhofer
Contributor

jmhofer commented Mar 27, 2013

I agree. Your test should work with my pull request as I've added a similar test. Also, I simplified the code a bit, but I wasn't sure how thoroughly I could/should rework it, so I kept the basic structure (I guess the complexity mostly comes from the Zip operator it seems to have been copied from originally).

The one thing I'm still unsure of is when the combined stream should complete. That first of your links above says: "the Combine Latest processing will come to end either when one of the stream will complete or throw exception." - However I don't really see a reason why it should complete before all streams are complete. - I haven't found any other specification yet for cross-checking this.

@benjchristensen
Member Author

You can change the implementation to whatever makes sense. I obviously misunderstood the behavior when I implemented this long ago and stole the base functionality from zip. That starts it from the wrong place altogether: combineLatest shouldn't do queuing, and thus doesn't need a lot (or any?) of the synchronization that zip requires.

Your last question is a good one. If an onError is received at any time the whole thing should finish and propagate the error.

For the onCompleted question I'm not sure ... the C# code will provide the definitive answer. I'll try to review that (no time right now) unless you can beat me to it or find a better answer somewhere else.

https://rx.codeplex.com/SourceControl/changeset/view/aa25748a430e#Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/CombineLatest.cs

Another way would be a Rx.Net or RxJS test case if you have access to either of those or someone who does.

@jmhofer
Contributor

jmhofer commented Mar 27, 2013

The C# code looks very much like they complete only when all streams are complete (when there are no errors). - That's good, then the pull request code should already handle that correctly.
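The termination rule discussed here (complete only once every source has completed, but propagate the first error immediately) can be sketched on its own as follows. `CompletionSketch`, `Terminator` and its methods are hypothetical names for illustration. The sketch assumes each source signals completion at most once and does not model thread safety.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the termination rule; not RxJava code.
class CompletionSketch {

    static final class Terminator {
        private final int sourceCount;
        private final List<String> events = new ArrayList<>();
        private int completedSources;
        private boolean terminated;

        Terminator(int sourceCount) {
            this.sourceCount = sourceCount;
        }

        // Assumes each source calls this at most once.
        void sourceCompleted() {
            if (terminated) {
                return;
            }
            completedSources++;
            if (completedSources == sourceCount) {
                terminated = true;
                events.add("onCompleted");
            }
        }

        // The first error ends the combined stream right away.
        void sourceFailed(Throwable error) {
            if (terminated) {
                return;
            }
            terminated = true;
            events.add("onError:" + error.getMessage());
        }

        List<String> events() {
            return events;
        }
    }

    // Two sources that both complete normally.
    static List<String> allComplete() {
        Terminator t = new Terminator(2);
        t.sourceCompleted(); // first source done, combined stream still open
        t.sourceCompleted(); // second source done, combined stream completes
        return t.events();
    }

    // One source completes, the other fails afterwards.
    static List<String> errorAfterOneCompletion() {
        Terminator t = new Terminator(2);
        t.sourceCompleted();
        t.sourceFailed(new RuntimeException("boom"));
        t.sourceCompleted(); // ignored: already terminated
        return t.events();
    }

    public static void main(String[] args) {
        System.out.println(allComplete());             // prints [onCompleted]
        System.out.println(errorAfterOneCompletion()); // prints [onError:boom]
    }
}
```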

@benjchristensen
Member Author

Great, then I'll proceed with the review and merge. I appreciate your willingness to dive into this one and deal with that gnarly code.

@benjchristensen
Member Author

So if I understand correctly, synchronous (blocking) sequences make combineLatest behave rather oddly, as in this example, where only the last value of the first sequence will ever show up in the combined values:

            Observable<String> w = Observable.create(combineLatest(Observable.toObservable("one", "two"), Observable.toObservable(2, 3, 4), combineLatestFunction));
            w.subscribe(aObserver);

            verify(aObserver, times(1)).onNext("two2");
            verify(aObserver, times(1)).onNext("two3");
            verify(aObserver, times(1)).onNext("two4");

@jmhofer
Contributor

jmhofer commented Mar 27, 2013

Yes, I think so. - I came to that conclusion, too. "latest" doesn't really make much sense in the synchronous case.
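A rough way to see why: if subscribing to a blocking source delivers all of its values before the next source is subscribed, the first source has already reached its last value by the time the second one starts. The sketch below models that with plain lists. `SynchronousSourcesSketch` and `combineSynchronously` are hypothetical names, and the sketch assumes non-null values and that sources are subscribed in argument order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of combineLatest over blocking sources; not RxJava code.
class SynchronousSourcesSketch {

    static List<String> combineSynchronously(List<String> first, List<Integer> second) {
        List<String> emitted = new ArrayList<>();
        String latestFirst = null;    // null means "no value yet" (assumes non-null elements)
        Integer latestSecond = null;

        // The first source runs to its end before the second one is even subscribed.
        for (String value : first) {
            latestFirst = value;
            if (latestSecond != null) {
                emitted.add(latestFirst + latestSecond);
            }
        }

        // Every value of the second source is paired with the last value of the first.
        for (Integer value : second) {
            latestSecond = value;
            if (latestFirst != null) {
                emitted.add(latestFirst + latestSecond);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> result = combineSynchronously(
                Arrays.asList("one", "two"), Arrays.asList(2, 3, 4));
        System.out.println(result); // prints [two2, two3, two4]
    }
}
```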

@jmhofer
Contributor

jmhofer commented Mar 27, 2013

I updated my pull request: I had previously forgotten to adapt the comments. Also, I cleaned up the code a bit and tried to fix synchronization (it's hard, as always...)

@benjchristensen
Member Author

I have merged pull request #207 to resolve this issue.
