-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Description
Given `o.publish().refCount()` on an infinite synchronous source `o`, line 93 in `OnSubscribeRefCount` never completes, so `emitting` is never set to false and the disconnect cannot happen.
Here's a unit test failing on 0.20.4:
package au.gov.amsa.ais.rx;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
/**
 * Reproduction test: {@code publish().refCount()} over an infinite synchronous source must
 * still unsubscribe from that source once its only subscriber unsubscribes.
 */
public class RefCountTest {

    /**
     * Subscribes to a ref-counted synchronous source via {@code subscribeOn}, waits until the
     * source has actually been subscribed to, unsubscribes, and then expects the
     * unsubscription to reach the source-side subscriber within three seconds.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on a latch
     */
    // The JUnit timeout exceeds the sum of the latch waits (1s + 3s) so that a failure is
    // reported by the assertions below instead of by a race with the test timeout.
    @Test(timeout = 5000)
    public void testRefCountUnsubscribeForSynchronousSource() throws InterruptedException {
        final CountDownLatch subscribed = new CountDownLatch(1);
        final CountDownLatch unsubscribed = new CountDownLatch(1);
        // The outer lift is invoked first at subscription time, so the unsubscription callback
        // is registered before the "subscribed" latch can be released.
        Observable<Long> o = synchronousInterval()
                .lift(detectSubscription(subscribed))
                .lift(detectUnsubscription(unsubscribed));
        Subscriber<Long> sub = Subscribers.empty();
        o.publish().refCount().subscribeOn(Schedulers.computation()).subscribe(sub);
        // subscribeOn subscribes asynchronously; unsubscribing before that has happened may
        // cancel the subscription altogether, in which case there would be nothing to
        // disconnect from and the test would not exercise refCount at all.
        assertTrue(subscribed.await(1, TimeUnit.SECONDS));
        sub.unsubscribe();
        assertTrue(unsubscribed.await(3, TimeUnit.SECONDS));
    }

    /**
     * Creates an operator that releases {@code latch} when the upstream is subscribed to,
     * i.e. when the operator function itself is invoked.
     *
     * @param latch latch counted down once per subscription
     * @return a pass-through operator
     */
    private Operator<Long, Long> detectSubscription(final CountDownLatch latch) {
        return new Operator<Long, Long>() {
            @Override
            public Subscriber<? super Long> call(Subscriber<? super Long> subscriber) {
                latch.countDown();
                return passThrough(subscriber);
            }
        };
    }

    /**
     * Creates an operator that releases {@code latch} when the downstream subscriber is
     * unsubscribed.
     *
     * @param latch latch counted down by the unsubscription callback
     * @return a pass-through operator
     */
    private Operator<Long, Long> detectUnsubscription(final CountDownLatch latch) {
        return new Operator<Long, Long>() {
            @Override
            public Subscriber<? super Long> call(Subscriber<? super Long> subscriber) {
                // Count down on unsubscription, not when this operator function is invoked
                // (which happens at subscription time).
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        latch.countDown();
                    }
                }));
                return passThrough(subscriber);
            }
        };
    }

    /**
     * Wraps {@code child} in a subscriber that forwards every event unchanged. The wrapper is
     * constructed from {@code child} so that unsubscribing downstream is visible upstream
     * through {@code isUnsubscribed()}.
     *
     * @param child the downstream subscriber
     * @return a subscriber delegating all events to {@code child}
     */
    private static Subscriber<Long> passThrough(final Subscriber<? super Long> child) {
        return new Subscriber<Long>(child) {
            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(Long value) {
                child.onNext(value);
            }
        };
    }

    /**
     * Creates a source that emits {@code 1L} roughly every 100 ms on the subscribing thread
     * and only returns once its subscriber reports that it is unsubscribed.
     *
     * @return the synchronous, never-completing source
     */
    private Observable<Long> synchronousInterval() {
        return Observable.create(new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> subscriber) {
                while (!subscriber.isUnsubscribed()) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: this source is meant to stop only when its
                        // subscriber is unsubscribed, so an interrupt must not end the loop.
                    }
                    subscriber.onNext(1L);
                }
            }
        });
    }
}