Skip to content

Commit

Permalink
feat(initial): Add initial operator.
Browse files Browse the repository at this point in the history
The reverse of subsequent.
  • Loading branch information
cartant committed May 10, 2018
1 parent 8317a98 commit c5a0b84
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
1 change: 1 addition & 0 deletions source/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export * from "./debounceTimeSubsequent";
export * from "./defaultObservableIfEmpty";
export * from "./endWith";
export * from "./guard";
export * from "./initial";
export * from "./instanceOf";
export * from "./pluck";
export * from "./prioritize";
Expand Down
36 changes: 36 additions & 0 deletions source/operators/initial-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/
/*tslint:disable:no-unused-expression*/

import { delay } from "rxjs/operators";
import { marbles } from "rxjs-marbles";
import { initial } from "./initial";

describe("initial", () => {

it("should debounce only initial notifications", marbles((m) => {

const source = m.cold("abc---d|");
const sourceSubs = "^------!";
const expected = m.cold("-bca--d|");

const duration = m.time("---|");
const destination = source.pipe(initial(s => s.pipe(delay(duration, m.scheduler))));
m.expect(destination).toBeObservable(expected);
m.expect(source).toHaveSubscriptions(sourceSubs);
}));

it("should support count", marbles((m) => {

const source = m.cold("abc---d|");
const sourceSubs = "^------!";
const expected = m.cold("--cab-d|");

const duration = m.time("---|");
const destination = source.pipe(initial(2, s => s.pipe(delay(duration, m.scheduler))));
m.expect(destination).toBeObservable(expected);
m.expect(source).toHaveSubscriptions(sourceSubs);
}));
});
56 changes: 56 additions & 0 deletions source/operators/initial.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/

import {
ConnectableObservable,
merge,
MonoTypeOperatorFunction,
Observable,
OperatorFunction
} from "rxjs/";

import { publish, skip, take } from "rxjs/operators";

export function initial<T, R>(
count: number,
selector: (source: Observable<T>) => Observable<R>
): OperatorFunction<T, T | R>;

export function initial<T>(
count: number,
selector: (source: Observable<T>) => Observable<T>
): MonoTypeOperatorFunction<T>;

export function initial<T, R>(
selector: (source: Observable<T>) => Observable<R>
): OperatorFunction<T, T | R>;

export function initial<T>(
selector: (source: Observable<T>) => Observable<T>
): MonoTypeOperatorFunction<T>;

export function initial<T, R>(
countOrSelector: number | ((source: Observable<T>) => Observable<R>),
selector?: (source: Observable<T>) => Observable<R>
): OperatorFunction<T, T | R> {

let count: number;
if (typeof countOrSelector === "number") {
count = countOrSelector;
} else {
count = 1;
selector = countOrSelector;
}

return (source: Observable<T>) => new Observable<T | R>(observer => {
const published = source.pipe(publish()) as ConnectableObservable<T>;
const subscription = merge(
selector!(published.pipe(take(count))),
published.pipe(skip(count))
).subscribe(observer);
subscription.add(published.connect());
return subscription;
});
}

0 comments on commit c5a0b84

Please sign in to comment.