Skip to content

Commit

Permalink
2.x: Fix refCount eager disconnect not resetting the connection (#6297)
Browse files Browse the repository at this point in the history
* 2.x: Fix refCount eager disconnect not resetting the connection

* Remove unnecessary comment.
  • Loading branch information
akarnokd committed Nov 12, 2018
1 parent 390b7b0 commit 6afd2b8
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 2 deletions.
Expand Up @@ -141,7 +141,11 @@ void timeout(RefConnection rc) {
if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(connectionObject);
if (connectionObject == null) {
rc.disconnectedEarly = true;
} else {
((ResettableConnectable)source).resetIf(connectionObject);
}
}
}
}
Expand All @@ -160,6 +164,8 @@ static final class RefConnection extends AtomicReference<Disposable>

boolean connected;

boolean disconnectedEarly;

RefConnection(FlowableRefCount<?> parent) {
this.parent = parent;
}
Expand All @@ -172,6 +178,11 @@ public void run() {
@Override
public void accept(Disposable t) throws Exception {
DisposableHelper.replace(this, t);
synchronized (parent) {
if (disconnectedEarly) {
((ResettableConnectable)parent.source).resetIf(t);
}
}
}
}

Expand Down
Expand Up @@ -135,10 +135,15 @@ void timeout(RefConnection rc) {
connection = null;
Disposable connectionObject = rc.get();
DisposableHelper.dispose(rc);

if (source instanceof Disposable) {
((Disposable)source).dispose();
} else if (source instanceof ResettableConnectable) {
((ResettableConnectable)source).resetIf(connectionObject);
if (connectionObject == null) {
rc.disconnectedEarly = true;
} else {
((ResettableConnectable)source).resetIf(connectionObject);
}
}
}
}
Expand All @@ -157,6 +162,8 @@ static final class RefConnection extends AtomicReference<Disposable>

boolean connected;

boolean disconnectedEarly;

RefConnection(ObservableRefCount<?> parent) {
this.parent = parent;
}
Expand All @@ -169,6 +176,11 @@ public void run() {
@Override
public void accept(Disposable t) throws Exception {
DisposableHelper.replace(this, t);
synchronized (parent) {
if (disconnectedEarly) {
((ResettableConnectable)parent.source).resetIf(t);
}
}
}
}

Expand Down
Expand Up @@ -1394,4 +1394,19 @@ public void timeoutDisposesSource() {

assertTrue(((Disposable)o.source).isDisposed());
}

@Test
public void disconnectBeforeConnect() {
BehaviorProcessor<Integer> processor = BehaviorProcessor.create();

Flowable<Integer> flowable = processor
.replay(1)
.refCount();

flowable.takeUntil(Flowable.just(1)).test();

processor.onNext(2);

flowable.take(1).test().assertResult(2);
}
}
Expand Up @@ -1345,4 +1345,19 @@ public void timeoutDisposesSource() {

assertTrue(((Disposable)o.source).isDisposed());
}

@Test
public void disconnectBeforeConnect() {
BehaviorSubject<Integer> subject = BehaviorSubject.create();

Observable<Integer> observable = subject
.replay(1)
.refCount();

observable.takeUntil(Observable.just(1)).test();

subject.onNext(2);

observable.take(1).test().assertResult(2);
}
}

0 comments on commit 6afd2b8

Please sign in to comment.