diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java index 254a8fcf83..b8e782ef37 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -67,7 +67,7 @@ public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) { @Override public Subscriber> call(final Subscriber subscriber) { - return new Subscriber>() { + return new Subscriber>(subscriber) { @Override public void onCompleted() { @@ -98,29 +98,13 @@ public void call(final Inner inner) { return; } else { // no buffering (async subscribe) - scheduler.schedule(new Action1() { + subscriber.add(scheduler.schedule(new Action1() { @Override public void call(final Inner inner) { - o.subscribe(new Subscriber(subscriber) { - - @Override - public void onCompleted() { - subscriber.onCompleted(); - } - - @Override - public void onError(Throwable e) { - subscriber.onError(e); - } - - @Override - public void onNext(T t) { - subscriber.onNext(t); - } - }); + o.subscribe(subscriber); } - }); + })); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java index 3a35f766e7..3f12e441f9 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java @@ -46,7 +46,7 @@ public class OperatorSubscribeOnTest { - private class ThreadSubscription implements Subscription { + private static class ThreadSubscription implements Subscription { private volatile Thread thread; private final CountDownLatch latch = new CountDownLatch(1); @@ -111,9 +111,10 @@ public void call(Subscriber t1) { observer.assertTerminalEvent(); } - @Test + @Test(timeout = 2000) public void testIssue813() throws InterruptedException { // https://github.com/Netflix/RxJava/issues/813 + final CountDownLatch scheduled = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch doneLatch = new CountDownLatch(1); @@ -126,16 +127,14 @@ public void testIssue813() throws InterruptedException { public void call( final Subscriber subscriber) { subscriber.add(s); + scheduled.countDown(); try { latch.await(); - // Already called "unsubscribe", "isUnsubscribed" - // shouble be true - if (!subscriber.isUnsubscribed()) { - throw new IllegalStateException( - "subscriber.isUnsubscribed should be true"); - } + + // this should not run because the await above will be interrupted by the unsubscribe subscriber.onCompleted(); } catch (InterruptedException e) { + System.out.println("Interrupted because it is unsubscribed"); Thread.currentThread().interrupt(); } catch (Throwable e) { subscriber.onError(e); @@ -145,13 +144,17 @@ public void call( } }).subscribeOn(Schedulers.computation()).subscribe(observer); + // wait for scheduling + scheduled.await(); + // trigger unsubscribe subscription.unsubscribe(); // As unsubscribe is called in other thread, we need to wait for it. s.getThread(); latch.countDown(); doneLatch.await(); assertEquals(0, observer.getOnErrorEvents().size()); - assertEquals(1, observer.getOnCompletedEvents().size()); + // 0 because the unsubscribe interrupts and prevents onCompleted from being executed + assertEquals(0, observer.getOnCompletedEvents().size()); } public static class SlowScheduler extends Scheduler { @@ -395,4 +398,5 @@ public void call(Subscriber sub) { ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); assertEquals(10, count.get()); } + }