Skip to content

Commit ba4b026

Browse files
committed
feat(combineLatestHigherOrder): Add factory.
1 parent 1c8cb3b commit ba4b026

File tree

2 files changed

+198
-0
lines changed

2 files changed

+198
-0
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* @license Use of this source code is governed by an MIT-style license that
3+
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
4+
*/
5+
/*tslint:disable:no-unused-expression*/
6+
7+
import { marbles } from "rxjs-marbles/mocha";
8+
import { combineLatestHigherOrder } from "./combineLatestHigherOrder";
9+
10+
describe("combineLatestHigherOrder", () => {
11+
12+
it("should combine cold observables", marbles(m => {
13+
14+
const a = m.cold( "a--");
15+
const b = m.cold( "b--");
16+
const c = m.cold( "c--");
17+
const d = m.cold( "d--");
18+
const h = m.cold( "ijk", { i: [a, b], j: [a, c], k: [d, c] });
19+
const as = "^-!";
20+
const bs = "^!-";
21+
const cs = "-^-";
22+
const ds = "--^";
23+
const expected = m.cold( "xyz", { x: ["a", "b"], y: ["a", "c"], z: ["d", "c"] });
24+
25+
const combined = h.pipe(combineLatestHigherOrder());
26+
m.expect(combined).toBeObservable(expected);
27+
m.expect(a).toHaveSubscriptions(as);
28+
m.expect(b).toHaveSubscriptions(bs);
29+
m.expect(c).toHaveSubscriptions(cs);
30+
m.expect(d).toHaveSubscriptions(ds);
31+
}));
32+
33+
it("should combine hot observables", marbles(m => {
34+
35+
const a = m.hot( "--a------");
36+
const b = m.hot( "----b----");
37+
const c = m.hot( "------c--");
38+
const d = m.hot( "--------d");
39+
const h = m.hot( "i---j--k-", { i: [a, b], j: [a, c], k: [d, c] });
40+
const as = "^------!-";
41+
const bs = "^---!----";
42+
const cs = "----^----";
43+
const ds = "-------^-";
44+
const expected = m.cold( "----x-y-z", { x: ["a", "b"], y: ["a", "c"], z: ["d", "c"] });
45+
46+
const combined = h.pipe(combineLatestHigherOrder());
47+
m.expect(combined).toBeObservable(expected);
48+
m.expect(a).toHaveSubscriptions(as);
49+
m.expect(b).toHaveSubscriptions(bs);
50+
m.expect(c).toHaveSubscriptions(cs);
51+
m.expect(d).toHaveSubscriptions(ds);
52+
}));
53+
54+
it("should forward error notifications", marbles(m => {
55+
56+
const a = m.hot( "--a------");
57+
const b = m.hot( "----b----");
58+
const c = m.hot( "------#--");
59+
const d = m.hot( "--------d");
60+
const h = m.hot( "i---j--k-", { i: [a, b], j: [a, c], k: [d, c] });
61+
const as = "^-----!--";
62+
const bs = "^---!----";
63+
const cs = "----^-!--";
64+
const expected = m.cold( "----x-#--", { x: ["a", "b"] });
65+
66+
const combined = h.pipe(combineLatestHigherOrder());
67+
m.expect(combined).toBeObservable(expected);
68+
m.expect(a).toHaveSubscriptions(as);
69+
m.expect(b).toHaveSubscriptions(bs);
70+
m.expect(c).toHaveSubscriptions(cs);
71+
}));
72+
73+
it("should not emit initial empty sources", marbles(m => {
74+
75+
const a = m.hot( "---------");
76+
const b = m.hot( "---------");
77+
const c = m.hot( "------c--");
78+
const d = m.hot( "--------d");
79+
const h = m.hot( "i---j--k-", { i: [a, b], j: [a, c], k: [d, c] });
80+
const as = "^------!-";
81+
const bs = "^---!----";
82+
const cs = "----^----";
83+
const ds = "-------^-";
84+
const expected = m.cold( "--------z", { z: ["d", "c"] });
85+
86+
const combined = h.pipe(combineLatestHigherOrder());
87+
m.expect(combined).toBeObservable(expected);
88+
m.expect(a).toHaveSubscriptions(as);
89+
m.expect(b).toHaveSubscriptions(bs);
90+
m.expect(c).toHaveSubscriptions(cs);
91+
m.expect(d).toHaveSubscriptions(ds);
92+
}));
93+
94+
it("should not emit later empty sources", marbles(m => {
95+
96+
const a = m.hot( "--a------");
97+
const b = m.hot( "----b----");
98+
const c = m.hot( "---------");
99+
const d = m.hot( "---------");
100+
const h = m.hot( "i---j--k-", { i: [a, b], j: [a, c], k: [d, c] });
101+
const as = "^------!-";
102+
const bs = "^---!----";
103+
const cs = "----^----";
104+
const ds = "-------^-";
105+
const expected = m.cold( "----x----", { x: ["a", "b"] });
106+
107+
const combined = h.pipe(combineLatestHigherOrder());
108+
m.expect(combined).toBeObservable(expected);
109+
m.expect(a).toHaveSubscriptions(as);
110+
m.expect(b).toHaveSubscriptions(bs);
111+
m.expect(c).toHaveSubscriptions(cs);
112+
m.expect(d).toHaveSubscriptions(ds);
113+
}));
114+
115+
it("should support duplicate sources", marbles(m => {
116+
117+
const a = m.hot( "-a----a----");
118+
const h = m.hot( "i----------", { i: [a, a] });
119+
const as = [ "^----------",
120+
"^----------"];
121+
const expected = m.cold( "-x----(xx)-", { x: ["a", "a"] });
122+
123+
const combined = h.pipe(combineLatestHigherOrder());
124+
m.expect(combined).toBeObservable(expected);
125+
m.expect(a).toHaveSubscriptions(as);
126+
}));
127+
});
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* @license Use of this source code is governed by an MIT-style license that
3+
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
4+
*/
5+
6+
import { Observable, OperatorFunction, Subscription } from "rxjs";
7+
8+
interface Source<T> {
9+
completed: boolean;
10+
nexted: boolean;
11+
observable: Observable<T>;
12+
subscription?: Subscription;
13+
value?: T;
14+
}
15+
16+
export function combineLatestHigherOrder<T>(): OperatorFunction<Observable<T>[], T[]> {
17+
return higherOrderSource => new Observable<T[]>(observer => {
18+
let lasts: Source<T>[] = [];
19+
let higherOrderComplete = false;
20+
const higherOrderSubscription = new Subscription();
21+
higherOrderSubscription.add(higherOrderSource.subscribe(
22+
observables => {
23+
const subscribes: (() => void)[] = [];
24+
const nexts = observables.map(observable => {
25+
const index = lasts.findIndex(l => l.observable === observable);
26+
if (index !== -1) {
27+
const next = lasts[index];
28+
lasts.splice(index, 1);
29+
return next;
30+
}
31+
const next: Source<T> = { completed: false, nexted: false, observable };
32+
subscribes.push(() => {
33+
next.subscription = next.observable.subscribe(
34+
value => {
35+
next.nexted = true;
36+
next.value = value;
37+
if (nexts.every(({ nexted }) => nexted)) {
38+
observer.next(nexts.map(({ value }) => value) as T[]);
39+
}
40+
},
41+
error => observer.error(error),
42+
() => {
43+
next.completed = true;
44+
if (higherOrderComplete && nexts.every(({ completed }) => completed)) {
45+
observer.complete();
46+
}
47+
}
48+
);
49+
higherOrderSubscription.add(next.subscription);
50+
});
51+
return next;
52+
});
53+
lasts.forEach(({ subscription }) => {
54+
if (subscription) {
55+
subscription.unsubscribe();
56+
}
57+
});
58+
lasts = nexts;
59+
subscribes.forEach(subscribe => subscribe());
60+
},
61+
error => observer.error(error),
62+
() => {
63+
if (lasts.every(({ completed }) => completed)) {
64+
observer.complete();
65+
}
66+
higherOrderComplete = true;
67+
}
68+
));
69+
return higherOrderSubscription;
70+
});
71+
}

0 commit comments

Comments
 (0)