Zip calls onNext later than C#'s Zip does #387

Closed
samuelgruetter opened this issue Sep 17, 2013 · 5 comments

Comments

@samuelgruetter
Contributor

In the C# example below, the zipped Observable completes about one second after o3 has completed, long before o6 does, because all of o3's elements have been paired with an element from o6.

static void Main() {
    var o3 = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Take(3);
    var o6 = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Take(10);

    var watch = new Stopwatch();
    watch.Start();

    Observable.Zip(o3, o6).Subscribe(
        list => Console.WriteLine("(" + list[0] + ", " + list[1] + ") at t=" + watch.ElapsedMilliseconds), 
        e => Console.WriteLine(e.StackTrace),
        () => Console.WriteLine("complete at t=" + watch.ElapsedMilliseconds)
    );
    Console.ReadLine();
}

outputs

(0, 0) at t=1055
(1, 1) at t=2045
(2, 2) at t=3047
complete at t=4058

In the corresponding Java code, however, the zipped Observable only completes once both o3 and o6 have completed.

static Func2<Long, Long, Long> zipFunc = new Func2<Long, Long, Long>() {
    public Long call(Long n1, Long n2) {
        if (n1.equals(n2)) {
            return n1;
        } else {
            throw new RuntimeException("numbers not equal");
        }
    }
};

public static void main(String[] args) {
    Observable<Long> o3 = Observable.interval(1000, TimeUnit.MILLISECONDS).take(3);
    Observable<Long> o6 = Observable.interval(1000, TimeUnit.MILLISECONDS).take(10);

    final long startTime = System.currentTimeMillis();

    Observable.zip(o3, o6, zipFunc).subscribe(
        new Action1<Long>() { public void call(Long n) { 
            System.out.println(n + " at t=" + (System.currentTimeMillis()-startTime));
        }},
        new Action1<Throwable>() { public void call(Throwable t) {
            t.printStackTrace();
        }},
        new Action0() { public void call() {
            System.out.println("complete at t=" + (System.currentTimeMillis()-startTime));
        }}
    );
}

outputs

0 at t=1019
1 at t=2019
2 at t=3019
complete at t=10019

I'd like RxJava to follow C# here, unless there are very good reasons against doing so.

This difference matters even more if one of the observables never completes: in that case the zipped Observable never completes either, which I found very unexpected.

@benjchristensen
Member

This unit test demonstrates the issue:

        @Test
        public void testOnFirstCompletion() {
            PublishSubject<String> oA = PublishSubject.create();
            PublishSubject<String> oB = PublishSubject.create();

            @SuppressWarnings("unchecked")
            Observer<String> observer = mock(Observer.class);

            Observable<String> o = Observable.create(zip(oA, oB, getConcat2Strings()));
            o.subscribe(observer);

            InOrder inOrder = inOrder(observer);

            oA.onNext("a1");
            inOrder.verify(observer, never()).onNext(anyString());
            oB.onNext("b1");
            inOrder.verify(observer, times(1)).onNext("a1-b1");
            oB.onNext("b2");
            inOrder.verify(observer, never()).onNext(anyString());
            oA.onNext("a2");
            inOrder.verify(observer, times(1)).onNext("a2-b2");

            oA.onNext("a3");
            oA.onNext("a4");
            oA.onNext("a5");
            oA.onCompleted();

            // assert we complete the zip stream here
            inOrder.verify(observer, times(1)).onCompleted();

            oB.onNext("b3");
            oB.onNext("b4");
            oB.onNext("b5");
            oB.onNext("b6");
            oB.onNext("b7");
            // never completes (infinite stream for example)

            // we should receive nothing else despite oB continuing after oA completed
            inOrder.verifyNoMoreInteractions();
        }

@benjchristensen
Member

I need to play more with the C# version before I finish the changes, as fixing this unit test could be done in different ways, and the most obvious one breaks other unit tests.

@benjchristensen
Member

Branch with unit test at benjchristensen@6921f72

@akarnokd
Member

Hello, I've ported my Zip implementation and it appears to produce almost the expected behavior: akarnokd@cd0a5e7

0 at t=1016
1 at t=2016
2 at t=3017
complete at t=3017

The difference from Rx.NET seems to be that this version terminates right after the 3rd item, whereas Rx.NET terminates only after rejecting the 4th item of the second observable.
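
To make the two termination rules concrete, here is a minimal plain-Java sketch of a two-source zip. It is only a toy model under stated assumptions, not the code in akarnokd@cd0a5e7 or in Rx.NET: the class name `ZipCompletionSketch`, the `Policy` enum, and the tick-based `simulate` method are all hypothetical, and logical ticks stand in for the one-second timers used in the examples above. `EAGER` models "terminate right after the last pairable item", and `LAZY` models "terminate when the other source emits an item that can no longer be paired".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a two-source zip. This is NOT the RxJava or Rx.NET implementation;
// every name in this class is hypothetical.
public class ZipCompletionSketch {

    public enum Policy {
        EAGER, // complete once a finished source has no buffered items left
        LAZY   // complete only when the other source emits an item that cannot be paired
    }

    private final Policy policy;
    private final Deque<Long> queueA = new ArrayDeque<>();
    private final Deque<Long> queueB = new ArrayDeque<>();
    private final List<String> events = new ArrayList<>();
    private boolean doneA, doneB, completed;
    private long tick;

    ZipCompletionSketch(Policy policy) {
        this.policy = policy;
    }

    void onNext(boolean fromA, long value) {
        if (completed) return;
        Deque<Long> own = fromA ? queueA : queueB;
        Deque<Long> other = fromA ? queueB : queueA;
        boolean otherDone = fromA ? doneB : doneA;
        if (otherDone && other.isEmpty()) {
            complete(); // this item can never be paired: reject it and complete
            return;
        }
        own.add(value);
        if (!queueA.isEmpty() && !queueB.isEmpty()) {
            events.add("(" + queueA.poll() + ", " + queueB.poll() + ") at tick " + tick);
        }
        if (policy == Policy.EAGER && exhausted()) complete();
    }

    void onCompleted(boolean fromA) {
        if (completed) return;
        if (fromA) doneA = true; else doneB = true;
        boolean bothDone = doneA && doneB;
        if (bothDone || (policy == Policy.EAGER && exhausted())) complete();
    }

    // True if some source has finished and all of its items have been paired.
    private boolean exhausted() {
        return (doneA && queueA.isEmpty()) || (doneB && queueB.isEmpty());
    }

    private void complete() {
        completed = true;
        events.add("complete at tick " + tick);
    }

    // Replays the scenario from this issue: source A emits 3 items, source B emits 10.
    public static List<String> simulate(Policy policy) {
        ZipCompletionSketch zip = new ZipCompletionSketch(policy);
        for (long t = 1; t <= 10; t++) {
            zip.tick = t;
            long value = t - 1;
            if (value < 3) zip.onNext(true, value);
            if (value == 2) zip.onCompleted(true);  // take(3) completes right after its third item
            zip.onNext(false, value);
            if (value == 9) zip.onCompleted(false); // take(10) completes after its tenth item
        }
        return zip.events;
    }

    public static void main(String[] args) {
        for (Policy p : Policy.values()) {
            System.out.println(p + ": " + simulate(p));
        }
    }
}
```

In this model both policies emit the same three pairs. `EAGER` reports completion at tick 3, matching the "complete at t=3017" output above, while `LAZY` reports it at tick 4, matching the "complete at t=4058" seen in the C# run. Neither policy waits for the longer source to finish, which was the original complaint.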

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Nov 22, 2013
Added tests while validating pull request.

This fixes issue ReactiveX#387
@benjchristensen
Member

I believe this is fixed in #510 thanks to @akarnokd

rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
Added tests while validating pull request.

This fixes issue ReactiveX#387