Skip to content

Commit

Permalink
feat(tapSubscribe): Add operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed May 17, 2019
1 parent 366edd1 commit fd52583
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 0 deletions.
1 change: 1 addition & 0 deletions source/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export * from "./subsequent";
export * from "./switchMapUntil";
export * from "./switchTap";
export * from "./takeWhileInclusive";
export * from "./tapSubscribe";
export * from "./tapWithIndex";
export * from "./throttleAfter";
export * from "./withLatestFromWhen";
Expand Down
105 changes: 105 additions & 0 deletions source/operators/tapSubscribe-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/
/*tslint:disable:no-unused-expression*/

import { expect } from "chai";
import { marbles } from "rxjs-marbles";
import { tapSubscribe } from "./tapSubscribe";

// prettier-ignore
describe("tapSubscribe", () => {
it(
"should mirror the source with a function argument",
marbles(m => {
const source = m.cold(" abc|");
const expected = " abc|";

let subscribeCount = 0;
const result = source.pipe(tapSubscribe(() => ++subscribeCount));
m.expect(result).toBeObservable(expected);
m.flush();
expect(subscribeCount).to.equal(1);
})
);

it(
"should mirror the source with a config argument",
marbles(m => {
const source = m.cold(" abc|");
const expected = " abc|";

let subscribeCount = 0;
let unsubscribeCount = 0;
const result = source.pipe(tapSubscribe({
subscribe: () => ++subscribeCount,
unsubscribe: () => ++unsubscribeCount
}));
m.expect(result).toBeObservable(expected);
m.flush();
expect(subscribeCount).to.equal(1);
expect(unsubscribeCount).to.equal(1);
})
);

it(
"should support ignoring complete",
marbles(m => {
const source = m.cold(" abc|");
const expected = " abc|";

let subscribeCount = 0;
let unsubscribeCount = 0;
const result = source.pipe(tapSubscribe({
ignore: { complete: true },
subscribe: () => ++subscribeCount,
unsubscribe: () => ++unsubscribeCount
}));
m.expect(result).toBeObservable(expected);
m.flush();
expect(subscribeCount).to.equal(1);
expect(unsubscribeCount).to.equal(0);
})
);

it(
"should support ignoring error",
marbles(m => {
const source = m.cold(" abc#");
const expected = " abc#";

let subscribeCount = 0;
let unsubscribeCount = 0;
const result = source.pipe(tapSubscribe({
ignore: { error: true },
subscribe: () => ++subscribeCount,
unsubscribe: () => ++unsubscribeCount
}));
m.expect(result).toBeObservable(expected);
m.flush();
expect(subscribeCount).to.equal(1);
expect(unsubscribeCount).to.equal(0);
})
);

it(
"should support explicit unsubscription",
marbles(m => {
const source = m.cold(" abc-");
const expected = " abc-";
const subs = " ^--!";

let subscribeCount = 0;
let unsubscribeCount = 0;
const result = source.pipe(tapSubscribe({
subscribe: () => ++subscribeCount,
unsubscribe: () => ++unsubscribeCount
}));
m.expect(result, subs).toBeObservable(expected);
m.flush();
expect(subscribeCount).to.equal(1);
expect(unsubscribeCount).to.equal(1);
})
);
});
49 changes: 49 additions & 0 deletions source/operators/tapSubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/

import { defer, MonoTypeOperatorFunction, noop } from "rxjs";
import { finalize, tap } from "rxjs/operators";

export interface TapSubscriberConfig {
ignore?: { complete?: boolean; error?: boolean };
subscribe?: () => void;
unsubscribe?: () => void;
}

export function tapSubscribe<T>(
config: TapSubscriberConfig
): MonoTypeOperatorFunction<T>;
export function tapSubscribe<T>(
subscribe: () => void
): MonoTypeOperatorFunction<T>;
export function tapSubscribe<T>(
configOrSubscribe: TapSubscriberConfig | (() => void)
): MonoTypeOperatorFunction<T> {
const { ignore = {}, subscribe = noop, unsubscribe = noop } =
typeof configOrSubscribe === "function"
? { subscribe: configOrSubscribe }
: configOrSubscribe;
return source =>
defer(() => {
let completed = false;
let errored = false;
subscribe();
return source.pipe(
tap({
complete: () => (completed = true),
error: () => (errored = true)
}),
finalize(() => {
if (completed && ignore.complete) {
return;
}
if (errored && ignore.error) {
return;
}
unsubscribe();
})
);
});
}

0 comments on commit fd52583

Please sign in to comment.