/
FilterProcessingNode.ts
68 lines (61 loc) · 2.53 KB
/
FilterProcessingNode.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import { DataFrame, DataObject } from '../../../data';
import { TimeService } from '../../../service/TimeService';
import { ObjectProcessingNode, ObjectProcessingNodeOptions } from '../../ObjectProcessingNode';
/**
* @category Processing node
*/
export abstract class FilterProcessingNode<InOut extends DataFrame> extends ObjectProcessingNode<InOut> {
protected options: FilterProcessingOptions;
constructor(options?: FilterProcessingOptions) {
super(options);
}
processObject(object: DataObject, frame: InOut): Promise<DataObject> {
return new Promise((resolve, reject) => {
// Get existing filter data
this.getNodeData(object)
.then(async (nodeData) => {
if (nodeData === undefined) {
nodeData = {
timestamp: TimeService.now(),
...(await this.initFilter(object, frame, this.options)),
};
} else if (nodeData['timestamp']) {
if (nodeData.timestamp + this.options.expire < TimeService.now()) {
nodeData = {
timestamp: TimeService.now(),
...(await this.initFilter(object, frame, this.options)),
};
}
} else {
nodeData = {
timestamp: TimeService.now(),
...nodeData,
};
}
this.filter(object, frame, nodeData, this.options)
.then((result) => {
resolve(result);
})
.catch(reject)
.finally(() => {
this.setNodeData(object, nodeData)
.then(() => {
resolve(undefined);
})
.catch(reject);
});
})
.catch(reject);
});
}
abstract initFilter(object: DataObject, frame: InOut, options?: FilterProcessingOptions): Promise<any>;
abstract filter(
object: DataObject,
frame: InOut,
filter: any,
options?: FilterProcessingOptions,
): Promise<DataObject>;
}
export interface FilterProcessingOptions extends ObjectProcessingNodeOptions {
expire?: number;
}