/
FrameChunkNode.ts
76 lines (66 loc) · 2.17 KB
/
FrameChunkNode.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import { DataFrame } from '../../data';
import { Node, NodeOptions } from '../../Node';
import { TimeUnit } from '../../utils';
/**
* @category Flow shape
*/
export class FrameChunkNode<InOut extends DataFrame> extends Node<InOut, InOut> {
private _count: number;
private _queue: InOut[] = [];
private _interval: number;
private _timer: NodeJS.Timeout;
constructor(count: number, timeout?: number, timeoutUnit = TimeUnit.MILLISECOND, options?: NodeOptions) {
super(options);
this._count = count;
if (timeout) {
this._interval = timeoutUnit.convert(timeout, TimeUnit.MILLISECOND);
this.once('build', this._start.bind(this));
this.once('destroy', this._stop.bind(this));
}
this.on('push', this._onPush.bind(this));
}
private _onPush(frame: InOut): Promise<void> {
return new Promise<void>((resolve, reject) => {
this._queue.push(frame);
if (this._queue.length >= this._count) {
this._flushQueue().then(resolve).catch(reject);
} else {
resolve();
}
});
}
private _flushQueue(): Promise<void> {
return new Promise((resolve) => {
// Restart the timeout
if (this._timer !== undefined) {
clearInterval(this._timer);
this._timer = setInterval(this._timeoutFn.bind(this), this._interval);
}
this.outlets.forEach((outlet) => outlet.push(this._queue));
this._queue = [];
resolve();
});
}
private _timeoutFn(): void {
if (this._queue.length > 0) {
Promise.resolve(this._flushQueue());
}
}
/**
* Start the timeout timer
*
* @returns {Promise<void>} Start promise
*/
private _start(): Promise<void> {
return new Promise((resolve) => {
this._timer = setInterval(this._timeoutFn.bind(this), this._interval);
resolve();
});
}
private _stop(): void {
if (this._timer !== undefined) {
clearInterval(this._timer);
this._timer = undefined;
}
}
}