Skip to content

Commit

Permalink
feat(refCountOn): Add operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Aug 18, 2018
1 parent 9899b22 commit fbc8b02
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 0 deletions.
1 change: 1 addition & 0 deletions source/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export * from "./pluck";
export * from "./prioritize";
export * from "./rateLimit";
export * from "./refCountAuditTime";
export * from "./refCountOn";
export * from "./reschedule";
export * from "./subsequent";
export * from "./switchMapUntil";
Expand Down
45 changes: 45 additions & 0 deletions source/operators/refCountOn-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/
/*tslint:disable:no-unused-expression*/

import { expect } from "chai";
import { defer, of, queueScheduler } from "rxjs";
import { publish } from "rxjs/operators";
import { refCountOn } from "./refCountOn";

describe("refCountOn", () => {

it("should support queue-scheduled actions", () => {

let received = false;
let subscribed = false;

const source = defer(() => { subscribed = true; return of("foo"); });

queueScheduler.schedule(() => {
const subscription = source.pipe(
publish(),
refCountOn(queueScheduler)
).subscribe(() => received = true);
subscription.unsubscribe();
});

expect(received).to.be.false;
expect(subscribed).to.be.false;

queueScheduler.schedule(() => {
const subscription = source.pipe(
publish(),
refCountOn(queueScheduler)
).subscribe(() => received = true);
queueScheduler.schedule(() => {
subscription.unsubscribe();
});
});

expect(received).to.be.true;
expect(subscribed).to.be.true;
});
});
43 changes: 43 additions & 0 deletions source/operators/refCountOn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/

import {
ConnectableObservable,
MonoTypeOperatorFunction,
Observable,
SchedulerLike,
Subscription,
using
} from "rxjs";

export function refCountOn<T>(scheduler: SchedulerLike): MonoTypeOperatorFunction<T> {

return (source: Observable<T>) => {

const connectable: ConnectableObservable<T> = source as any;
let count = 0;
let subscription: Subscription | null = null;

return using(() => {
++count;
scheduler.schedule(() => {
if (!subscription && (count > 0)) {
subscription = connectable.connect();
}
});
return {
unsubscribe: () => {
--count;
scheduler.schedule(() => {
if (subscription && (count === 0)) {
subscription.unsubscribe();
subscription = null;
}
});
}
};
}, () => source);
};
}

0 comments on commit fbc8b02

Please sign in to comment.