diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala deleted file mode 100644 index 0ffe30b2a2..0000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ExecutorScheduler.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.lang.scala.schedulers - -import java.util.concurrent.{ScheduledExecutorService, Executor} -import rx.lang.scala.Scheduler - -object ExecutorScheduler { - - /** - * Returns a [[rx.lang.scala.Scheduler]] that queues work on an `java.util.concurrent.Executor`. - * - * Note that this does not support scheduled actions with a delay. - */ - def apply(executor: Executor): ExecutorScheduler = { - new ExecutorScheduler(rx.schedulers.Schedulers.executor(executor)) - } - - def apply(executor: ScheduledExecutorService): ExecutorScheduler = { - new ExecutorScheduler(rx.schedulers.Schedulers.executor(executor)) - } -} - -class ExecutorScheduler private[scala] (val asJavaScheduler: rx.Scheduler) - extends Scheduler {} - - - diff --git a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java index f2d6091c96..8eddc63820 100644 --- a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationDeferFutureTest.java @@ -58,7 +58,7 @@ public Observable call() throws Exception { } }; - Observable result = Async.deferFuture(func, Schedulers.threadPoolForComputation()); + Observable result = Async.deferFuture(func, Schedulers.computation()); final Observer observer = mock(Observer.class); diff --git a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java index 8fd6e4c91e..75e6ff074a 100644 --- a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationForEachFutureTest.java @@ -54,7 +54,7 @@ public void testSimple() { try { Observable source = Observable.from(1, 2, 3) - .subscribeOn(Schedulers.threadPoolForComputation()); + .subscribeOn(Schedulers.computation()); final AtomicInteger sum = new AtomicInteger(); Action1 add = new Action1() { @@ -93,7 +93,7 @@ public void testSimpleThrowing() { try { Observable source = Observable.error(new CustomException()) - .subscribeOn(Schedulers.threadPoolForComputation()); + .subscribeOn(Schedulers.computation()); final AtomicInteger sum = new AtomicInteger(); Action1 add = new Action1() { @@ -128,7 +128,7 @@ public void call(Integer t1) { @Test public void testSimpleScheduled() { Observable source = Observable.from(1, 2, 3) - .subscribeOn(Schedulers.threadPoolForComputation()); + .subscribeOn(Schedulers.computation()); final AtomicInteger sum = new AtomicInteger(); Action1 add = new Action1() { @@ -158,7 +158,7 @@ public void call(Integer t1) { public void testSimpleScheduledThrowing() { Observable source = Observable.error(new CustomException()) - .subscribeOn(Schedulers.threadPoolForComputation()); + .subscribeOn(Schedulers.computation()); final AtomicInteger sum = new AtomicInteger(); Action1 add = new Action1() { diff --git a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java index babee1c7ac..42856ba2d7 100644 --- a/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java +++ b/rxjava-contrib/rxjava-async-util/src/test/java/rx/util/async/operators/OperationStartFutureTest.java @@ -104,7 +104,7 @@ public Integer call() throws Exception { } }; - Observable result = Async.startFuture(func, Schedulers.threadPoolForComputation()); + Observable result = Async.startFuture(func, Schedulers.computation()); final Observer observer = mock(Observer.class); diff --git a/rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java b/rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java index 7c49a28cf9..18530022cd 100644 --- a/rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java +++ b/rxjava-contrib/rxjava-computation-expressions/src/test/java/rx/operators/OperationConditionalsTest.java @@ -442,7 +442,7 @@ public void testWhileDoZeroTimes() { @Test public void testWhileDoManyTimes() { - Observable source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread()); + Observable source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.trampoline()); List expected = new ArrayList(numRecursion * 3); for (int i = 0; i < numRecursion; i++) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index eaad7ca545..9ce6774d60 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -197,7 +197,7 @@ public Subscription onSubscribe(final Observer> observer) { * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(Observable source, long timespan, TimeUnit unit) { - return buffer(source, timespan, unit, Schedulers.threadPoolForComputation()); + return buffer(source, timespan, unit, Schedulers.computation()); } /** @@ -259,7 +259,7 @@ public Subscription onSubscribe(final Observer> observer) { * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(Observable source, long timespan, TimeUnit unit, int count) { - return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation()); + return buffer(source, timespan, unit, count, Schedulers.computation()); } /** @@ -325,7 +325,7 @@ public Subscription onSubscribe(final Observer> observer) { * the {@link Func1} object representing the specified buffer operation */ public static OnSubscribeFunc> buffer(Observable source, long timespan, long timeshift, TimeUnit unit) { - return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation()); + return buffer(source, timespan, timeshift, unit, Schedulers.computation()); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationWindow.java b/rxjava-core/src/main/java/rx/operators/OperationWindow.java index 8280236a54..054be1f29b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWindow.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWindow.java @@ -195,7 +195,7 @@ public Subscription onSubscribe(final Observer> observer) * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(Observable source, long timespan, TimeUnit unit) { - return window(source, timespan, unit, Schedulers.threadPoolForComputation()); + return window(source, timespan, unit, Schedulers.computation()); } /** @@ -255,7 +255,7 @@ public Subscription onSubscribe(final Observer> observer) * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(Observable source, long timespan, TimeUnit unit, int count) { - return window(source, timespan, unit, count, Schedulers.threadPoolForComputation()); + return window(source, timespan, unit, count, Schedulers.computation()); } /** @@ -318,7 +318,7 @@ public Subscription onSubscribe(final Observer> observer) * the {@link rx.functions.Func1} object representing the specified window operation */ public static OnSubscribeFunc> window(Observable source, long timespan, long timeshift, TimeUnit unit) { - return window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation()); + return window(source, timespan, timeshift, unit, Schedulers.computation()); } /** diff --git a/rxjava-core/src/main/java/rx/schedulers/ComputationScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ComputationScheduler.java new file mode 100644 index 0000000000..27214c6e0f --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/ComputationScheduler.java @@ -0,0 +1,109 @@ +package rx.schedulers; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Scheduler; +import rx.Subscription; +import rx.functions.Action0; +import rx.schedulers.NewThreadScheduler.OnActionComplete; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; + +/* package */class ComputationScheduler extends Scheduler { + + private static class ComputationSchedulerPool { + final int cores = Runtime.getRuntime().availableProcessors(); + final ThreadFactory factory = new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }; + + final EventLoopScheduler[] eventLoops; + + ComputationSchedulerPool() { + // initialize event loops + eventLoops = new EventLoopScheduler[cores]; + for (int i = 0; i < cores; i++) { + eventLoops[i] = new EventLoopScheduler(factory); + } + } + + private static ComputationSchedulerPool INSTANCE = new ComputationSchedulerPool(); + + long n = 0; + + public EventLoopScheduler getEventLoop() { + // round-robin selection (improvements to come) + return eventLoops[(int) (n++ % cores)]; + } + + } + + @Override + public Inner createInner() { + return new EventLoop(); + } + + private static class EventLoop extends Scheduler.Inner { + private final CompositeSubscription innerSubscription = new CompositeSubscription(); + private final EventLoopScheduler pooledEventLoop; + private final OnActionComplete onComplete; + + EventLoop() { + pooledEventLoop = ComputationSchedulerPool.INSTANCE.getEventLoop(); + onComplete = new OnActionComplete() { + + @Override + public void complete(Subscription s) { + innerSubscription.remove(s); + } + + }; + } + + @Override + public void unsubscribe() { + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + + @Override + public Subscription schedule(Action0 action) { + if (innerSubscription.isUnsubscribed()) { + // don't schedule, we are unsubscribed + return Subscriptions.empty(); + } + return pooledEventLoop.schedule(action, onComplete); + } + + @Override + public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + if (innerSubscription.isUnsubscribed()) { + // don't schedule, we are unsubscribed + return Subscriptions.empty(); + } + + return pooledEventLoop.schedule(action, delayTime, unit, onComplete); + } + + } + + private static class EventLoopScheduler extends NewThreadScheduler.EventLoopScheduler { + EventLoopScheduler(ThreadFactory threadFactory) { + super(threadFactory); + } + } + +} diff --git a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java deleted file mode 100644 index abf29a8684..0000000000 --- a/rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.schedulers; - -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import rx.Scheduler; -import rx.Subscription; -import rx.functions.Action0; -import rx.subscriptions.MultipleAssignmentSubscription; -import rx.subscriptions.Subscriptions; - -/** - * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. - *

- * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a - * system-wide Timer will be used to handle delayed events. - */ -public class ExecutorScheduler extends Scheduler { - private final Executor executor; - - /** - * @deprecated Use Schedulers.executor(); - */ - @Deprecated - public ExecutorScheduler(Executor executor) { - this.executor = executor; - } - - /** - * @deprecated Use Schedulers.executor(); - */ - @Deprecated - public ExecutorScheduler(ScheduledExecutorService executor) { - this.executor = executor; - } - - @Override - public Inner createInner() { - return new InnerExecutorScheduler(); - } - - private class InnerExecutorScheduler extends Scheduler.Inner { - - private final MultipleAssignmentSubscription innerSubscription = new MultipleAssignmentSubscription(); - - @Override - public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { - if (innerSubscription.isUnsubscribed()) { - // don't schedule, we are unsubscribed - return Subscriptions.empty(); - } - - if (executor instanceof ScheduledExecutorService) { - // we are a ScheduledExecutorService so can do proper scheduling - ScheduledFuture f = ((ScheduledExecutorService) executor).schedule(new Runnable() { - @Override - public void run() { - if (innerSubscription.isUnsubscribed()) { - // don't execute if unsubscribed - return; - } - // when the delay has passed we now do the work on the actual scheduler - action.call(); - } - }, delayTime, unit); - // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - Subscription s = Subscriptions.from(f); - innerSubscription.set(s); - return s; - } else { - // we are not a ScheduledExecutorService so can't directly schedule - if (delayTime == 0) { - // no delay so put on the thread-pool right now - return schedule(action); - } else { - // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService - // to handle the scheduling and once it's ready then execute on this Executor - ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { - - @Override - public void run() { - if (innerSubscription.isUnsubscribed()) { - // don't execute if unsubscribed - return; - } - // now execute on the real Executor (by using the other overload that schedules for immediate execution) - schedule(action); - } - }, delayTime, unit); - // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens - Subscription s = Subscriptions.from(f); - innerSubscription.set(s); - return s; - } - } - } - - @Override - public Subscription schedule(final Action0 action) { - if (innerSubscription.isUnsubscribed()) { - // don't schedule, we are unsubscribed - return Subscriptions.empty(); - } - - // work to be done on a thread - Runnable r = new Runnable() { - @Override - public void run() { - if (innerSubscription.isUnsubscribed()) { - // don't execute if unsubscribed - return; - } - action.call(); - } - }; - - // submit for immediate execution - if (executor instanceof ExecutorService) { - // we are an ExecutorService so get a Future back that supports unsubscribe - Future f = ((ExecutorService) executor).submit(r); - // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens - Subscription s = Subscriptions.from(f); - innerSubscription.set(s); - return s; - } else { - // we are the lowest common denominator so can't unsubscribe once we execute - executor.execute(r); - return Subscriptions.empty(); - } - } - - @Override - public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { - if (executor instanceof ScheduledExecutorService) { - ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - if (isUnsubscribed()) { - // don't execute if unsubscribed - return; - } - action.call(); - } - }, initialDelay, period, unit); - - Subscription s = Subscriptions.from(f); - innerSubscription.set(s); - return s; - } else { - return super.schedulePeriodically(action, initialDelay, period, unit); - } - } - - @Override - public void unsubscribe() { - innerSubscription.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return innerSubscription.isUnsubscribed(); - } - - } - -} diff --git a/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java b/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java index f9b3bbe402..5b19b40ba5 100644 --- a/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java +++ b/rxjava-core/src/main/java/rx/schedulers/GenericScheduledExecutorService.java @@ -40,7 +40,7 @@ private GenericScheduledExecutorService() { int count = Runtime.getRuntime().availableProcessors(); - if (count > 8) { + if (count > 4) { count = count / 2; } // we don't need more than 8 to handle just scheduling and doing no work diff --git a/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java index bff77eb0b8..e74296fe81 100644 --- a/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/ImmediateScheduler.java @@ -29,15 +29,6 @@ public final class ImmediateScheduler extends Scheduler { private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); - /** - * @deprecated Use Schedulers.immediate(); - * @return - */ - @Deprecated - public static ImmediateScheduler getInstance() { - return INSTANCE; - } - /* package */static ImmediateScheduler instance() { return INSTANCE; } diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index 0aa112df4d..cebbdba64a 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -46,15 +46,6 @@ public Thread newThread(Runnable r) { } }; - /** - * @deprecated Use Schedulers.newThread(); - * @return - */ - @Deprecated - public static NewThreadScheduler getInstance() { - return INSTANCE; - } - /* package */static NewThreadScheduler instance() { return INSTANCE; } @@ -65,19 +56,23 @@ private NewThreadScheduler() { @Override public Inner createInner() { - return new EventLoopScheduler(); + return new EventLoopScheduler(THREAD_FACTORY); } - private class EventLoopScheduler extends Scheduler.Inner implements Subscription { + /* package */static class EventLoopScheduler extends Scheduler.Inner implements Subscription { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final ExecutorService executor; - private EventLoopScheduler() { - executor = Executors.newSingleThreadExecutor(THREAD_FACTORY); + /* package */EventLoopScheduler(ThreadFactory threadFactory) { + executor = Executors.newSingleThreadExecutor(threadFactory); } @Override public Subscription schedule(final Action0 action) { + return schedule(action, null); + } + + /* package */Subscription schedule(final Action0 action, final OnActionComplete onComplete) { if (innerSubscription.isUnsubscribed()) { // don't schedule, we are unsubscribed return Subscriptions.empty(); @@ -99,6 +94,9 @@ public void run() { if (s != null) { innerSubscription.remove(s); } + if (onComplete != null) { + onComplete.complete(s); + } } } })); @@ -110,6 +108,10 @@ public void run() { @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { + return schedule(action, delayTime, unit, null); + } + + /* package */Subscription schedule(final Action0 action, long delayTime, TimeUnit unit, final OnActionComplete onComplete) { final AtomicReference sf = new AtomicReference(); // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep // we will instead schedule the event then launch the thread after the delay has passed @@ -129,6 +131,9 @@ public void run() { if (s != null) { innerSubscription.remove(s); } + if (onComplete != null) { + onComplete.complete(s); + } } } }, delayTime, unit); @@ -152,4 +157,10 @@ public boolean isUnsubscribed() { } + /* package */static interface OnActionComplete { + + public void complete(Subscription s); + + } + } diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index 7aa8cc0e55..74f033c1cf 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -16,11 +16,6 @@ package rx.schedulers; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; import rx.plugins.RxJavaPlugins; @@ -41,14 +36,14 @@ private Schedulers() { if (c != null) { computationScheduler = c; } else { - computationScheduler = executor(createComputationExecutor()); + computationScheduler = new ComputationScheduler(); } Scheduler io = RxJavaPlugins.getInstance().getDefaultSchedulers().getIOScheduler(); if (io != null) { ioScheduler = io; } else { - ioScheduler = executor(createIOExecutor()); + ioScheduler = NewThreadScheduler.instance(); // defaults to new thread } Scheduler nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadScheduler(); @@ -57,7 +52,6 @@ private Schedulers() { } else { newThreadScheduler = NewThreadScheduler.instance(); } - } /** @@ -69,17 +63,6 @@ public static Scheduler immediate() { return ImmediateScheduler.instance(); } - /** - * {@link Scheduler} that queues work on the current thread to be executed after the current work completes. - * - * @return {@link TrampolineScheduler} instance - * @deprecated use {@link #trampoline()} instead - */ - @Deprecated - public static Scheduler currentThread() { - return TrampolineScheduler.instance(); - } - /** * {@link Scheduler} that queues work on the current thread to be executed after the current work completes. * @@ -98,44 +81,6 @@ public static Scheduler newThread() { return INSTANCE.newThreadScheduler; } - /** - * {@link Scheduler} that queues work on an {@link Executor}. - *

- * Note that this does not support scheduled actions with a delay. - * - * @return {@link ExecutorScheduler} instance - */ - public static Scheduler executor(Executor executor) { - return new ExecutorScheduler(executor); - } - - /** - * {@link Scheduler} that queues work on an {@link ScheduledExecutorService}. - * - * @return {@link ExecutorScheduler} instance - */ - public static Scheduler executor(ScheduledExecutorService executor) { - return new ExecutorScheduler(executor); - } - - /** - * {@link Scheduler} intended for computational work. - *

- * The implementation is backed by a {@link ScheduledExecutorService} thread-pool sized to the number of CPU - * cores. - *

- * This can be used for event-loops, processing callbacks and other computational work. - *

- * Do not perform IO-bound work on this scheduler. Use {@link #io()} instead. - * - * @return {@link ExecutorScheduler} for computation-bound work - * @deprecated use {@link #computation()} - */ - @Deprecated - public static Scheduler threadPoolForComputation() { - return computation(); - } - /** * {@link Scheduler} intended for computational work. *

@@ -149,23 +94,6 @@ public static Scheduler computation() { return INSTANCE.computationScheduler; } - /** - * {@link Scheduler} intended for IO-bound work. - *

- * The implementation is backed by an {@link Executor} thread-pool that will grow as needed. - *

- * This can be used for asynchronously performing blocking IO. - *

- * Do not perform computational work on this scheduler. Use {@link #computation()} instead. - * - * @return {@link ExecutorScheduler} for IO-bound work - * @deprecated use {@link #io()} instead - */ - @Deprecated - public static Scheduler threadPoolForIO() { - return io(); - } - /** * {@link Scheduler} intended for IO-bound work. *

@@ -181,35 +109,6 @@ public static Scheduler io() { return INSTANCE.ioScheduler; } - private static ScheduledExecutorService createComputationExecutor() { - int cores = Runtime.getRuntime().availableProcessors(); - return Executors.newScheduledThreadPool(cores, new ThreadFactory() { - final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - }); - } - - private static Executor createIOExecutor() { - Executor result = Executors.newCachedThreadPool(new ThreadFactory() { - final AtomicLong counter = new AtomicLong(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - }); - - return result; - } - public static TestScheduler test() { return new TestScheduler(); } diff --git a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java index c915d7a481..af4cea9801 100644 --- a/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/TrampolineScheduler.java @@ -31,15 +31,6 @@ public class TrampolineScheduler extends Scheduler { private static final TrampolineScheduler INSTANCE = new TrampolineScheduler(); - /** - * @deprecated Use Schedulers.trampoline(); - * @return - */ - @Deprecated - public static TrampolineScheduler getInstance() { - return INSTANCE; - } - /* package */static TrampolineScheduler instance() { return INSTANCE; } diff --git a/rxjava-core/src/perf/java/rx/archive/schedulers/SchedulerPerformanceTests.java b/rxjava-core/src/perf/java/rx/archive/schedulers/SchedulerPerformanceTests.java index 392dc50283..4df996d998 100644 --- a/rxjava-core/src/perf/java/rx/archive/schedulers/SchedulerPerformanceTests.java +++ b/rxjava-core/src/perf/java/rx/archive/schedulers/SchedulerPerformanceTests.java @@ -36,7 +36,7 @@ public static void main(String args[]) { @Override public void call() { - spt.singleResponse(Schedulers.immediate()); + spt.singleResponse(Schedulers.computation()); // spt.singleResponse(Schedulers.currentThread()); // spt.singleResponse(Schedulers.computation()); @@ -109,8 +109,11 @@ public long baseline() { * * --- Schedulers.computation() --- * - * Run: 0 - 224,004 ops/sec - * Run: 1 - 227,101 ops/sec + * Run: 10 - 12,616,099 ops/sec + * Run: 11 - 12,661,625 ops/sec + * Run: 12 - 12,775,536 ops/sec + * Run: 13 - 12,711,358 ops/sec + * Run: 14 - 12,815,452 ops/sec * */ public long singleResponse(Scheduler scheduler) { diff --git a/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java b/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java index 4e1e51f432..6335dc2109 100644 --- a/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java +++ b/rxjava-core/src/perf/java/rx/archive/schedulers/TestRecursionMemoryUsage.java @@ -30,52 +30,13 @@ public class TestRecursionMemoryUsage { public static void main(String args[]) { - usingFunc2(Schedulers.newThread()); - usingAction0(Schedulers.newThread()); - - usingFunc2(Schedulers.immediate()); - usingAction0(Schedulers.immediate()); - - usingFunc2(Schedulers.computation()); - usingAction0(Schedulers.computation()); + testScheduler(Schedulers.newThread()); + testScheduler(Schedulers.computation()); System.exit(0); } - protected static void usingFunc2(final Scheduler scheduler) { - System.out.println("************ usingFunc2: " + scheduler); - Observable.create(new OnSubscribe() { - - @Override - public void call(final Subscriber o) { - final Inner inner = scheduler.createInner(); - o.add(inner); - inner.schedule(new Action0() { - long i = 0; - - @Override - public void call() { - i++; - if (i % 500000 == 0) { - System.out.println(i + " Total Memory: " - + Runtime.getRuntime().totalMemory() - + " Free: " - + Runtime.getRuntime().freeMemory()); - o.onNext(i); - } - if (i == 100000000L) { - o.onCompleted(); - return; - } - - inner.schedule(this); - } - }); - } - }).toBlockingObservable().last(); - } - - protected static void usingAction0(final Scheduler scheduler) { + protected static void testScheduler(final Scheduler scheduler) { System.out.println("************ usingAction0: " + scheduler); Observable.create(new OnSubscribe() { diff --git a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java index 988f5bc0c0..480b02fdf7 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationWindowTest.java @@ -99,7 +99,7 @@ public void testSkipAndCountGaplessWindows() { @Test public void testOverlappingWindows() { - Observable subject = Observable.from(new String[] { "zero", "one", "two", "three", "four", "five" }, Schedulers.currentThread()); + Observable subject = Observable.from(new String[] { "zero", "one", "two", "three", "four", "five" }, Schedulers.trampoline()); Observable> windowed = Observable.create(window(subject, 3, 1)); List> windows = toLists(windowed); diff --git a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java b/rxjava-core/src/test/java/rx/schedulers/ComputationSchedulerTests.java similarity index 83% rename from rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java rename to rxjava-core/src/test/java/rx/schedulers/ComputationSchedulerTests.java index 32dae60435..ef8644950b 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ExecutorSchedulerTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/ComputationSchedulerTests.java @@ -30,7 +30,7 @@ import rx.functions.Action1; import rx.functions.Func1; -public class ExecutorSchedulerTests extends AbstractSchedulerConcurrencyTests { +public class ComputationSchedulerTests extends AbstractSchedulerConcurrencyTests { @Override protected Scheduler getScheduler() { @@ -44,9 +44,9 @@ public void testThreadSafetyWhenSchedulerIsHoppingBetweenThreads() { final int NUM = 1000000; final CountDownLatch latch = new CountDownLatch(1); final HashMap map = new HashMap(); - + final Scheduler.Inner inner = Schedulers.computation().createInner(); - + inner.schedule(new Action0() { private HashMap statefulMap = map; @@ -113,28 +113,6 @@ public void call(String t) { }); } - @Test - public final void testIOThreadPool1() { - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().startsWith("RxIOThreadPool")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.subscribeOn(Schedulers.threadPoolForIO()).toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } @Test public final void testMergeWithExecutorScheduler() { diff --git a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java index 622f190798..36cae71fee 100644 --- a/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/ImmediateSchedulerTest.java @@ -28,7 +28,7 @@ public class ImmediateSchedulerTest extends AbstractSchedulerTests { @Override protected Scheduler getScheduler() { - return ImmediateScheduler.getInstance(); + return Schedulers.immediate(); } @Override diff --git a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java index a05e60925d..2bc2f2088d 100644 --- a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java @@ -16,13 +16,46 @@ package rx.schedulers; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +import rx.Observable; import rx.Scheduler; +import rx.functions.Action1; +import rx.functions.Func1; public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @Override protected Scheduler getScheduler() { - return NewThreadScheduler.getInstance(); + return Schedulers.newThread(); + } + + /** + * IO scheduler defaults to using NewThreadScheduler + */ + @Test + public final void testIOScheduler() { + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.io()).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); } } diff --git a/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java index e936e8caef..0add9db59d 100644 --- a/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/TrampolineSchedulerTest.java @@ -28,7 +28,7 @@ public class TrampolineSchedulerTest extends AbstractSchedulerTests { @Override protected Scheduler getScheduler() { - return TrampolineScheduler.getInstance(); + return Schedulers.trampoline(); } @Test @@ -38,7 +38,7 @@ public final void testMergeWithCurrentThreadScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.trampoline()).map(new Func1() { @Override public String call(Integer t) {