Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lift and Observer+Subscription #784

Merged
merged 24 commits into from
Jan 26, 2014

Conversation

benjchristensen
Copy link
Member

Make Observer implement Subscription and rename bind to lift as per decisions in #775

Signatures are now:

// Observable.create
public final static <T> Observable<T> create(OnSubscribe<T> f)

// Observable.OnSubscribe typed function interface
public static interface OnSubscribe<T> extends Action1<Observer<? super T>>

// lift function
public <R> Observable<R> lift(final Func1<Observer<? super R>, Observer<? super T>> bind)

// Observer
public abstract class Observer<T> implements Subscription {
     public abstract void onNext(T t);
     public abstract void onError(Throwable e);
     public abstract void onCompleted();
     public final void add(Subscription s)
     public final void unsubscribe()
     public final boolean isUnsubscribed()
}

// Subject
public abstract class Subject<T, R> extends Observer<T> {
    public abstract Observable<R> toObservable();
}

This is a major set of changes to the internals, particularly the unit tests because Mockito has issues with abstract classes as opposed to Observer being an interface.

There are still some unit tests failing that I haven't yet figured out:

  • OperationJoinsTest.whenComplicated in rxjava-core
  • OperationConditionalsTest.testDoWhileManyTimes in rxjava-computation-expressions
  • OperationConditionalsTest.testWhileDoManyTimes in rxjava-computation-expressions

I intend on merging this sooner rather than later so everyone can be working off the same codebase, even though I do not consider this code ready for release, even once those unit tests are fixed.

Interestingly, these performance tests on my machine are much better:

// OperatorFromIterablePerformance.timeTenLongs()
v0.16
     * Run: 10 - 8,096,667 ops/sec
     * Run: 11 - 8,382,131 ops/sec
     * Run: 12 - 8,256,288 ops/sec
     * Run: 13 - 8,139,703 ops/sec
     * Run: 14 - 8,011,023 ops/sec

... after v0.17 work:

     * Run: 10 - 31,296,553 ops/sec
     * Run: 11 - 30,080,435 ops/sec
     * Run: 12 - 31,886,941 ops/sec
     * Run: 13 - 32,281,807 ops/sec
     * Run: 14 - 33,519,028 ops/sec


// OperatorMapPerformance.timeMapPlusOne();

v0.16

     * Run: 10 - 11,375,632 ops/sec
     * Run: 11 - 11,390,325 ops/sec
     * Run: 12 - 11,655,527 ops/sec
     * Run: 13 - 11,528,440 ops/sec
     * Run: 14 - 11,321,181 ops/sec

... after v0.17 work:

     * Run: 10 - 17,674,464 ops/sec
     * Run: 11 - 17,890,239 ops/sec
     * Run: 12 - 17,919,155 ops/sec
     * Run: 13 - 16,356,974 ops/sec
     * Run: 14 - 16,723,414 ops/sec

// OperatorMergePerformance.timeRepetitionsEmissionSynchronous()

v0.16

     * Run: 10 - 32,609,617 ops/sec
     * Run: 11 - 33,511,839 ops/sec
     * Run: 12 - 34,768,096 ops/sec
     * Run: 13 - 32,376,499 ops/sec
     * Run: 14 - 33,166,835 ops/sec

... after v0.17 work:

     * Run: 10 - 45,945,747 ops/sec
     * Run: 11 - 46,342,209 ops/sec
     * Run: 12 - 44,493,090 ops/sec
     * Run: 13 - 44,999,640 ops/sec
     * Run: 14 - 47,389,771 ops/sec

Follow same pattern as rx.observables, rx.schedulers, rx.subjects, rx.subscriptions
Now that Observer is an abstract class, Mockito is having issues with it so unit tests are a mess.
- work around inability of Mockito to correctly mock an abstract class
- 15 of 590 tests still failing
Remove possibility of infinite loop
… numbers from my machine. Can’t fully explain the increases in performance.
- confirmed the assertions, leaving broken until can be fixed (not sure what’s wrong)
@benjchristensen
Copy link
Member Author

/cc @abersnaze @akarnokd @headinthebox

@akarnokd
Copy link
Member

If I factor out the subjects toObservable calls to be the same, the test passes:

Observable<Integer> xso = xs.toObservable();
Observable<Integer> yso = ys.toObservable();
Observable<Integer> zso = zs.toObservable();

Observable<Integer> m = Observable.when(
      xso.and(yso).then(add2), // 1+4=5, 2+5=7, 3+6=9
      xso.and(zso).then(mul2), // 1*7=7, 2*8=16, 3*9=27
      yso.and(zso).then(sub2)  // 4-7=-3, 5-8=-3, 6-9=-3
);

Otherwise, it seems you had like 6 different and operations and source numbers were replicated to all of them. This way, there were multiple queues per source and did independent matching.

As for the conditionals, change the SerialSubscription to MultipleAssignmentSubscription in OperationConditionals#L235 and any dependent places.

- now correctly creates only 1 Observable instance for the life of the Subject
- this fixes the OperationJoinsTest
- thanks @akarnokd for pointing out my mistake!
- all rxjava-core unit tests are now passing
@benjchristensen
Copy link
Member Author

@akarnokd Thanks for the pointer to the problem. I have fixed the Subject.toObservable implementations to correctly have only 1 Observable instance for the life of the Subject.

Looking at OperationConditionals ...

@headinthebox
Copy link
Contributor

For Observable.join and Observable.groupJoin you must draw a marble diagram (at least I do) otherwise your brain will melt.


protected Observer(CompositeSubscription cs) {
if (cs == null) {
throw new IllegalArgumentException("The CompositeException can not be null");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/CompositeException/CompositeSubscription

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Viktor, thanks for pointing that out! Fix just pushed.

@benjchristensen
Copy link
Member Author

As for the conditionals, change the SerialSubscription to MultipleAssignmentSubscription in OperationConditionals#L235 and any dependent places.

That indeed fixes it, though I have yet to track down why the unsubscribe on each pass causes things to fail now whereas it didn't before.

@cloudbees-pull-request-builder

RxJava-pull-requests #703 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member Author

Merging so work can continue on master branch.

benjchristensen added a commit that referenced this pull request Jan 26, 2014
@benjchristensen benjchristensen merged commit 86993d3 into ReactiveX:master Jan 26, 2014
@benjchristensen benjchristensen deleted the lift-observer branch January 26, 2014 05:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants