Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Zip Operator Using Lift [Preview] #785

Merged
merged 4 commits into from
Feb 5, 2014

Conversation

benjchristensen
Copy link
Member

A preview of a re-implementation of the zip operator.

This re-implements the zip operator but not yet the zipIterable so those unit tests are still failing. I'm submitting early to get a code review and will finish the zipIterable sometime early next week.

I have already had the concurrency model reviewed by two others and all unit tests are passing but further review is justified and welcome.

The performance of this implementation (without doing any profiling) has risen from 1.42m ops/second on v0.16 to 1.67m ops/second as measured on my machine for the simple test Observable.zip(from(1), from(1), {a, b -> a+b}) and 31k ops/second to 63k ops/second for Observable.zip(range(0, 100), range(100, 200), {a, b -> a+b}).

@cloudbees-pull-request-builder

RxJava-pull-requests #704 ABORTED

@SuppressWarnings("rawtypes")
@Override
public Observer<? super Observable[]> call(final Observer<? super R> observer) {
return new Observer<Observable[]>(observer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If observer is not SafeObserver, there will be a similar issue like #766 ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because counter will stay 1 and won't let any tick() to execute after a completion condition.

@akarnokd
Copy link
Member

Other than the lack of null sentinel, it looks okay.

@benjchristensen
Copy link
Member Author

Thank you both for the review.

@Override
public void onError(Throwable e) {
// emit error and shut down
observer.onError(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my ambiguous comment.

Here is the test:

    @SuppressWarnings("unchecked")
    @Test
    public void testOnErrorWithInternalObserver() {

        Observable<String> w = Observable.zip(
                Observable.<Integer> error(new RuntimeException()),
                Observable.<Integer> error(new RuntimeException()),
                new Func2<Integer,Integer,String>(){

                    @Override
                    public String call(Integer t1, Integer t2) {
                        return t1 + "" + t2;
                    }
                });

        final Observer<String> observer = mock(Observer.class);
        w.subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onNext(String t) {
                observer.onNext(t);
            }

        });

        verify(observer, times(1)).onError(isA(RuntimeException.class)); // The current implementation will emit 2 errors.
        verify(observer, never()).onCompleted();
        verify(observer, never()).onNext(any(String.class));
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test need to be put in the OperatorZipTest.java.

@ronanM
Copy link

ronanM commented Jan 31, 2014

Is it possible to lift (2..n)-arity operator ?

@benjchristensen
Copy link
Member Author

Is it possible to lift (2..n)-arity operator ?

If you're referring to arguments passed into an Operator implementation, yes they can take anything you want to pass in.

If you're referring to something like zip(T1, T2), then it's a little different since a lift is performed on an Observable<T>, so the T contains everything that can be done against it.

In this particular instance I'm taking the types, creating an Observable<Observable[]> out of them and then lift the OperatorZip into it. The zip operator ignores the types internally for simplicity (so we don't have N number of implementations) and then re-establish the types when it emits.

return just(new Observable<?>[] { o1, o2, o3 }).lift(new OperatorZip<R>(zipFunction));

If the specific types wanted to be retained, then the T would need to become a type that contained everything.

I think of this somewhat like currying, where an Observable represents only a single type, so if I need to work on multiple types at once I need to compose them together first. With zip implemented, most other operators needing to combine types could first use zip to do the composition and then lift that Observable.

@cloudbees-pull-request-builder

RxJava-pull-requests #716 ABORTED

- Use new lift operator implement and non-blocking synchronization approach.
- I have had the concurrency model reviewed by some colleagues and all unit tests are passing but further review is justified and welcome.
benjchristensen added a commit that referenced this pull request Feb 5, 2014
Reimplement Zip Operator Using Lift [Preview]
@benjchristensen benjchristensen merged commit dc15e2b into ReactiveX:master Feb 5, 2014
@benjchristensen benjchristensen deleted the operator-zip branch February 5, 2014 03:53
@cloudbees-pull-request-builder

RxJava-pull-requests #732 FAILURE
Looks like there's a problem with this pull request

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants