From a18b8c1a572b7b9509b7a7fe1a5075ce93657771 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 6 Jan 2014 18:05:49 -0800 Subject: [PATCH] ToObservableIterable Recursion/Loop - the ImmediateScheduler no longer schedules itself but uses a loop - 10-20x faster to use a loop rather than schedule itself recursively --- .../OperationToObservableIterable.java | 34 ++++- .../src/test/java/rx/ObserveOnTests.java | 133 ------------------ .../rx/operators/OperationObserveOnTest.java | 104 ++++++++++++++ .../OperationToObservableIterableTest.java | 15 ++ .../schedulers/SchedulerPerformanceTests.java | 26 ++-- 5 files changed, 162 insertions(+), 150 deletions(-) delete mode 100644 rxjava-core/src/test/java/rx/ObserveOnTests.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java index 7fb5f0ec91..4518715908 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java @@ -19,6 +19,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.schedulers.ImmediateScheduler; import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; @@ -37,16 +38,20 @@ public final class OperationToObservableIterable { public static OnSubscribeFunc toObservableIterable(Iterable list, Scheduler scheduler) { - return new ToObservableIterable(list, scheduler); + if (scheduler instanceof ImmediateScheduler) { + return new ToObservableIterable(list); + } else { + return new ToObservableIterableScheduled(list, scheduler); + } } public static OnSubscribeFunc toObservableIterable(Iterable list) { - return toObservableIterable(list, Schedulers.immediate()); + return new ToObservableIterable(list); } - private static class ToObservableIterable implements OnSubscribeFunc { + private static class ToObservableIterableScheduled implements OnSubscribeFunc { - public ToObservableIterable(Iterable list, Scheduler scheduler) { + public ToObservableIterableScheduled(Iterable list, Scheduler scheduler) { this.iterable = list; this.scheduler = scheduler; } @@ -74,4 +79,25 @@ public void call(Action0 self) { }); } } + + private static class ToObservableIterable implements OnSubscribeFunc { + + public ToObservableIterable(Iterable list) { + this.iterable = list; + } + + final Iterable iterable; + + public Subscription onSubscribe(final Observer observer) { + try { + for (T t : iterable) { + observer.onNext(t); + } + observer.onCompleted(); + } catch (Exception e) { + observer.onError(e); + } + return Subscriptions.empty(); + } + } } diff --git a/rxjava-core/src/test/java/rx/ObserveOnTests.java b/rxjava-core/src/test/java/rx/ObserveOnTests.java deleted file mode 100644 index 72c1f1343d..0000000000 --- a/rxjava-core/src/test/java/rx/ObserveOnTests.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx; - -import static org.junit.Assert.*; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import rx.schedulers.Schedulers; -import rx.util.functions.Action1; -import rx.util.functions.Func1; - -public class ObserveOnTests { - - /** - * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream - */ - @Test - public void testObserveOnWithNewThreadScheduler() { - final AtomicInteger count = new AtomicInteger(); - final int _multiple = 99; - - Observable.range(1, 1000).map(new Func1() { - - @Override - public Integer call(Integer t1) { - return t1 * _multiple; - } - - }).observeOn(Schedulers.newThread()) - .toBlockingObservable().forEach(new Action1() { - - @Override - public void call(Integer t1) { - assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); - assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler")); - } - - }); - } - - /** - * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered. - */ - @Test - public void testObserveOnWithThreadPoolScheduler() { - final AtomicInteger count = new AtomicInteger(); - final int _multiple = 99; - - Observable.range(1, 1000).map(new Func1() { - - @Override - public Integer call(Integer t1) { - return t1 * _multiple; - } - - }).observeOn(Schedulers.threadPoolForComputation()) - .toBlockingObservable().forEach(new Action1() { - - @Override - public void call(Integer t1) { - assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - } - - }); - } - - /** - * Attempts to confirm that when pauses exist between events, the ScheduledObserver - * does not lose or reorder any events since the scheduler will not block, but will - * be re-scheduled when it receives new events after each pause. - * - * - * This is non-deterministic in proving success, but if it ever fails (non-deterministically) - * it is a sign of potential issues as thread-races and scheduling should not affect output. - */ - @Test - public void testObserveOnOrderingConcurrency() { - final AtomicInteger count = new AtomicInteger(); - final int _multiple = 99; - - Observable.range(1, 1000).map(new Func1() { - - @Override - public Integer call(Integer t1) { - if (randomIntFrom0to100() > 98) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - return t1 * _multiple; - } - - }).observeOn(Schedulers.threadPoolForComputation()) - .toBlockingObservable().forEach(new Action1() { - - @Override - public void call(Integer t1) { - assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); - assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); - } - - }); - } - - private static int randomIntFrom0to100() { - // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml - long x = System.nanoTime(); - x ^= (x << 21); - x ^= (x >>> 35); - x ^= (x << 4); - return Math.abs((int) x % 100); - } - -} diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java index 626d6409d7..dfc14750c8 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.mockito.InOrder; @@ -34,6 +35,7 @@ import rx.schedulers.TestScheduler; import rx.util.functions.Action0; import rx.util.functions.Action1; +import rx.util.functions.Func1; public class OperationObserveOnTest { @@ -210,6 +212,108 @@ public void observeSameOnMultipleSchedulers() { inOrder2.verify(observer2, times(1)).onCompleted(); verify(observer2, never()).onError(any(Throwable.class)); inOrder2.verifyNoMoreInteractions(); + } + + /** + * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream + */ + @Test + public void testObserveOnWithNewThreadScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).observeOn(Schedulers.newThread()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler")); + } + + }); + } + + /** + * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered. + */ + @Test + public void testObserveOnWithThreadPoolScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).observeOn(Schedulers.computation()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + + /** + * Attempts to confirm that when pauses exist between events, the ScheduledObserver + * does not lose or reorder any events since the scheduler will not block, but will + * be re-scheduled when it receives new events after each pause. + * + * + * This is non-deterministic in proving success, but if it ever fails (non-deterministically) + * it is a sign of potential issues as thread-races and scheduling should not affect output. + */ + @Test + public void testObserveOnOrderingConcurrency() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 10000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + if (randomIntFrom0to100() > 98) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return t1 * _multiple; + } + + }).observeOn(Schedulers.computation()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + private static int randomIntFrom0to100() { + // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml + long x = System.nanoTime(); + x ^= (x << 21); + x ^= (x >>> 35); + x ^= (x << 4); + return Math.abs((int) x % 100); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java b/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java index 8d8be93dcb..16ad47f372 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationToObservableIterableTest.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Observer; +import rx.schedulers.Schedulers; public class OperationToObservableIterableTest { @@ -42,4 +43,18 @@ public void testIterable() { verify(aObserver, Mockito.never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } + + @Test + public void testIterableScheduled() { + Observable observable = Observable.create(toObservableIterable(Arrays. asList("one", "two", "three"), Schedulers.currentThread())); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } } diff --git a/rxjava-core/src/test/java/rx/schedulers/SchedulerPerformanceTests.java b/rxjava-core/src/test/java/rx/schedulers/SchedulerPerformanceTests.java index 1f166dbfa6..986ef75c34 100644 --- a/rxjava-core/src/test/java/rx/schedulers/SchedulerPerformanceTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/SchedulerPerformanceTests.java @@ -35,12 +35,12 @@ public static void main(String args[]) { @Override public void call() { - // spt.singleResponse(Schedulers.immediate()); + spt.singleResponse(Schedulers.immediate()); // spt.singleResponse(Schedulers.currentThread()); // spt.singleResponse(Schedulers.threadPoolForComputation()); - spt.arrayResponse(Schedulers.immediate()); - // spt.arrayResponse(Schedulers.currentThread()); + // spt.arrayResponse(Schedulers.immediate()); + // spt.arrayResponse(Schedulers.currentThread()); // spt.arrayResponse(Schedulers.threadPoolForComputation()); } }); @@ -92,11 +92,11 @@ public long baseline() { * * --- Schedulers.immediate() --- * - * Run: 10 - 4,113,672 ops/sec - * Run: 11 - 4,068,351 ops/sec - * Run: 12 - 4,070,318 ops/sec - * Run: 13 - 4,161,793 ops/sec - * Run: 14 - 4,156,725 ops/sec + * Run: 10 - 14,973,870 ops/sec + * Run: 11 - 15,345,142 ops/sec + * Run: 12 - 14,962,533 ops/sec + * Run: 13 - 14,793,030 ops/sec + * Run: 14 - 15,177,685 ops/sec * * --- Schedulers.currentThread() --- * @@ -127,11 +127,11 @@ public long singleResponse(Scheduler scheduler) { * * --- Schedulers.immediate() --- * - * Run: 0 - 1,849,947 ops/sec - * Run: 1 - 2,076,067 ops/sec - * Run: 2 - 2,114,688 ops/sec - * Run: 3 - 2,114,301 ops/sec - * Run: 4 - 2,102,543 ops/sec + * Run: 10 - 9,805,017 ops/sec + * Run: 11 - 9,880,427 ops/sec + * Run: 12 - 9,615,809 ops/sec + * Run: 13 - 10,920,297 ops/sec + * Run: 14 - 10,822,721 ops/sec * * --- Schedulers.currentThread() --- *