From 538d245ba9744f57d66724982db4850e6d3ba226 Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Tue, 20 May 2014 22:46:11 +0200 Subject: [PATCH 1/5] Implement a cached thread scheduler using event loops --- .../rx/schedulers/CachedThreadScheduler.java | 180 ++++++++++++++++++ .../main/java/rx/schedulers/Schedulers.java | 6 +- .../schedulers/CachedThreadSchedulerTest.java | 60 ++++++ .../rx/schedulers/NewThreadSchedulerTest.java | 34 ---- 4 files changed, 243 insertions(+), 37 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java create mode 100644 rxjava-core/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java diff --git a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java new file mode 100644 index 0000000000..92dd486d92 --- /dev/null +++ b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -0,0 +1,180 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import rx.Scheduler; +import rx.Subscription; +import rx.functions.Action0; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; + +import java.util.Iterator; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/* package */class CachedThreadScheduler extends Scheduler { + private static final class CachedWorkerPool { + final ThreadFactory factory = new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxCachedThreadScheduler-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }; + + private final long keepAliveTime; + private final ConcurrentLinkedQueue expiringQueue; + private final ScheduledExecutorService evictExpiredWorkerExecutor; + + CachedWorkerPool(long keepAliveTime, TimeUnit unit) { + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.expiringQueue = new ConcurrentLinkedQueue(); + + evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "RxCachedWorkerPoolEvictor-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); + evictExpiredWorkerExecutor.scheduleWithFixedDelay( + new Runnable() { + @Override + public void run() { + evictExpiredWorkers(); + } + }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS + ); + } + + private static CachedWorkerPool INSTANCE = new CachedWorkerPool( + 60L, TimeUnit.SECONDS + ); + + PoolWorker get() { + while (!expiringQueue.isEmpty()) { + PoolWorker poolWorker = expiringQueue.poll(); + if (poolWorker != null) { + return poolWorker; + } + } + + // No cached worker found, so create a new one. + return new PoolWorker(factory); + } + + void release(PoolWorker poolWorker) { + // Refresh expire time before putting worker back in pool + poolWorker.setExpirationTime(now() + keepAliveTime); + + expiringQueue.add(poolWorker); + } + + void evictExpiredWorkers() { + if (!expiringQueue.isEmpty()) { + long currentTimestamp = now(); + + Iterator poolWorkerIterator = expiringQueue.iterator(); + while (poolWorkerIterator.hasNext()) { + PoolWorker poolWorker = poolWorkerIterator.next(); + if (poolWorker.getExpirationTime() <= currentTimestamp) { + poolWorkerIterator.remove(); + poolWorker.unsubscribe(); + } else { + // Queue is ordered with the worker that will expire first in the beginning, so when we + // find a non-expired worker we can stop evicting. + break; + } + } + } + } + + long now() { + return System.nanoTime(); + } + } + + @Override + public Worker createWorker() { + return new EventLoopWorker(CachedWorkerPool.INSTANCE.get()); + } + + private static class EventLoopWorker extends Scheduler.Worker { + private final CompositeSubscription innerSubscription = new CompositeSubscription(); + private final PoolWorker poolWorker; + private final AtomicBoolean releasePoolWorkerOnce = new AtomicBoolean(false); + + EventLoopWorker(PoolWorker poolWorker) { + this.poolWorker = poolWorker; + } + + @Override + public void unsubscribe() { + if (releasePoolWorkerOnce.compareAndSet(false, true)) { + // unsubscribe should be idempotent, so only do this once + CachedWorkerPool.INSTANCE.release(poolWorker); + } + innerSubscription.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return innerSubscription.isUnsubscribed(); + } + + @Override + public Subscription schedule(Action0 action) { + return schedule(action, 0, null); + } + + @Override + public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + if (innerSubscription.isUnsubscribed()) { + // don't schedule, we are unsubscribed + return Subscriptions.empty(); + } + + NewThreadScheduler.NewThreadWorker.ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit); + innerSubscription.add(s); + s.addParent(innerSubscription); + return s; + } + } + + private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker { + private long expirationTime; + + PoolWorker(ThreadFactory threadFactory) { + super(threadFactory); + this.expirationTime = 0L; + } + + public long getExpirationTime() { + return expirationTime; + } + + public void setExpirationTime(long expirationTime) { + this.expirationTime = expirationTime; + } + } +} diff --git a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java index d7096b7751..53bed75151 100644 --- a/rxjava-core/src/main/java/rx/schedulers/Schedulers.java +++ b/rxjava-core/src/main/java/rx/schedulers/Schedulers.java @@ -15,11 +15,11 @@ */ package rx.schedulers; -import java.util.concurrent.Executor; - import rx.Scheduler; import rx.plugins.RxJavaPlugins; +import java.util.concurrent.Executor; + /** * Static factory methods for creating Schedulers. */ @@ -43,7 +43,7 @@ private Schedulers() { if (io != null) { ioScheduler = io; } else { - ioScheduler = NewThreadScheduler.instance(); // defaults to new thread + ioScheduler = new CachedThreadScheduler(); } Scheduler nt = RxJavaPlugins.getInstance().getDefaultSchedulers().getNewThreadScheduler(); diff --git a/rxjava-core/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java new file mode 100644 index 0000000000..f9f8ca161c --- /dev/null +++ b/rxjava-core/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java @@ -0,0 +1,60 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.schedulers; + +import org.junit.Test; +import rx.Observable; +import rx.Scheduler; +import rx.functions.Action1; +import rx.functions.Func1; + +import static org.junit.Assert.assertTrue; + +public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { + + @Override + protected Scheduler getScheduler() { + return Schedulers.io(); + } + + /** + * IO scheduler defaults to using CachedThreadScheduler + */ + @Test + public final void testIOScheduler() { + + Observable o1 = Observable.from(1, 2, 3, 4, 5); + Observable o2 = Observable.from(6, 7, 8, 9, 10); + Observable o = Observable.merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.io()).toBlocking().forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java index 37b314a0dd..963ee50fa9 100644 --- a/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java +++ b/rxjava-core/src/test/java/rx/schedulers/NewThreadSchedulerTest.java @@ -16,14 +16,7 @@ package rx.schedulers; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -import rx.Observable; import rx.Scheduler; -import rx.functions.Action1; -import rx.functions.Func1; public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -31,31 +24,4 @@ public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { protected Scheduler getScheduler() { return Schedulers.newThread(); } - - /** - * IO scheduler defaults to using NewThreadScheduler - */ - @Test - public final void testIOScheduler() { - - Observable o1 = Observable. from(1, 2, 3, 4, 5); - Observable o2 = Observable. from(6, 7, 8, 9, 10); - Observable o = Observable. merge(o1, o2).map(new Func1() { - - @Override - public String call(Integer t) { - assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler")); - return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); - } - }); - - o.subscribeOn(Schedulers.io()).toBlocking().forEach(new Action1() { - - @Override - public void call(String t) { - System.out.println("t: " + t); - } - }); - } - } From 564ce4a1d26504324df384a4437ddb8107e9786a Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Wed, 28 May 2014 21:33:13 +0200 Subject: [PATCH 2/5] Switch to use NewThreadScheduler.RxThreadFactory --- .../rx/schedulers/CachedThreadScheduler.java | 33 ++++++------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java index 92dd486d92..6e71e1f698 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -24,21 +24,17 @@ import java.util.Iterator; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; /* package */class CachedThreadScheduler extends Scheduler { - private static final class CachedWorkerPool { - final ThreadFactory factory = new ThreadFactory() { - final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxCachedThreadScheduler-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - }; + private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-"; + private static final NewThreadScheduler.RxThreadFactory WORKER_THREAD_FACTORY = + new NewThreadScheduler.RxThreadFactory(WORKER_THREAD_NAME_PREFIX); + + private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-"; + private static final NewThreadScheduler.RxThreadFactory EVICTOR_THREAD_FACTORY = + new NewThreadScheduler.RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX); + private static final class CachedWorkerPool { private final long keepAliveTime; private final ConcurrentLinkedQueue expiringQueue; private final ScheduledExecutorService evictExpiredWorkerExecutor; @@ -47,16 +43,7 @@ public Thread newThread(Runnable r) { this.keepAliveTime = unit.toNanos(keepAliveTime); this.expiringQueue = new ConcurrentLinkedQueue(); - evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { - final AtomicInteger counter = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "RxCachedWorkerPoolEvictor-" + counter.incrementAndGet()); - t.setDaemon(true); - return t; - } - }); + evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); evictExpiredWorkerExecutor.scheduleWithFixedDelay( new Runnable() { @Override @@ -80,7 +67,7 @@ PoolWorker get() { } // No cached worker found, so create a new one. - return new PoolWorker(factory); + return new PoolWorker(WORKER_THREAD_FACTORY); } void release(PoolWorker poolWorker) { From 387c765e345c46a74a403aa2b8e3b7c634864087 Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Wed, 28 May 2014 21:44:47 +0200 Subject: [PATCH 3/5] Switch to AtomicIntegerFieldUpdater and volatile int, instead of AtomicBoolean --- .../main/java/rx/schedulers/CachedThreadScheduler.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java index 6e71e1f698..2feba948fb 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -23,7 +23,7 @@ import java.util.Iterator; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /* package */class CachedThreadScheduler extends Scheduler { private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-"; @@ -109,7 +109,9 @@ public Worker createWorker() { private static class EventLoopWorker extends Scheduler.Worker { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final PoolWorker poolWorker; - private final AtomicBoolean releasePoolWorkerOnce = new AtomicBoolean(false); + volatile int once; + static final AtomicIntegerFieldUpdater ONCE_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once"); EventLoopWorker(PoolWorker poolWorker) { this.poolWorker = poolWorker; @@ -117,7 +119,7 @@ private static class EventLoopWorker extends Scheduler.Worker { @Override public void unsubscribe() { - if (releasePoolWorkerOnce.compareAndSet(false, true)) { + if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { // unsubscribe should be idempotent, so only do this once CachedWorkerPool.INSTANCE.release(poolWorker); } From 992ff293a9dc7efbedaa8290316f0f4e9e030439 Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Wed, 28 May 2014 22:13:29 +0200 Subject: [PATCH 4/5] Better name for worker class running scheduled actions --- .../rx/schedulers/CachedThreadScheduler.java | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java index 2feba948fb..acd5be7740 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -36,12 +36,12 @@ private static final class CachedWorkerPool { private final long keepAliveTime; - private final ConcurrentLinkedQueue expiringQueue; + private final ConcurrentLinkedQueue expiringWorkerQueue; private final ScheduledExecutorService evictExpiredWorkerExecutor; CachedWorkerPool(long keepAliveTime, TimeUnit unit) { this.keepAliveTime = unit.toNanos(keepAliveTime); - this.expiringQueue = new ConcurrentLinkedQueue(); + this.expiringWorkerQueue = new ConcurrentLinkedQueue(); evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); evictExpiredWorkerExecutor.scheduleWithFixedDelay( @@ -58,35 +58,35 @@ public void run() { 60L, TimeUnit.SECONDS ); - PoolWorker get() { - while (!expiringQueue.isEmpty()) { - PoolWorker poolWorker = expiringQueue.poll(); - if (poolWorker != null) { - return poolWorker; + ThreadWorker get() { + while (!expiringWorkerQueue.isEmpty()) { + ThreadWorker threadWorker = expiringWorkerQueue.poll(); + if (threadWorker != null) { + return threadWorker; } } // No cached worker found, so create a new one. - return new PoolWorker(WORKER_THREAD_FACTORY); + return new ThreadWorker(WORKER_THREAD_FACTORY); } - void release(PoolWorker poolWorker) { + void release(ThreadWorker threadWorker) { // Refresh expire time before putting worker back in pool - poolWorker.setExpirationTime(now() + keepAliveTime); + threadWorker.setExpirationTime(now() + keepAliveTime); - expiringQueue.add(poolWorker); + expiringWorkerQueue.offer(threadWorker); } void evictExpiredWorkers() { - if (!expiringQueue.isEmpty()) { + if (!expiringWorkerQueue.isEmpty()) { long currentTimestamp = now(); - Iterator poolWorkerIterator = expiringQueue.iterator(); - while (poolWorkerIterator.hasNext()) { - PoolWorker poolWorker = poolWorkerIterator.next(); - if (poolWorker.getExpirationTime() <= currentTimestamp) { - poolWorkerIterator.remove(); - poolWorker.unsubscribe(); + Iterator threadWorkerIterator = expiringWorkerQueue.iterator(); + while (threadWorkerIterator.hasNext()) { + ThreadWorker threadWorker = threadWorkerIterator.next(); + if (threadWorker.getExpirationTime() <= currentTimestamp) { + threadWorkerIterator.remove(); + threadWorker.unsubscribe(); } else { // Queue is ordered with the worker that will expire first in the beginning, so when we // find a non-expired worker we can stop evicting. @@ -108,20 +108,20 @@ public Worker createWorker() { private static class EventLoopWorker extends Scheduler.Worker { private final CompositeSubscription innerSubscription = new CompositeSubscription(); - private final PoolWorker poolWorker; + private final ThreadWorker threadWorker; volatile int once; static final AtomicIntegerFieldUpdater ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once"); - EventLoopWorker(PoolWorker poolWorker) { - this.poolWorker = poolWorker; + EventLoopWorker(ThreadWorker threadWorker) { + this.threadWorker = threadWorker; } @Override public void unsubscribe() { if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { // unsubscribe should be idempotent, so only do this once - CachedWorkerPool.INSTANCE.release(poolWorker); + CachedWorkerPool.INSTANCE.release(threadWorker); } innerSubscription.unsubscribe(); } @@ -143,17 +143,17 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { return Subscriptions.empty(); } - NewThreadScheduler.NewThreadWorker.ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit); + NewThreadScheduler.NewThreadWorker.ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; } } - private static final class PoolWorker extends NewThreadScheduler.NewThreadWorker { + private static final class ThreadWorker extends NewThreadScheduler.NewThreadWorker { private long expirationTime; - PoolWorker(ThreadFactory threadFactory) { + ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } From b6af321b38827f26639c127ddeb8d34b50f2b3c1 Mon Sep 17 00:00:00 2001 From: Joakim Bodin Date: Wed, 28 May 2014 22:45:43 +0200 Subject: [PATCH 5/5] Declare Scheduler classes in CachedThreadScheduler as final --- .../src/main/java/rx/schedulers/CachedThreadScheduler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java index acd5be7740..715c8c05d6 100644 --- a/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -25,7 +25,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -/* package */class CachedThreadScheduler extends Scheduler { +/* package */final class CachedThreadScheduler extends Scheduler { private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-"; private static final NewThreadScheduler.RxThreadFactory WORKER_THREAD_FACTORY = new NewThreadScheduler.RxThreadFactory(WORKER_THREAD_NAME_PREFIX); @@ -106,7 +106,7 @@ public Worker createWorker() { return new EventLoopWorker(CachedWorkerPool.INSTANCE.get()); } - private static class EventLoopWorker extends Scheduler.Worker { + private static final class EventLoopWorker extends Scheduler.Worker { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final ThreadWorker threadWorker; volatile int once;