Skip to content

Commit

Permalink
Merge pull request #881 from benjchristensen/lift-performance
Browse files Browse the repository at this point in the history
Lift Performance
  • Loading branch information
benjchristensen committed Feb 17, 2014
2 parents 4f841d9 + 6898396 commit 84372e1
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void onError(Throwable e) {
public void onNext(Integer t) {
}
});
verify(events, times(6)).call(subscribe());
verify(events, atLeast(3)).call(subscribe());
verify(events, times(4)).call(onNext(1));
// one less because it originates from the inner observable sent to merge
verify(events, times(3)).call(onNext(2));
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,11 @@ public static interface OnSubscribeFunc<T> extends Function {
* @return an Observable that emits values that are the result of applying the bind function to the values
* of the current Observable
*/
public <R> Observable<R> lift(final Operator<? extends R, ? super T> bind) {
public <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
subscribe(hook.onLift(bind).call(o));
f.call(hook.onLift(lift).call(o));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package rx.composition;

import rx.Observable;
import rx.perf.AbstractPerformanceTester;
import rx.perf.IntegerSumObserver;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

public class RangeMapTakeOnNextPerf extends AbstractPerformanceTester {

final static int NUM = 10;
final static long REPS = REPETITIONS / NUM;

RangeMapTakeOnNextPerf() {
super(REPS);
}

public static void main(String args[]) {

final RangeMapTakeOnNextPerf spt = new RangeMapTakeOnNextPerf();
try {
spt.runTest(new Action0() {

@Override
public void call() {
spt.test();
}
});
} catch (Exception e) {
e.printStackTrace();
}

}

/**
*
* With 'lift' calling the `f` function directly:
*
* Run: 10 - 11,152,996 ops/sec
* Run: 11 - 9,791,825 ops/sec
* Run: 12 - 10,080,035 ops/sec
* Run: 13 - 10,189,525 ops/sec
* Run: 14 - 10,145,486 ops/sec
*
* With `lift` calling `subscribe`:
*
* Run: 10 - 5,592,153 ops/sec
* Run: 11 - 5,881,799 ops/sec
* Run: 12 - 5,853,430 ops/sec
* Run: 13 - 5,902,769 ops/sec
* Run: 14 - 5,907,721 ops/sec
*/
public long test() {

Observable<Integer> s = Observable.range(0, 100).map(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer l) {
return l + 1;
}

}).take(NUM);
IntegerSumObserver o = new IntegerSumObserver();

for (long l = 0; l < REPS; l++) {
s.subscribe(o);
}
return o.sum;
}

}

0 comments on commit 84372e1

Please sign in to comment.