Skip to content

Commit

Permalink
Merge pull request #1048 from benjchristensen/executor-scheduler
Browse files Browse the repository at this point in the history
Remove ExecutorScheduler - New ComputationScheduler
  • Loading branch information
benjchristensen committed Apr 19, 2014
2 parents c37fc2b + 15a20e1 commit 320495f
Show file tree
Hide file tree
Showing 21 changed files with 199 additions and 449 deletions.

This file was deleted.

Expand Up @@ -58,7 +58,7 @@ public Observable<Integer> call() throws Exception {
}
};

Observable<Integer> result = Async.deferFuture(func, Schedulers.threadPoolForComputation());
Observable<Integer> result = Async.deferFuture(func, Schedulers.computation());

final Observer<Integer> observer = mock(Observer.class);

Expand Down
Expand Up @@ -54,7 +54,7 @@ public void testSimple() {

try {
Observable<Integer> source = Observable.from(1, 2, 3)
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -93,7 +93,7 @@ public void testSimpleThrowing() {

try {
Observable<Integer> source = Observable.<Integer>error(new CustomException())
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -128,7 +128,7 @@ public void call(Integer t1) {
@Test
public void testSimpleScheduled() {
Observable<Integer> source = Observable.from(1, 2, 3)
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -158,7 +158,7 @@ public void call(Integer t1) {
public void testSimpleScheduledThrowing() {

Observable<Integer> source = Observable.<Integer>error(new CustomException())
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down
Expand Up @@ -104,7 +104,7 @@ public Integer call() throws Exception {
}
};

Observable<Integer> result = Async.startFuture(func, Schedulers.threadPoolForComputation());
Observable<Integer> result = Async.startFuture(func, Schedulers.computation());

final Observer<Integer> observer = mock(Observer.class);

Expand Down
Expand Up @@ -442,7 +442,7 @@ public void testWhileDoZeroTimes() {

@Test
public void testWhileDoManyTimes() {
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.trampoline());

List<Integer> expected = new ArrayList<Integer>(numRecursion * 3);
for (int i = 0; i < numRecursion; i++) {
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Expand Up @@ -197,7 +197,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit) {
return buffer(source, timespan, unit, Schedulers.threadPoolForComputation());
return buffer(source, timespan, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -259,7 +259,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit, int count) {
return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation());
return buffer(source, timespan, unit, count, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -325,7 +325,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
return buffer(source, timespan, timeshift, unit, Schedulers.computation());
}

/**
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationWindow.java
Expand Up @@ -195,7 +195,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
return window(source, timespan, unit, Schedulers.threadPoolForComputation());
return window(source, timespan, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -255,7 +255,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
return window(source, timespan, unit, count, Schedulers.threadPoolForComputation());
return window(source, timespan, unit, count, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -318,7 +318,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
return window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
return window(source, timespan, timeshift, unit, Schedulers.computation());
}

/**
Expand Down
109 changes: 109 additions & 0 deletions rxjava-core/src/main/java/rx/schedulers/ComputationScheduler.java
@@ -0,0 +1,109 @@
package rx.schedulers;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.NewThreadScheduler.OnActionComplete;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/* package */class ComputationScheduler extends Scheduler {

private static class ComputationSchedulerPool {
final int cores = Runtime.getRuntime().availableProcessors();
final ThreadFactory factory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet());
t.setDaemon(true);
return t;
}
};

final EventLoopScheduler[] eventLoops;

ComputationSchedulerPool() {
// initialize event loops
eventLoops = new EventLoopScheduler[cores];
for (int i = 0; i < cores; i++) {
eventLoops[i] = new EventLoopScheduler(factory);
}
}

private static ComputationSchedulerPool INSTANCE = new ComputationSchedulerPool();

long n = 0;

public EventLoopScheduler getEventLoop() {
// round-robin selection (improvements to come)
return eventLoops[(int) (n++ % cores)];
}

}

@Override
public Inner createInner() {
return new EventLoop();
}

private static class EventLoop extends Scheduler.Inner {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final EventLoopScheduler pooledEventLoop;
private final OnActionComplete onComplete;

EventLoop() {
pooledEventLoop = ComputationSchedulerPool.INSTANCE.getEventLoop();
onComplete = new OnActionComplete() {

@Override
public void complete(Subscription s) {
innerSubscription.remove(s);
}

};
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}
return pooledEventLoop.schedule(action, onComplete);
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}

return pooledEventLoop.schedule(action, delayTime, unit, onComplete);
}

}

private static class EventLoopScheduler extends NewThreadScheduler.EventLoopScheduler {
EventLoopScheduler(ThreadFactory threadFactory) {
super(threadFactory);
}
}

}

0 comments on commit 320495f

Please sign in to comment.