Skip to content

Commit

Permalink
feat(bare-flow): allow to dynamically link dataflow
Browse files Browse the repository at this point in the history
  • Loading branch information
ElonH committed May 15, 2020
1 parent 45c4f20 commit 7b8680b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
44 changes: 44 additions & 0 deletions src/app/@dataflow/core/bare-flow.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,50 @@ describe('BareFlow', () => {
})();
rst.deploy();

expectObservable(rst.getOutput()).toBe(expected, values);
});
});
it('prerequest twice(same value), but got once only', () => {
scheduler.run((helpers) => {
const { cold, hot, expectObservable, expectSubscriptions, flush } = helpers;
const values: { [id: string]: DataFlowNode } = {
a: [{ a: 555 }, []],
b: [{ b: 123 }, []],
};
const pre = cold('a--a-', values);
const expected = 'b----';

const rst = new (class extends BareFlow {
public prerequest$ = pre;
protected request(pre: DataFlowNode): Observable<DataFlowNode> {
return of(values.b);
}
})();
rst.deploy();

expectObservable(rst.getOutput()).toBe(expected, values);
});
});
it('prerequest twice(different value), got twice', () => {
scheduler.run((helpers) => {
const { cold, hot, expectObservable, expectSubscriptions, flush } = helpers;
const values: { [id: string]: DataFlowNode } = {
a: [{ ab: 555 }, []],
b: [{ ab: 123 }, []],
c: [{ ab: 556 }, []],
d: [{ ab: 124 }, []],
};
const pre = cold('a--b-', values);
const expected = 'c--d-';

const rst = new (class extends BareFlow {
public prerequest$ = pre;
protected request(pre: DataFlowNode): Observable<DataFlowNode> {
return of([{ ab: pre[0]['ab'] + 1 }, []]);
}
})();
rst.deploy();

expectObservable(rst.getOutput()).toBe(expected, values);
});
});
Expand Down
13 changes: 10 additions & 3 deletions src/app/@dataflow/core/bare-flow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Observable, of } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';
import { switchMap, take, tap, startWith, distinctUntilChanged, skip } from 'rxjs/operators';

export type DataFlowNode = [object, Error[]];

Expand All @@ -8,19 +8,26 @@ export abstract class BareFlow {
protected abstract request(pre: DataFlowNode): Observable<DataFlowNode>;
private bareData$: Observable<DataFlowNode>;
private deployed = false;
private boostrapData: DataFlowNode;
public deploy() {
this.bareData$ = this.prerequest$.pipe(
switchMap(
(pre): Observable<DataFlowNode> => {
if (pre[1].length === 0) return this.request(pre).pipe(take(1));
return of(pre);
}
)
),
tap((x) => (this.boostrapData = x))
);
this.bareData$.pipe(take(1)).subscribe();
this.deployed = true;
}
public getOutput(): Observable<DataFlowNode> {
if (!this.deployed) throw new Error('run deploy before getOutput');
return this.bareData$;
return this.bareData$.pipe(
startWith(this.boostrapData),
distinctUntilChanged(),
skip(1) // don't why need it , otherwise, test failure. refs: https://stackoverflow.com/a/52157317
);
}
}

0 comments on commit 7b8680b

Please sign in to comment.