Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zip many rewritten, Concat with iterable, Merge with Iterable. #495

Closed
wants to merge 9 commits into from
Closed

Conversation

akarnokd
Copy link
Member

I've rewritten the Zip method to terminate as soon as possible. In addition, the concat(Iterable<Observable> and merge(Iterable<Observable>) where missing from Observable.

@cloudbees-pull-request-builder

RxJava-pull-requests #418 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

@akarnokd Thank you for this. I'm checking out the branch now to play with it and see how the problematic unit tests behave.

Would you mind re-submitting the pull request though after rebasing this onto a new branch so we don't have all the commit clutter resulting from you working on your master branch? The commit "Merge origin/master" akarnokd@0dcbe89 and commits before that are a result of merging /Netflix/RxJava master into your master then submitting back from the same branch. It is preferable to always commit only to a clean branch and submit a pull request from that so only the relevant commits are applied. The only time anything should commit to your forked master is when fetching from the upstream to sync back up.

Thanks!

return a;

public static <T1, T2, R> OnSubscribeFunc<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return zip(Arrays.asList(o1, o2), new FuncN<R>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of FuncN on all of these overloads has made this all be non-type-safe and require casting. Any particular reason why that was done?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, you've somewhat replicated the work that Functions.fromFunc does ... but it's throwing warnings everywhere. I'd like this to be cleaned up.

You can replace all that boilerplate with the utility functions that already exist such as:

return zip(Arrays.asList(o1, o2, o3), Functions.fromFunc(zipFunction));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By 'throwing warnings' I mean the compiler and IDE are warning about unchecked casts and generic arrays via varargs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll update the code.

@benjchristensen
Copy link
Member

I want to confirm a unit test for this code. @headinthebox can you please validate something?

The following unit test shows how this will correctly complete once the first stream completes. However, it still waits until ALL items are emitted before the onCompleted is sent. I want to ensure that's how it should behave as opposed to skipping those and completing as soon as onComplete is sent (like onError should).

    @Test
    public void testOnFirstCompletion() {
        PublishSubject<String> oA = PublishSubject.create();
        PublishSubject<String> oB = PublishSubject.create();

        @SuppressWarnings("unchecked")
        Observer<String> observer = mock(Observer.class);

        Observable<String> o = Observable.create(zip(oA, oB, getConcat2Strings()));
        o.subscribe(observer);

        InOrder inOrder = inOrder(observer);

        oA.onNext("a1");
        inOrder.verify(observer, never()).onNext(anyString());
        oB.onNext("b1");
        inOrder.verify(observer, times(1)).onNext("a1-b1");
        oB.onNext("b2");
        inOrder.verify(observer, never()).onNext(anyString());
        oA.onNext("a2");
        inOrder.verify(observer, times(1)).onNext("a2-b2");

        oA.onNext("a3");
        oA.onNext("a4");
        oA.onNext("a5");
        oA.onCompleted();

        // SHOULD ONCOMPLETE BE EMITTED HERE INSTEAD OF WAITING
       // FOR B3, B4, B5 TO BE EMITTED?

        oB.onNext("b3");
        oB.onNext("b4");
        oB.onNext("b5");

        inOrder.verify(observer, times(1)).onNext("a3-b3");
        inOrder.verify(observer, times(1)).onNext("a4-b4");
        inOrder.verify(observer, times(1)).onNext("a5-b5");

        // WE RECEIVE THE ONCOMPLETE HERE
        inOrder.verify(observer, times(1)).onCompleted();

        oB.onNext("b6");
        oB.onNext("b7");
        oB.onNext("b8");
        oB.onNext("b9");
        // never completes (infinite stream for example)

        // we should receive nothing else despite oB continuing after oA completed
        inOrder.verifyNoMoreInteractions();
    }

    private Func2<String, String, String> getConcat2Strings() {
        return new Func2<String, String, String>() {

            @Override
            public String call(String t1, String t2) {
                return t1 + "-" + t2;
            }
        };
    }

@benjchristensen
Copy link
Member

This confirms the onError event is propagated immediately:

@Test
    public void testOnErrorTermination() {
        PublishSubject<String> oA = PublishSubject.create();
        PublishSubject<String> oB = PublishSubject.create();

        @SuppressWarnings("unchecked")
        Observer<String> observer = mock(Observer.class);

        Observable<String> o = Observable.create(zip(oA, oB, getConcat2Strings()));
        o.subscribe(observer);

        InOrder inOrder = inOrder(observer);

        oA.onNext("a1");
        inOrder.verify(observer, never()).onNext(anyString());
        oB.onNext("b1");
        inOrder.verify(observer, times(1)).onNext("a1-b1");
        oB.onNext("b2");
        inOrder.verify(observer, never()).onNext(anyString());
        oA.onNext("a2");
        inOrder.verify(observer, times(1)).onNext("a2-b2");

        oA.onNext("a3");
        oA.onNext("a4");
        oA.onNext("a5");
        oA.onError(new RuntimeException("forced failure"));

        // it should emit failure immediately
        inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));

        oB.onNext("b3");
        oB.onNext("b4");
        oB.onNext("b5");
        oB.onNext("b6");
        oB.onNext("b7");
        oB.onNext("b8");
        oB.onNext("b9");
        // never completes (infinite stream for example)

        // we should receive nothing else despite oB continuing after oA completed
        inOrder.verifyNoMoreInteractions();
    }

@Override
@SuppressWarnings("unchecked")
public void onNext(T value) {
lock.lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a separate ItemObserver for each Observable, why does it need a lock for onNext/onCompleted/onError? All events of an Observable sequence are already sequential, and even if we did need to synchronize we could just add the synchronized keyword to the methods (but we don't need to).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was ported from an Observer which enforced the event semantics, but perhaps unnecessarily. Anyways, what is the procedure on such pull requests: should I close this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay. The way pull request work is that they track whatever branch you are committing to until they are accepted. So you can submit new commits and they automatically append to the pull request. If they are large changes that negate previous commits it is generally cleaner to close the pull request, create a new branch and rebase the changes so the commits are clean and submit a new request.

@akarnokd
Copy link
Member Author

Sorry for the clutter, I'm new to Git in this manner. Will do a new pull shortly.

@benjchristensen
Copy link
Member

Sorry for the clutter, I'm new to Git in this manner. Will do a new pull shortly.

Not a problem, the Git flow for pull requests is not obvious when first starting. I totally messed up a few repos when I first started, had to wipe them out and start fresh with clean forks until I got the hang of it :-)

@benjchristensen
Copy link
Member

Closing as replaced by #497

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants