Skip to content

Commit

Permalink
Merge pull request #472 from benjchristensen/issue-431-newThreadUnsub…
Browse files Browse the repository at this point in the history
…scribe

BugFix: Issue 431 Unsubscribe with Schedulers.newThread
  • Loading branch information
benjchristensen committed Nov 7, 2013
2 parents 2e2ab16 + ac74a79 commit a57042c
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void run() {
}
}, initialDelay, period, unit);

subscriptions.add(Subscriptions.create(f));
subscriptions.add(Subscriptions.from(f));
return subscriptions;

} else {
Expand All @@ -84,7 +84,7 @@ public void run() {
}
}, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.create(f));
subscription.add(Subscriptions.from(f));
} else {
// we are not a ScheduledExecutorService so can't directly schedule
if (delayTime == 0) {
Expand All @@ -106,7 +106,7 @@ public void run() {
}
}, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.create(f));
subscription.add(Subscriptions.from(f));
}
}
return subscription;
Expand Down Expand Up @@ -134,7 +134,7 @@ public void run() {
// we are an ExecutorService so get a Future back that supports unsubscribe
Future<?> f = ((ExecutorService) executor).submit(r);
// add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.create(f));
subscription.add(Subscriptions.from(f));
} else {
// we are the lowest common denominator so can't unsubscribe once we execute
executor.execute(r);
Expand Down
21 changes: 14 additions & 7 deletions rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,22 @@ public Thread newThread(Runnable r) {
}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
// all subscriptions that may need to be unsubscribed
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);

final Scheduler _scheduler = this;
return Subscriptions.from(executor.submit(new Runnable() {
subscription.add(Subscriptions.from(executor.submit(new Runnable() {

@Override
public void run() {
action.call(_scheduler, state);
Subscription s = discardableAction.call(_scheduler);
subscription.add(s);
}
}));
})));

return subscription;
}

@Override
Expand All @@ -89,15 +96,15 @@ public void run() {
}, delayTime, unit);

// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.create(f));
subscription.add(Subscriptions.from(f));

return subscription;
}

}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
EventLoopScheduler s = new EventLoopScheduler();
return s.schedule(state, action);
}
Expand All @@ -122,7 +129,7 @@ public void run() {
}, delay, unit);

// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.create(f));
subscription.add(Subscriptions.from(f));

return subscription;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package rx.concurrency;

import static org.junit.Assert.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Func1;

public class SchedulerUnsubscribeTest {

/**
* Bug report: https://github.com/Netflix/RxJava/issues/431
*/
@Test
public void testUnsubscribeOfNewThread() throws InterruptedException {
testUnSubscribeForScheduler(Schedulers.newThread());
}

@Test
public void testUnsubscribeOfThreadPoolForIO() throws InterruptedException {
testUnSubscribeForScheduler(Schedulers.threadPoolForIO());
}

@Test
public void testUnsubscribeOfThreadPoolForComputation() throws InterruptedException {
testUnSubscribeForScheduler(Schedulers.threadPoolForComputation());
}

@Test
public void testUnsubscribeOfCurrentThread() throws InterruptedException {
testUnSubscribeForScheduler(Schedulers.currentThread());
}

public void testUnSubscribeForScheduler(Scheduler scheduler) throws InterruptedException {

final AtomicInteger countReceived = new AtomicInteger();
final AtomicInteger countGenerated = new AtomicInteger();
final SafeObservableSubscription s = new SafeObservableSubscription();
final CountDownLatch latch = new CountDownLatch(1);

s.wrap(Observable.interval(50, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
System.out.println("generated " + aLong);
countGenerated.incrementAndGet();
return aLong;
}
})
.subscribeOn(scheduler)
.observeOn(Schedulers.currentThread())
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
System.out.println("--- completed");
}

@Override
public void onError(Throwable e) {
System.out.println("--- onError");
}

@Override
public void onNext(Long args) {
if (countReceived.incrementAndGet() == 2) {
s.unsubscribe();
latch.countDown();
}
System.out.println("==> Received " + args);
}
}));

latch.await(1000, TimeUnit.MILLISECONDS);

System.out.println("----------- it thinks it is finished ------------------ ");
Thread.sleep(100);

assertEquals(2, countGenerated.get());
}
}

0 comments on commit a57042c

Please sign in to comment.