Skip to content

Commit

Permalink
refactor(traverse): Use an options parameter.
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed May 12, 2018
1 parent 0b41cc0 commit 07f9812
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 55 deletions.
46 changes: 26 additions & 20 deletions source/observable/traverse-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe("traverse", () => {
const expected = m.cold("----|");

const factory = createFactory(-1, 1, m.time("--|"), m.scheduler);
const traversed = traverse(factory, notifier);
const traversed = traverse({ factory, notifier });
m.expect(traversed).toBeObservable(expected);
m.expect(notifier).toHaveSubscriptions(notifierSubs);
}));
Expand All @@ -56,7 +56,7 @@ describe("traverse", () => {
const expected = m.cold("0-");

const factory = createFactory();
const traversed = traverse(factory, notifier);
const traversed = traverse({ factory, notifier });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -66,7 +66,7 @@ describe("traverse", () => {
const expected = m.cold("0-1----2--3--");

const factory = createFactory();
const traversed = traverse(factory, notifier);
const traversed = traverse({ factory, notifier });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -76,7 +76,7 @@ describe("traverse", () => {
const expected = m.cold("(01)-(12)----(23)--(34)--");

const factory = createFactory(Infinity, 2);
const traversed = traverse(factory, notifier);
const traversed = traverse({ factory, notifier });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -86,7 +86,7 @@ describe("traverse", () => {
const expected = m.cold("----0---1---2--");

const factory = createFactory(Infinity, 1, m.time("----|"), m.scheduler);
const traversed = traverse(factory, notifier);
const traversed = traverse({ factory, notifier });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -95,7 +95,7 @@ describe("traverse", () => {
const expected = m.cold("----0---1---2---|");

const factory = createFactory(2, 1, m.time("----|"), m.scheduler);
const traversed = traverse(factory);
const traversed = traverse({ factory });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -110,7 +110,10 @@ describe("traverse", () => {
const expected = m.cold("----0---1---2---|");

const factory = createFactory(2, 1, m.time("----|"), m.scheduler);
const traversed = traverse(factory, source => concat(source, other));
const traversed = traverse({
factory,
operator: source => concat(source, other)
});
m.expect(traversed).toBeObservable(expected);
m.expect(other).toHaveSubscriptions(subs);
}));
Expand All @@ -126,7 +129,10 @@ describe("traverse", () => {
const expected = m.cold("0---1---2---|");

const factory = createFactory(2);
const traversed = traverse(factory, source => concat(source, other));
const traversed = traverse({
factory,
operator: source => concat(source, other)
});
m.expect(traversed).toBeObservable(expected);
m.expect(other).toHaveSubscriptions(subs);
}));
Expand Down Expand Up @@ -166,7 +172,7 @@ describe("traverse", () => {
}
};

const traversed = traverse(factory);
const traversed = traverse({ factory });
m.expect(traversed).toBeObservable(expected);
m.expect(w).toHaveSubscriptions(wSubs);
m.expect(x).toHaveSubscriptions(xSubs);
Expand Down Expand Up @@ -209,7 +215,7 @@ describe("traverse", () => {
const expected = m.cold("(abc)-(de)--(f|)");

const factory = createFactory();
const traversed = traverse(factory, notifier);
const traversed = traverse({ factory, notifier });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -219,7 +225,7 @@ describe("traverse", () => {
const expected = m.cold("------(abc)-(de)--(f|)");

const factory = createFactory(m.time("------|"), m.scheduler);
const traversed = traverse(factory, notifier);
const traversed = traverse({ factory, notifier });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -228,7 +234,7 @@ describe("traverse", () => {
const expected = m.cold("------(abc)-(de)--(f|)");

const factory = createFactory(m.time("------|"), m.scheduler);
const traversed = traverse(factory);
const traversed = traverse({ factory });
m.expect(traversed).toBeObservable(expected);
}));

Expand All @@ -237,7 +243,7 @@ describe("traverse", () => {
const expected = m.cold("------(abc)-(def|)");

const factory = createFactory(m.time("------|"), m.scheduler);
const traversed = traverse(factory, Infinity);
const traversed = traverse({ factory, concurrency: Infinity });
m.expect(traversed).toBeObservable(expected);
}));
});
Expand Down Expand Up @@ -281,15 +287,15 @@ describe("traverse", () => {
it("should traverse the pages", (callback: any) => {

const notifier = new Subject<any>();
const urls = traverse(
(marker?: string) => get(marker || "https://api.github.com/users/cartant/repos").pipe(
const urls = traverse({
factory: (marker?: string) => get(marker || "https://api.github.com/users/cartant/repos").pipe(
map(response => ({
markers: response.next ? [response.next] : [],
values: response.content
}))
),
notifier
).pipe(
}).pipe(
map(repo => repo.html_url)
);

Expand All @@ -316,17 +322,17 @@ describe("traverse", () => {

it("should traverse the pages", (callback: any) => {

const urls = traverse(
(marker?: string) => get(marker || "https://api.github.com/users/cartant/repos").pipe(
const urls = traverse({
factory: (marker?: string) => get(marker || "https://api.github.com/users/cartant/repos").pipe(
map(response => ({
markers: response.next ? [response.next] : [],
values: response.content
}))
),
repos => repos.pipe(
operator: repos => repos.pipe(
map(repo => repo.html_url)
)
);
});

const received: string[] = [];
urls.subscribe(
Expand Down
63 changes: 28 additions & 35 deletions source/observable/traverse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,56 @@ import {

import { expand, ignoreElements, mergeMap, tap } from "rxjs/operators";
import { NotificationQueue } from "./NotificationQueue";
import { isObservable } from "../util";

export type TraverseElement<T, M> = { markers: ObservableInput<M>, values: ObservableInput<T> };
export type TraverseFactory<T, M> = (marker: M | undefined, index: number) => Observable<TraverseElement<T, M>>;

export function traverse<T, M>(
export function traverse<T, M>(options: {
concurrency?: number,
factory: TraverseFactory<T, M>,
notifier: Observable<any>,
concurrency?: number
): Observable<T>;
notifier: Observable<any>
}): Observable<T>;

export function traverse<T, M, R>(
export function traverse<T, M, R>(options: {
concurrency?: number,
factory: TraverseFactory<T, M>,
operator: OperatorFunction<T, R>,
concurrency?: number
): Observable<R>;
operator: OperatorFunction<T, R>
}): Observable<R>;

export function traverse<T, M>(
factory: TraverseFactory<T, M>,
concurrency?: number
): Observable<T>;
export function traverse<T, M>(options: {
concurrency?: number,
factory: TraverseFactory<T, M>
}): Observable<T>;

// https://github.com/palantir/tslint/issues/3906

export function traverse<T, M, R>(
export function traverse<T, M, R>({
concurrency: optionalConcurrency, // tslint:disable-line:no-use-before-declare
factory,
operator: optionalOperator, // tslint:disable-line:no-use-before-declare
notifier: optionalNotifier // tslint:disable-line:no-use-before-declare
}: {
concurrency?: number,
factory: TraverseFactory<T, M>,
optionalNotifierOrOperatorOrConcurrency?: Observable<any> | OperatorFunction<T, R> | number,
optionalConcurrency?: number
): Observable<T | R> {
operator?: OperatorFunction<T, T | R>,
notifier?: Observable<any>
}): Observable<T | R> {
return new Observable<T | R>(observer => {

let concurrency: number;
let operator: OperatorFunction<T, T | R>;
const concurrency = (optionalConcurrency !== undefined) ? optionalConcurrency : 1;
const operator = optionalOperator || identity;
let queue: NotificationQueue;
let queueOperator: MonoTypeOperatorFunction<M | undefined>;

if (isObservable(optionalNotifierOrOperatorOrConcurrency)) {
operator = identity;
queue = new NotificationQueue(optionalNotifierOrOperatorOrConcurrency);
if (optionalNotifier) {
queue = new NotificationQueue(optionalNotifier);
queueOperator = identity;
} else {
const subject = new Subject<any>();
if (typeof optionalNotifierOrOperatorOrConcurrency === "function") {
operator = optionalNotifierOrOperatorOrConcurrency;
} else {
operator = identity;
}
queue = new NotificationQueue(subject);
queueOperator = markers => { subject.next(); return markers; };
}

if (typeof optionalConcurrency === "number") {
concurrency = optionalConcurrency;
} else if (typeof optionalNotifierOrOperatorOrConcurrency === "number") {
concurrency = optionalNotifierOrOperatorOrConcurrency;
} else {
concurrency = 1;
}

const destination = new Subject<T | R>();
const subscription = destination.subscribe(observer);
subscription.add(queue.connect());
Expand Down

0 comments on commit 07f9812

Please sign in to comment.