Skip to content

Commit 41ce80c

Browse files
committed
fix(ConnectableObservable): fix ConnectableObservable connection handling issue
1 parent be2c298 commit 41ce80c

File tree

2 files changed

+78
-5
lines changed

2 files changed

+78
-5
lines changed

spec/operators/publishReplay-spec.ts

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,48 @@ describe('Observable.prototype.publishReplay', () => {
283283
done();
284284
});
285285

286+
it('should emit replayed values and resubscribe to the source when ' +
287+
'reconnected without source completion', () => {
288+
const results1 = [];
289+
const results2 = [];
290+
let subscriptions = 0;
291+
292+
const source = new Observable((observer: Rx.Observer<number>) => {
293+
subscriptions++;
294+
observer.next(1);
295+
observer.next(2);
296+
observer.next(3);
297+
observer.next(4);
298+
// observer.complete();
299+
});
300+
301+
const connectable = source.publishReplay(2);
302+
const subscription1 = connectable.subscribe((x: number) => {
303+
results1.push(x);
304+
});
305+
306+
expect(results1).to.deep.equal([]);
307+
expect(results2).to.deep.equal([]);
308+
309+
connectable.connect().unsubscribe();
310+
subscription1.unsubscribe();
311+
312+
expect(results1).to.deep.equal([1, 2, 3, 4]);
313+
expect(results2).to.deep.equal([]);
314+
expect(subscriptions).to.equal(1);
315+
316+
const subscription2 = connectable.subscribe((x: number) => {
317+
results2.push(x);
318+
});
319+
320+
connectable.connect().unsubscribe();
321+
subscription2.unsubscribe();
322+
323+
expect(results1).to.deep.equal([1, 2, 3, 4]);
324+
expect(results2).to.deep.equal([3, 4, 1, 2, 3, 4]);
325+
expect(subscriptions).to.equal(2);
326+
});
327+
286328
it('should emit replayed values plus completed when subscribed after completed', (done: MochaDone) => {
287329
const results1 = [];
288330
const results2 = [];
@@ -358,4 +400,4 @@ describe('Observable.prototype.publishReplay', () => {
358400

359401
published.connect();
360402
});
361-
});
403+
});

src/observable/ConnectableObservable.ts

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ export class ConnectableObservable<T> extends Observable<T> {
3333
connect(): Subscription {
3434
let connection = this._connection;
3535
if (!connection) {
36-
connection = this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this));
36+
connection = this._connection = new Subscription();
37+
connection.add(this.source
38+
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
3739
if (connection.isUnsubscribed) {
3840
this._connection = null;
3941
connection = Subscription.EMPTY;
@@ -66,9 +68,13 @@ class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
6668
const { connectable } = this;
6769
if (connectable) {
6870
this.connectable = null;
71+
const connection = (<any> connectable)._connection;
6972
(<any> connectable)._refCount = 0;
7073
(<any> connectable)._subject = null;
7174
(<any> connectable)._connection = null;
75+
if (connection) {
76+
connection.unsubscribe();
77+
}
7278
}
7379
}
7480
}
@@ -122,10 +128,35 @@ class RefCountSubscriber<T> extends Subscriber<T> {
122128
return;
123129
}
124130

131+
///
132+
// Compare the local RefCountSubscriber's connection Subscription to the
133+
// connection Subscription on the shared ConnectableObservable. In cases
134+
// where the ConnectableObservable source synchronously emits values, and
135+
// the RefCountSubscriber's dowstream Observers synchronously unsubscribe,
136+
// execution continues to here before the RefCountOperator has a chance to
137+
// supply the RefCountSubscriber with the shared connection Subscription.
138+
// For example:
139+
// ```
140+
// Observable.range(0, 10)
141+
// .publish()
142+
// .refCount()
143+
// .take(5)
144+
// .subscribe();
145+
// ```
146+
// In order to account for this case, RefCountSubscriber should only dispose
147+
// the ConnectableObservable's shared connection Subscription if the
148+
// connection Subscription exists, *and* either:
149+
// a. RefCountSubscriber doesn't have a reference to the shared connection
150+
// Subscription yet, or,
151+
// b. RefCountSubscriber's connection Subscription reference is identical
152+
// to the shared connection Subscription
153+
///
125154
const { connection } = this;
126-
if (connection) {
127-
this.connection = null;
128-
connection.unsubscribe();
155+
const sharedConnection = (<any> connectable)._connection;
156+
this.connection = null;
157+
158+
if (sharedConnection && (!connection || sharedConnection === connection)) {
159+
sharedConnection.unsubscribe();
129160
}
130161
}
131162
}

0 commit comments

Comments
 (0)