Skip to content

Commit

Permalink
ToObservableIterable Recursion/Loop
Browse files Browse the repository at this point in the history
- the ImmediateScheduler no longer schedules itself but uses a loop
- 10-20x faster to use a loop rather than schedule itself recursively
  • Loading branch information
benjchristensen committed Jan 7, 2014
1 parent 2e53b67 commit a18b8c1
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
Expand All @@ -37,16 +38,20 @@
public final class OperationToObservableIterable<T> {

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
return new ToObservableIterable<T>(list, scheduler);
if (scheduler instanceof ImmediateScheduler) {
return new ToObservableIterable<T>(list);
} else {
return new ToObservableIterableScheduled<T>(list, scheduler);
}
}

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
return toObservableIterable(list, Schedulers.immediate());
return new ToObservableIterable<T>(list);
}

private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
private static class ToObservableIterableScheduled<T> implements OnSubscribeFunc<T> {

public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
public ToObservableIterableScheduled(Iterable<? extends T> list, Scheduler scheduler) {
this.iterable = list;
this.scheduler = scheduler;
}
Expand Down Expand Up @@ -74,4 +79,25 @@ public void call(Action0 self) {
});
}
}

private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {

public ToObservableIterable(Iterable<? extends T> list) {
this.iterable = list;
}

final Iterable<? extends T> iterable;

public Subscription onSubscribe(final Observer<? super T> observer) {
try {
for (T t : iterable) {
observer.onNext(t);
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return Subscriptions.empty();
}
}
}
133 changes: 0 additions & 133 deletions rxjava-core/src/test/java/rx/ObserveOnTests.java

This file was deleted.

104 changes: 104 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;
Expand All @@ -34,6 +35,7 @@
import rx.schedulers.TestScheduler;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class OperationObserveOnTest {

Expand Down Expand Up @@ -210,6 +212,108 @@ public void observeSameOnMultipleSchedulers() {
inOrder2.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
inOrder2.verifyNoMoreInteractions();
}

/**
* Confirm that running on a NewThreadScheduler uses the same thread for the entire stream
*/
@Test
public void testObserveOnWithNewThreadScheduler() {
final AtomicInteger count = new AtomicInteger();
final int _multiple = 99;

Observable.range(1, 100000).map(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer t1) {
return t1 * _multiple;
}

}).observeOn(Schedulers.newThread())
.toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler"));
}

});
}

/**
* Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
*/
@Test
public void testObserveOnWithThreadPoolScheduler() {
final AtomicInteger count = new AtomicInteger();
final int _multiple = 99;

Observable.range(1, 100000).map(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer t1) {
return t1 * _multiple;
}

}).observeOn(Schedulers.computation())
.toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
}

});
}

/**
* Attempts to confirm that when pauses exist between events, the ScheduledObserver
* does not lose or reorder any events since the scheduler will not block, but will
* be re-scheduled when it receives new events after each pause.
*
*
* This is non-deterministic in proving success, but if it ever fails (non-deterministically)
* it is a sign of potential issues as thread-races and scheduling should not affect output.
*/
@Test
public void testObserveOnOrderingConcurrency() {
final AtomicInteger count = new AtomicInteger();
final int _multiple = 99;

Observable.range(1, 10000).map(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer t1) {
if (randomIntFrom0to100() > 98) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return t1 * _multiple;
}

}).observeOn(Schedulers.computation())
.toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
}

});
}

private static int randomIntFrom0to100() {
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
long x = System.nanoTime();
x ^= (x << 21);
x ^= (x >>> 35);
x ^= (x << 4);
return Math.abs((int) x % 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import rx.Observable;
import rx.Observer;
import rx.schedulers.Schedulers;

public class OperationToObservableIterableTest {

Expand All @@ -42,4 +43,18 @@ public void testIterable() {
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testIterableScheduled() {
Observable<String> observable = Observable.create(toObservableIterable(Arrays.<String> asList("one", "two", "three"), Schedulers.currentThread()));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, times(1)).onNext("three");
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ public static void main(String args[]) {

@Override
public void call() {
// spt.singleResponse(Schedulers.immediate());
spt.singleResponse(Schedulers.immediate());
// spt.singleResponse(Schedulers.currentThread());
// spt.singleResponse(Schedulers.threadPoolForComputation());

spt.arrayResponse(Schedulers.immediate());
// spt.arrayResponse(Schedulers.currentThread());
// spt.arrayResponse(Schedulers.immediate());
// spt.arrayResponse(Schedulers.currentThread());
// spt.arrayResponse(Schedulers.threadPoolForComputation());
}
});
Expand Down Expand Up @@ -92,11 +92,11 @@ public long baseline() {
*
* --- Schedulers.immediate() ---
*
* Run: 10 - 4,113,672 ops/sec
* Run: 11 - 4,068,351 ops/sec
* Run: 12 - 4,070,318 ops/sec
* Run: 13 - 4,161,793 ops/sec
* Run: 14 - 4,156,725 ops/sec
* Run: 10 - 14,973,870 ops/sec
* Run: 11 - 15,345,142 ops/sec
* Run: 12 - 14,962,533 ops/sec
* Run: 13 - 14,793,030 ops/sec
* Run: 14 - 15,177,685 ops/sec
*
* --- Schedulers.currentThread() ---
*
Expand Down Expand Up @@ -127,11 +127,11 @@ public long singleResponse(Scheduler scheduler) {
*
* --- Schedulers.immediate() ---
*
* Run: 0 - 1,849,947 ops/sec
* Run: 1 - 2,076,067 ops/sec
* Run: 2 - 2,114,688 ops/sec
* Run: 3 - 2,114,301 ops/sec
* Run: 4 - 2,102,543 ops/sec
* Run: 10 - 9,805,017 ops/sec
* Run: 11 - 9,880,427 ops/sec
* Run: 12 - 9,615,809 ops/sec
* Run: 13 - 10,920,297 ops/sec
* Run: 14 - 10,822,721 ops/sec
*
* --- Schedulers.currentThread() ---
*
Expand Down

0 comments on commit a18b8c1

Please sign in to comment.