Skip to content

Commit

Permalink
SubscribeOn Scheduler/Unsubscribe Behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Feb 13, 2014
1 parent 7084cd0 commit 43437fe
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 29 deletions.
24 changes: 4 additions & 20 deletions rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
return new Subscriber<Observable<T>>() {
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
Expand Down Expand Up @@ -98,29 +98,13 @@ public void call(final Inner inner) {
return;
} else {
// no buffering (async subscribe)
scheduler.schedule(new Action1<Inner>() {
subscriber.add(scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
o.subscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
o.subscribe(subscriber);
}
});
}));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

public class OperatorSubscribeOnTest {

private class ThreadSubscription implements Subscription {
private static class ThreadSubscription implements Subscription {
private volatile Thread thread;

private final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -111,9 +111,10 @@ public void call(Subscriber<? super Integer> t1) {
observer.assertTerminalEvent();
}

@Test
@Test(timeout = 2000)
public void testIssue813() throws InterruptedException {
// https://github.com/Netflix/RxJava/issues/813
final CountDownLatch scheduled = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(1);

Expand All @@ -126,16 +127,14 @@ public void testIssue813() throws InterruptedException {
public void call(
final Subscriber<? super Integer> subscriber) {
subscriber.add(s);
scheduled.countDown();
try {
latch.await();
// Already called "unsubscribe", "isUnsubscribed"
// shouble be true
if (!subscriber.isUnsubscribed()) {
throw new IllegalStateException(
"subscriber.isUnsubscribed should be true");
}

// this should not run because the await above will be interrupted by the unsubscribe
subscriber.onCompleted();
} catch (InterruptedException e) {
System.out.println("Interrupted because it is unsubscribed");
Thread.currentThread().interrupt();
} catch (Throwable e) {
subscriber.onError(e);
Expand All @@ -145,13 +144,17 @@ public void call(
}
}).subscribeOn(Schedulers.computation()).subscribe(observer);

// wait for scheduling
scheduled.await();
// trigger unsubscribe
subscription.unsubscribe();
// As unsubscribe is called in other thread, we need to wait for it.
s.getThread();
latch.countDown();
doneLatch.await();
assertEquals(0, observer.getOnErrorEvents().size());
assertEquals(1, observer.getOnCompletedEvents().size());
// 0 because the unsubscribe interrupts and prevents onCompleted from being executed
assertEquals(0, observer.getOnCompletedEvents().size());
}

public static class SlowScheduler extends Scheduler {
Expand Down Expand Up @@ -395,4 +398,5 @@ public void call(Subscriber<? super Integer> sub) {
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
assertEquals(10, count.get());
}

}

0 comments on commit 43437fe

Please sign in to comment.