/
MergeShape.ts
178 lines (162 loc) · 6.22 KB
/
MergeShape.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import { DataFrame } from '../../data';
import { ProcessingNode, ProcessingNodeOptions } from '../ProcessingNode';
import { TimeUnit } from '../../utils';
import { TimeService } from '../../service';
import { PushOptions } from '../../graph/options';
import { PushError } from '../../graph/events';
/**
* Merge data frames from two or more sources
* using a certain merge key (e.g. source uid, parent uid, node uid).
* @category Flow shape
*/
export abstract class MergeShape<InOut extends DataFrame> extends ProcessingNode<InOut, InOut> {
private _queue: Map<any, QueuedMerge<InOut>> = new Map();
private _timeout: number;
private _timer: NodeJS.Timeout;
private _mergeKeyFn: (frame: InOut, options?: PushOptions) => any;
private _groupFn: (frame: InOut, options?: PushOptions) => any;
protected options: MergeShapeOptions;
constructor(
mergeFn: (frame: InOut, options?: PushOptions) => any,
groupFn: (frame: InOut, options?: PushOptions) => any,
options?: MergeShapeOptions,
) {
super(options);
this._mergeKeyFn = mergeFn;
this._groupFn = groupFn;
// Merge timeout
this.options.timeout = this.options.timeout || 100;
this.options.timeoutUnit = this.options.timeoutUnit || TimeUnit.MILLISECOND;
this._timeout = this.options.timeoutUnit.convert(this.options.timeout, TimeService.getUnit());
this.once('build', this._start.bind(this));
this.once('destroy', this._stop.bind(this));
}
/**
* Start the timeout timer
* @returns {Promise<void>} Timer promise
*/
private _start(): Promise<void> {
return new Promise((resolve) => {
this.options.minCount = this.options.minCount || this.inlets.length;
this.options.maxCount = this.options.maxCount || this.inlets.length;
const interval =
this.options.checkInterval || TimeService.getUnit().convert(this._timeout, TimeUnit.MILLISECOND);
if (this._timeout > 0) {
this._timer = setInterval(this._timerTick.bind(this), interval);
}
resolve();
});
}
private _timerTick(): void {
this._queue.forEach((queue) => {
this._purgeQueue(queue);
});
}
private _purgeQueue(queue: QueuedMerge<InOut>): QueuedMerge<InOut> {
const currentTime = TimeService.now();
if (
queue !== undefined &&
this._queue.has(queue.key) &&
currentTime - queue.timestamp >= this._timeout &&
queue.frames.size >= this.options.minCount
) {
const frames = Array.from(queue.frames.values());
try {
// Merge node
this.outlets.forEach((outlet) => outlet.push(this.merge(frames, queue.key as string)));
this._queue.delete(queue.key);
// Resolve pending promises
queue.promises.forEach((fn) => {
fn(undefined);
});
} catch (ex) {
this.emit('error', new PushError(frames[0].uid, this.uid, ex));
}
return undefined;
} else {
return queue;
}
}
private _stop(): void {
if (this._timer !== undefined) {
clearInterval(this._timer);
}
}
public process(frame: InOut, options?: PushOptions): Promise<InOut> {
return new Promise<InOut>((resolve) => {
if (this.options.maxCount === 1) {
return resolve(frame);
}
// Merge key(s)
const merge = this._mergeKeyFn(frame, options);
if (merge === undefined) {
return resolve(undefined);
}
(Array.isArray(merge) ? merge : [merge]).forEach((key) => {
let queue = this._purgeQueue(this._queue.get(key));
if (queue === undefined) {
// Create a new queued data frame based on the key
queue = new QueuedMerge(key);
queue.promises.push(resolve);
// Group the frames by the grouping function
queue.frames.set(this._groupFn(frame, options), frame);
this._queue.set(key, queue);
} else {
const groupKey = this._groupFn(frame, options);
if (queue.frames.has(groupKey)) {
// Merge frames
queue.frames.set(groupKey, this.merge([queue.frames.get(groupKey), frame]));
} else {
queue.frames.set(groupKey, frame);
}
// Check if there are enough frames
if (queue.frames.size >= this.options.maxCount) {
this._queue.delete(key);
const mergedFrame = this.merge(Array.from(queue.frames.values()), key);
resolve(mergedFrame);
queue.promises.forEach((fn) => {
fn(undefined);
});
} else {
queue.promises.push(resolve);
}
}
});
});
}
/**
* Merge the data frames
* @param {DataFrame[]} frames Data frames to merge
* @param {string} [key=undefined] Key to merge on
* @returns {Promise<DataFrame>} Promise of merged data frame
*/
public abstract merge(frames: InOut[], key?: string): InOut;
}
/**
* Queued merge
*/
class QueuedMerge<InOut extends DataFrame> {
public key: any;
public frames: Map<any, InOut> = new Map();
public promises: Array<(value: InOut) => void> = [];
public timestamp: number;
constructor(key: any) {
this.key = key;
this.timestamp = TimeService.now();
}
}
export interface MergeShapeOptions extends ProcessingNodeOptions {
timeout?: number;
timeoutUnit?: TimeUnit;
/**
* Check interval for timeout
* @default timeout Same as timeout
*/
checkInterval?: number;
minCount?: number;
/**
* Maximum number of frames to merge
* @default inlets.length Based on the amount of inlets
*/
maxCount?: number;
}