Skip to content

Commit ef95141

Browse files
committed
fix(refCount): Support multiple, sync subscribes.
1 parent cbfd53f commit ef95141

File tree

2 files changed

+37
-13
lines changed

2 files changed

+37
-13
lines changed

source/operators/refCountAuditTime-spec.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
/*tslint:disable:no-unused-expression*/
66

77
import { expect } from "chai";
8-
import { of } from "rxjs";
8+
import { defer, of } from "rxjs";
99
import { mergeMapTo, publish, toArray } from "rxjs/operators";
1010
import { marbles } from "rxjs-marbles";
1111
import { refCountAuditTime } from "./refCountAuditTime";
@@ -129,4 +129,19 @@ describe("refCountAuditTime", () => {
129129
const published = source.pipe(publish(), refCountAuditTime(0), toArray());
130130
return published.toPromise().then(value => expect(value).to.deep.equal([1, 2, 3]));
131131
});
132+
133+
it("should support mutliple, synchronous subscriptions", () => {
134+
135+
let subscribes = 0;
136+
const source = defer(() => {
137+
++subscribes;
138+
return of(1, 2, 3);
139+
});
140+
141+
const published = source.pipe(publish(), refCountAuditTime(0), toArray());
142+
return Promise.all([
143+
published.toPromise(),
144+
published.toPromise()
145+
]).then(() => expect(subscribes).to.equal(1));
146+
});
132147
});

source/operators/refCountAuditTime.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55

66
import {
7+
asapScheduler,
78
ConnectableObservable,
89
MonoTypeOperatorFunction,
910
NEVER,
@@ -19,7 +20,7 @@ import { scan, switchMap, tap } from "rxjs/operators";
1920

2021
export function refCountAuditTime<T>(
2122
duration: number,
22-
scheduler?: SchedulerLike
23+
scheduler: SchedulerLike = asapScheduler
2324
): MonoTypeOperatorFunction<T> {
2425

2526
return (source: Observable<T>) => {
@@ -32,31 +33,39 @@ export function refCountAuditTime<T>(
3233
// https://github.com/ReactiveX/rxjs/issues/171#issuecomment-267881847
3334

3435
const connectable: ConnectableObservable<T> = source as any;
35-
let subscription: Subscription | null = null;
36+
let connectableSubscription: Subscription | null = null;
37+
let connectorSubscription: Subscription | null = null;
3638

3739
const notifier = new Subject<number>();
3840
const connector = notifier.pipe(
3941
scan((count, step) => count + step, 0),
4042
switchMap(count => {
41-
switch (count) {
42-
case 0:
43+
if (count === 0) {
4344
return timer(duration, scheduler).pipe(tap(() => {
44-
subscription!.unsubscribe();
45-
subscription = null;
45+
if (connectableSubscription) {
46+
connectableSubscription.unsubscribe();
47+
connectableSubscription = null;
48+
}
49+
if (connectorSubscription) {
50+
connectorSubscription.unsubscribe();
51+
connectorSubscription = null;
52+
}
4653
}));
47-
case 1:
54+
}
55+
if (!connectableSubscription && (count > 0)) {
4856
return timer(0, scheduler).pipe(tap(() => {
49-
subscription!.add(connectable.connect());
57+
if (!connectableSubscription) {
58+
connectableSubscription = connectable.connect();
59+
}
5060
}));
51-
default:
52-
return NEVER;
5361
}
62+
return NEVER;
5463
})
5564
);
5665

5766
return using(() => {
58-
if (subscription === null) {
59-
subscription = connector.subscribe();
67+
if (!connectorSubscription) {
68+
connectorSubscription = connector.subscribe();
6069
}
6170
notifier.next(1);
6271
return { unsubscribe: () => notifier.next(-1) };

0 commit comments

Comments
 (0)