Skip to content

Commit

Permalink
feat(combine...Object): Add operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Aug 18, 2018
1 parent c63811b commit 78199bc
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 0 deletions.
127 changes: 127 additions & 0 deletions source/observable/combineLatestHigherOrderObject-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/
/*tslint:disable:no-unused-expression*/

import { marbles } from "rxjs-marbles/mocha";
import { combineLatestHigherOrderObject } from "./combineLatestHigherOrderObject";

describe("combineLatestHigherOrderObject", () => {

it("should combine cold observables", marbles(m => {

const a = m.cold( "a--");
const b = m.cold( "b--");
const c = m.cold( "c--");
const d = m.cold( "d--");
const h = m.cold( "ijk", { i: { a, b }, j: { a, c }, k: { d, c } });
const as = "^-!";
const bs = "^!-";
const cs = "-^-";
const ds = "--^";
const expected = m.cold( "xyz", { x: { a: "a", b: "b" }, y: { a: "a", c: "c" }, z: { d: "d", c: "c" } });

const combined = h.pipe(combineLatestHigherOrderObject());
m.expect(combined).toBeObservable(expected);
m.expect(a).toHaveSubscriptions(as);
m.expect(b).toHaveSubscriptions(bs);
m.expect(c).toHaveSubscriptions(cs);
m.expect(d).toHaveSubscriptions(ds);
}));

it("should combine hot observables", marbles(m => {

const a = m.hot( "--a------");
const b = m.hot( "----b----");
const c = m.hot( "------c--");
const d = m.hot( "--------d");
const h = m.hot( "i---j--k-", { i: { a, b }, j: { a, c }, k: { d, c } });
const as = "^------!-";
const bs = "^---!----";
const cs = "----^----";
const ds = "-------^-";
const expected = m.cold( "----x-y-z", { x: { a: "a", b: "b" }, y: { a: "a", c: "c" }, z: { d: "d", c: "c" } });

const combined = h.pipe(combineLatestHigherOrderObject());
m.expect(combined).toBeObservable(expected);
m.expect(a).toHaveSubscriptions(as);
m.expect(b).toHaveSubscriptions(bs);
m.expect(c).toHaveSubscriptions(cs);
m.expect(d).toHaveSubscriptions(ds);
}));

it("should forward error notifications", marbles(m => {

const a = m.hot( "--a------");
const b = m.hot( "----b----");
const c = m.hot( "------#--");
const d = m.hot( "--------d");
const h = m.hot( "i---j--k-", { i: { a, b }, j: { a, c }, k: { d, c } });
const as = "^-----!--";
const bs = "^---!----";
const cs = "----^-!--";
const expected = m.cold( "----x-#--", { x: { a: "a", b: "b" } });

const combined = h.pipe(combineLatestHigherOrderObject());
m.expect(combined).toBeObservable(expected);
m.expect(a).toHaveSubscriptions(as);
m.expect(b).toHaveSubscriptions(bs);
m.expect(c).toHaveSubscriptions(cs);
}));

it("should not emit initial empty sources", marbles(m => {

const a = m.hot( "---------");
const b = m.hot( "---------");
const c = m.hot( "------c--");
const d = m.hot( "--------d");
const h = m.hot( "i---j--k-", { i: { a, b }, j: { a, c }, k: { d, c } });
const as = "^------!-";
const bs = "^---!----";
const cs = "----^----";
const ds = "-------^-";
const expected = m.cold( "--------z", { z: { c: "c", d: "d" } });

const combined = h.pipe(combineLatestHigherOrderObject());
m.expect(combined).toBeObservable(expected);
m.expect(a).toHaveSubscriptions(as);
m.expect(b).toHaveSubscriptions(bs);
m.expect(c).toHaveSubscriptions(cs);
m.expect(d).toHaveSubscriptions(ds);
}));

it("should not emit later empty sources", marbles(m => {

const a = m.hot( "--a------");
const b = m.hot( "----b----");
const c = m.hot( "---------");
const d = m.hot( "---------");
const h = m.hot( "i---j--k-", { i: { a, b }, j: { a, c }, k: { d, c } });
const as = "^------!-";
const bs = "^---!----";
const cs = "----^----";
const ds = "-------^-";
const expected = m.cold( "----x----", { x: { a: "a", b: "b" } });

const combined = h.pipe(combineLatestHigherOrderObject());
m.expect(combined).toBeObservable(expected);
m.expect(a).toHaveSubscriptions(as);
m.expect(b).toHaveSubscriptions(bs);
m.expect(c).toHaveSubscriptions(cs);
m.expect(d).toHaveSubscriptions(ds);
}));

it("should support duplicate sources", marbles(m => {

const a = m.hot( "-a----a----");
const h = m.hot( "i----------", { i: { p: a, q: a } });
const as = [ "^----------",
"^----------"];
const expected = m.cold( "-x----(xx)-", { x: { p: "a", q: "a" } });

const combined = h.pipe(combineLatestHigherOrderObject());
m.expect(combined).toBeObservable(expected);
m.expect(a).toHaveSubscriptions(as);
}));
});
76 changes: 76 additions & 0 deletions source/observable/combineLatestHigherOrderObject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/

import { Observable, OperatorFunction, Subscription } from "rxjs";

interface Source<T> {
completed: boolean;
key: string;
nexted: boolean;
observable: Observable<T>;
subscription?: Subscription;
value?: T;
}

export function combineLatestHigherOrderObject<T>(): OperatorFunction<Record<string, Observable<T>>, Record<string, T>> {
return higherOrder => new Observable<Record<string, T>>(observer => {
let lasts: Source<T>[] = [];
let higherOrderCompleted = false;
const higherOrderSubscription = new Subscription();
higherOrderSubscription.add(higherOrder.subscribe(
observables => {
const subscribes: (() => void)[] = [];
const nexts = Object.keys(observables).map(key => {
const observable = observables[key];
const index = lasts.findIndex(l => (l.observable === observable) && (l.key === key));
if (index !== -1) {
const next = lasts[index];
lasts.splice(index, 1);
return next;
}
const next: Source<T> = { completed: false, key, nexted: false, observable };
subscribes.push(() => {
if (higherOrderSubscription.closed) {
return;
}
next.subscription = next.observable.subscribe(
value => {
next.nexted = true;
next.value = value;
if (nexts.every(({ nexted }) => nexted)) {
observer.next(nexts.reduce((acc, { key, value }) => ({ ...acc, [key]: value }), {}));
}
},
error => observer.error(error),
() => {
next.completed = true;
if (higherOrderCompleted && nexts.every(({ completed }) => completed)) {
observer.complete();
}
}
);
higherOrderSubscription.add(next.subscription);
});
return next;
});
lasts.forEach(({ subscription }) => {
if (subscription) {
subscription.unsubscribe();
}
});
lasts = nexts;
subscribes.forEach(subscribe => subscribe());
},
error => observer.error(error),
() => {
if (lasts.every(({ completed }) => completed)) {
observer.complete();
}
higherOrderCompleted = true;
}
));
return higherOrderSubscription;
});
}
1 change: 1 addition & 0 deletions source/observable/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

export * from "./combineLatestArray";
export * from "./combineLatestHigherOrder";
export * from "./combineLatestHigherOrderObject";
export * from "./combineLatestObject";
export * from "./concatArray";
export * from "./forkJoinArray";
Expand Down

0 comments on commit 78199bc

Please sign in to comment.