diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTask.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTask.java index 7ae69a3ea5..e0e77a6705 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTask.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTask.java @@ -35,14 +35,17 @@ abstract class AbstractDirectTask protected final Runnable runnable; + protected final boolean interruptOnCancel; + protected Thread runner; protected static final FutureTask FINISHED = new FutureTask<>(Functions.EMPTY_RUNNABLE, null); protected static final FutureTask DISPOSED = new FutureTask<>(Functions.EMPTY_RUNNABLE, null); - AbstractDirectTask(Runnable runnable) { + AbstractDirectTask(Runnable runnable, boolean interruptOnCancel) { this.runnable = runnable; + this.interruptOnCancel = interruptOnCancel; } @Override @@ -51,7 +54,7 @@ public final void dispose() { if (f != FINISHED && f != DISPOSED) { if (compareAndSet(f, DISPOSED)) { if (f != null) { - f.cancel(runner != Thread.currentThread()); + cancelFuture(f); } } } @@ -70,7 +73,7 @@ public final void setFuture(Future future) { break; } if (f == DISPOSED) { - future.cancel(runner != Thread.currentThread()); + cancelFuture(future); break; } if (compareAndSet(f, future)) { @@ -79,6 +82,14 @@ public final void setFuture(Future future) { } } + private void cancelFuture(Future future) { + if (runner == Thread.currentThread()) { + future.cancel(false); + } else { + future.cancel(interruptOnCancel); + } + } + @Override public Runnable getWrappedRunnable() { return runnable; diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java index 2e76702b1b..596609579d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java @@ -58,7 +58,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { if (executor instanceof ExecutorService) { - ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun); + ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun, interruptibleWorker); Future f = ((ExecutorService)executor).submit(task); task.setFuture(f); return task; @@ -85,7 +85,7 @@ public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); if (executor instanceof ScheduledExecutorService) { try { - ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun); + ScheduledDirectTask task = new ScheduledDirectTask(decoratedRun, interruptibleWorker); Future f = ((ScheduledExecutorService)executor).schedule(task, delay, unit); task.setFuture(f); return task; @@ -110,7 +110,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial if (executor instanceof ScheduledExecutorService) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); try { - ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun); + ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, interruptibleWorker); Future f = ((ScheduledExecutorService)executor).scheduleAtFixedRate(task, initialDelay, period, unit); task.setFuture(f); return task; diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/NewThreadWorker.java index a342b246b8..72dc814df1 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/NewThreadWorker.java @@ -59,7 +59,7 @@ public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonN * @return the ScheduledRunnable instance */ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) { - ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run)); + ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run), true); try { Future f; if (delayTime <= 0L) { @@ -104,7 +104,7 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo return periodicWrapper; } - ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun); + ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, true); try { Future f = executor.scheduleAtFixedRate(task, initialDelay, period, unit); task.setFuture(f); diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java index a3b90bdf76..19456af932 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java @@ -27,8 +27,8 @@ public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implem private static final long serialVersionUID = 1811839108042568751L; - public ScheduledDirectPeriodicTask(Runnable runnable) { - super(runnable); + public ScheduledDirectPeriodicTask(Runnable runnable, boolean interruptOnCancel) { + super(runnable, interruptOnCancel); } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectTask.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectTask.java index 4959342d06..ed2348caef 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectTask.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectTask.java @@ -29,8 +29,8 @@ public final class ScheduledDirectTask extends AbstractDirectTask implements Cal private static final long serialVersionUID = 1811839108042568751L; - public ScheduledDirectTask(Runnable runnable) { - super(runnable); + public ScheduledDirectTask(Runnable runnable, boolean interruptOnCancel) { + super(runnable, interruptOnCancel); } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SingleScheduler.java index 2f42acbf35..6bd1d4d35c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SingleScheduler.java @@ -105,7 +105,7 @@ public Worker createWorker() { @NonNull @Override public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { - ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run)); + ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run), true); try { Future f; if (delay <= 0L) { @@ -145,7 +145,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial return periodicWrapper; } - ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun); + ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun, true); try { Future f = executor.get().scheduleAtFixedRate(task, initialDelay, period, unit); task.setFuture(f); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableTimerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableTimerTest.java index 7f6b339961..0479402216 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableTimerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableTimerTest.java @@ -37,7 +37,7 @@ public void dispose() { public void timerInterruptible() throws Exception { ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); try { - for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) { final AtomicBoolean interrupted = new AtomicBoolean(); TestObserver to = Completable.timer(1, TimeUnit.MILLISECONDS, s) .doOnComplete(new Action() { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimerTest.java index 4e299dcbd1..21b0d122fb 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimerTest.java @@ -352,7 +352,7 @@ public void timerDelayZero() { public void timerInterruptible() throws Exception { ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); try { - for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) { final AtomicBoolean interrupted = new AtomicBoolean(); TestSubscriber ts = Flowable.timer(1, TimeUnit.MILLISECONDS, s) .map(new Function() { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimerTest.java index 06a54ad90c..3069795b51 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimerTest.java @@ -37,7 +37,7 @@ public void dispose() { public void timerInterruptible() throws Exception { ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); try { - for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) { final AtomicBoolean interrupted = new AtomicBoolean(); TestObserver to = Maybe.timer(1, TimeUnit.MILLISECONDS, s) .map(new Function() { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimerTest.java index 8eddbce491..1dd128c62a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimerTest.java @@ -317,7 +317,7 @@ public void timerDelayZero() { public void timerInterruptible() throws Exception { ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); try { - for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) { final AtomicBoolean interrupted = new AtomicBoolean(); TestObserver to = Observable.timer(1, TimeUnit.MILLISECONDS, s) .map(new Function() { diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimerTest.java index 2924364459..83af02d2eb 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimerTest.java @@ -37,7 +37,7 @@ public void disposed() { public void timerInterruptible() throws Exception { ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); try { - for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec) }) { + for (Scheduler s : new Scheduler[] { Schedulers.single(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(exec, true) }) { final AtomicBoolean interrupted = new AtomicBoolean(); TestObserver to = Single.timer(1, TimeUnit.MILLISECONDS, s) .map(new Function() { diff --git a/src/test/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTaskTest.java b/src/test/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTaskTest.java index dfd3ffeae9..d96f5f04ee 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTaskTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/schedulers/AbstractDirectTaskTest.java @@ -27,7 +27,7 @@ public class AbstractDirectTaskTest extends RxJavaTest { @Test public void cancelSetFuture() { - AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) { private static final long serialVersionUID = 208585707945686116L; }; final Boolean[] interrupted = { null }; @@ -58,7 +58,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { @Test public void cancelSetFutureCurrentThread() { - AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) { private static final long serialVersionUID = 208585707945686116L; }; final Boolean[] interrupted = { null }; @@ -91,7 +91,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { @Test public void setFutureCancel() { - AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) { private static final long serialVersionUID = 208585707945686116L; }; final Boolean[] interrupted = { null }; @@ -119,7 +119,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { @Test public void setFutureCancelSameThread() { - AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) { private static final long serialVersionUID = 208585707945686116L; }; final Boolean[] interrupted = { null }; @@ -148,7 +148,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { @Test public void finished() { - AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) { private static final long serialVersionUID = 208585707945686116L; }; final Boolean[] interrupted = { null }; @@ -177,7 +177,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { @Test public void finishedCancel() { - AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) { private static final long serialVersionUID = 208585707945686116L; }; final Boolean[] interrupted = { null }; @@ -211,7 +211,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { @Test public void disposeSetFutureRace() { for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { - final AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE) { + final AbstractDirectTask task = new AbstractDirectTask(Functions.EMPTY_RUNNABLE, true) { private static final long serialVersionUID = 208585707945686116L; }; @@ -246,7 +246,7 @@ static class TestDirectTask extends AbstractDirectTask { private static final long serialVersionUID = 587679821055711738L; TestDirectTask() { - super(Functions.EMPTY_RUNNABLE); + super(Functions.EMPTY_RUNNABLE, true); } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTaskTest.java b/src/test/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTaskTest.java index 7736d5e40b..ee9aaae547 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTaskTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTaskTest.java @@ -35,7 +35,7 @@ public void runnableThrows() { public void run() { throw new TestException(); } - }); + }, true); try { task.run(); diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java index e9c953b049..919b12e8a3 100644 --- a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java +++ b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java @@ -505,4 +505,464 @@ public void run() { worker.dispose(); } } + + @Test + public void interruptibleDirectTaskScheduledExecutor() throws Exception { + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Scheduler scheduler = Schedulers.from(exec, true); + + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertTrue("Interruption did not propagate", isInterrupted.get()); + } finally { + exec.shutdown(); + } + } + + @Test + public void interruptibleWorkerTaskScheduledExecutor() throws Exception { + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Scheduler scheduler = Schedulers.from(exec, true); + + Worker worker = scheduler.createWorker(); + + try { + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = worker.schedule(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertTrue("Interruption did not propagate", isInterrupted.get()); + } finally { + worker.dispose(); + } + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleDirectTask() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleWorkerTask() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + Worker worker = scheduler.createWorker(); + + try { + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = worker.schedule(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + worker.dispose(); + } + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleDirectTaskScheduledExecutor() throws Exception { + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleWorkerTaskScheduledExecutor() throws Exception { + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + Worker worker = scheduler.createWorker(); + + try { + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = worker.schedule(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + worker.dispose(); + } + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleDirectTaskTimed() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }, 1, TimeUnit.MILLISECONDS); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleWorkerTaskTimed() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + Worker worker = scheduler.createWorker(); + + try { + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = worker.schedule(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }, 1, TimeUnit.MILLISECONDS); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + worker.dispose(); + } + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleDirectTaskScheduledExecutorTimed() throws Exception { + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = scheduler.scheduleDirect(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }, 1, TimeUnit.MILLISECONDS); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + exec.shutdown(); + } + } + + @Test + public void nonInterruptibleWorkerTaskScheduledExecutorTimed() throws Exception { + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Scheduler scheduler = Schedulers.from(exec, false); + + Worker worker = scheduler.createWorker(); + + try { + final AtomicInteger sync = new AtomicInteger(2); + + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + Disposable d = worker.schedule(new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + } + }, 1, TimeUnit.MILLISECONDS); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + + Thread.sleep(500); + + d.dispose(); + + int i = 20; + while (i-- > 0 && !isInterrupted.get()) { + Thread.sleep(50); + } + + assertFalse("Interruption happened", isInterrupted.get()); + } finally { + worker.dispose(); + } + } finally { + exec.shutdown(); + } + } }