Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ abstract class AbstractDirectTask

protected final Runnable runnable;

protected final boolean interruptOnCancel;

protected Thread runner;

protected static final FutureTask<Void> FINISHED = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);

protected static final FutureTask<Void> DISPOSED = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);

AbstractDirectTask(Runnable runnable) {
AbstractDirectTask(Runnable runnable, boolean interruptOnCancel) {
this.runnable = runnable;
this.interruptOnCancel = interruptOnCancel;
}

@Override
Expand All @@ -51,7 +54,7 @@ public final void dispose() {
if (f != FINISHED && f != DISPOSED) {
if (compareAndSet(f, DISPOSED)) {
if (f != null) {
f.cancel(runner != Thread.currentThread());
cancelFuture(f);
}
}
}
Expand All @@ -70,7 +73,7 @@ public final void setFuture(Future<?> future) {
break;
}
if (f == DISPOSED) {
future.cancel(runner != Thread.currentThread());
cancelFuture(future);
break;
}
if (compareAndSet(f, future)) {
Expand All @@ -79,6 +82,14 @@ public final void setFuture(Future<?> future) {
}
}

private void cancelFuture(Future<?> future) {
if (runner == Thread.currentThread()) {
future.cancel(false);
} else {
future.cancel(interruptOnCancel);
}
}

@Override
public Runnable getWrappedRunnable() {
return runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
if (executor instanceof ExecutorService) {
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun, interruptibleWorker);
Future<?> f = ((ExecutorService)executor).submit(task);
task.setFuture(f);
return task;
Expand All @@ -85,7 +85,7 @@ public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (executor instanceof ScheduledExecutorService) {
try {
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun);
ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun, interruptibleWorker);
Future<?> f = ((ScheduledExecutorService)executor).schedule(task, delay, unit);
task.setFuture(f);
return task;
Expand All @@ -110,7 +110,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
if (executor instanceof ScheduledExecutorService) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, interruptibleWorker);
Future<?> f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonN
* @return the ScheduledRunnable instance
*/
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run), true);
try {
Future<?> f;
if (delayTime <= 0L) {
Expand Down Expand Up @@ -104,7 +104,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo

return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, true);
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implem

private static final long serialVersionUID = 1811839108042568751L;

public ScheduledDirectPeriodicTask(Runnable runnable) {
super(runnable);
public ScheduledDirectPeriodicTask(Runnable runnable, boolean interruptOnCancel) {
super(runnable, interruptOnCancel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public final class ScheduledDirectTask extends AbstractDirectTask implements Cal

private static final long serialVersionUID = 1811839108042568751L;

public ScheduledDirectTask(Runnable runnable) {
super(runnable);
public ScheduledDirectTask(Runnable runnable, boolean interruptOnCancel) {
super(runnable, interruptOnCancel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Worker createWorker() {
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run), true);
try {
Future<?> f;
if (delay <= 0L) {
Expand Down Expand Up @@ -145,7 +145,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial

return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, true);
try {
Future<?> f = executor.get().scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void dispose() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Void> to = Completable.timer(1, TimeUnit.MILLISECONDS, s)
.doOnComplete(new Action() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void timerDelayZero() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestSubscriber<Long> ts = Flowable.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void dispose() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Long> to = Maybe.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void timerDelayZero() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Long> to = Observable.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void disposed() {
public void timerInterruptible() throws Exception {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) {
for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) {
final AtomicBoolean interrupted = new AtomicBoolean();
TestObserver<Long> to = Single.timer(1, TimeUnit.MILLISECONDS, s)
.map(new Function<Long, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class AbstractDirectTaskTest extends RxJavaTest {

@Test
public void cancelSetFuture() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -58,7 +58,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void cancelSetFutureCurrentThread() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -91,7 +91,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void setFutureCancel() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -119,7 +119,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void setFutureCancelSameThread() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -148,7 +148,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void finished() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -177,7 +177,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {

@Test
public void finishedCancel() {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};
final Boolean[] interrupted = { null };
Expand Down Expand Up @@ -211,7 +211,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
@Test
public void disposeSetFutureRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) {
final AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) {
private static final long serialVersionUID = 208585707945686116L;
};

Expand Down Expand Up @@ -246,7 +246,7 @@ static class TestDirectTask extends AbstractDirectTask {
private static final long serialVersionUID = 587679821055711738L;

TestDirectTask() {
super(Functions.EMPTY_RUNNABLE);
super(Functions.EMPTY_RUNNABLE, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void runnableThrows() {
public void run() {
throw new TestException();
}
});
}, true);

try {
task.run();
Expand Down
Loading