Skip to content

Commit

Permalink
fix(superset-flow): prerequest of superset is alway previous result
Browse files Browse the repository at this point in the history
  • Loading branch information
ElonH committed May 29, 2020
1 parent 62c7a0e commit 40b0a90
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
20 changes: 9 additions & 11 deletions src/app/@dataflow/core/bare-flow.ts
@@ -1,13 +1,5 @@
import { Observable, of } from 'rxjs';
import {
switchMap,
take,
tap,
startWith,
distinctUntilChanged,
skipWhile,
shareReplay,
} from 'rxjs/operators';
import { switchMap, take, distinctUntilChanged, shareReplay } from 'rxjs/operators';

export interface FlowInNode {}
export interface FlowOutNode {}
Expand All @@ -18,7 +10,7 @@ export abstract class BareFlow<Tin extends FlowInNode, Tout extends FlowOutNode>
protected abstract request(pre: CombErr<Tin>): Observable<CombErr<Tout>>;
private bareData$: Observable<CombErr<Tout>>;
private deployed = false;
public deploy() {
protected deployBefore() {
this.bareData$ = this.prerequest$.pipe(
switchMap(
(pre): Observable<CombErr<Tout>> => {
Expand All @@ -30,9 +22,15 @@ export abstract class BareFlow<Tin extends FlowInNode, Tout extends FlowOutNode>
distinctUntilChanged(),
shareReplay()
);
this.bareData$.pipe(take(1)).subscribe();
this.deployed = true;
}
protected deployAfter() {
this.bareData$.pipe(take(1)).subscribe();
}
public deploy() {
this.deployBefore();
this.deployAfter();
}
public getOutput(): Observable<CombErr<Tout>> {
if (!this.deployed) throw new Error('run deploy before getOutput');
return this.bareData$;
Expand Down
26 changes: 17 additions & 9 deletions src/app/@dataflow/core/superset-flow.ts
Expand Up @@ -9,11 +9,21 @@ export abstract class SupersetFlow<
Tout extends FlowOutNode,
Tsup extends FlowSupNode = Tin & Tout
> extends BareFlow<Tin, Tout> {
private boostrapPrerequest$: Observable<CombErr<Tin>>;
public deploy() {
super.deploy();
this.boostrapPrerequest$ = this.prerequest$.pipe(distinctUntilChanged(), shareReplay());
this.boostrapPrerequest$.pipe(take(1)).subscribe();
private supersetData$: Observable<CombErr<Tsup>>;
private supersetDeployed = false;
protected deployBefore() {
this.prerequest$ = this.prerequest$.pipe(distinctUntilChanged(), shareReplay());
super.deployBefore();
this.supersetData$ = this.getOutput().pipe(
withLatestFrom(this.prerequest$),
map(([cur, pre]) => this.generateSuperset(cur, pre)),
distinctUntilChanged(),
shareReplay()
);
this.supersetDeployed = true;
}
protected deployAfter() {
this.supersetData$.pipe(take(1)).subscribe();
}
protected generateSuperset(current: CombErr<Tout>, previous: CombErr<Tin>): CombErr<Tsup> {
return [
Expand All @@ -22,9 +32,7 @@ export abstract class SupersetFlow<
];
}
public getSupersetOutput(): Observable<CombErr<Tsup>> {
return this.getOutput().pipe(
withLatestFrom(this.boostrapPrerequest$),
map(([cur, pre]) => this.generateSuperset(cur, pre))
);
if (!this.supersetDeployed) throw new Error('run deploy before getSupersetOutput');
return this.supersetData$;
}
}

0 comments on commit 40b0a90

Please sign in to comment.