-
Notifications
You must be signed in to change notification settings - Fork 1
/
ProcessingNode.ts
133 lines (123 loc) · 4.8 KB
/
ProcessingNode.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import { DataFrame, DataObject } from '../data';
import { Node, NodeOptions } from '../Node';
import { NodeDataService, NodeData } from '../service';
import { PushOptions } from '../graph/options';
import { SerializableObject } from '../data/decorators';
/**
* Node that processes a dataframe or the contained objects.
*
* ## Usage
*
* ### Creating a ProcessingNode
* Processing nodes hide the push and pull functionalities from a regular node. When a push is received, this
* data frame is provided to the ```process()``` method that has to be implemented. When a pull is received, this pull is
* forwarded to all incoming nodes.
* ```typescript
* import { DataFrame, DataObject, ProcessingNode } from '@openhps/core';
*
* export class CustomProcessingNode<In extends DataFrame, Out extends DataFrame> extends ProcessingNode<In, Out> {
* // ...
* public process(data: In, options?: GraphOptions): Promise<Out> {
* return new Promise<Out>((resolve, reject) => {
* // ... process/manipulate the data frame
* data.addObject(new DataObject("custom_process_object"));
* resolve(data);
* });
* }
* }
* ```
*
* @category Processing node
*/
@SerializableObject()
export abstract class ProcessingNode<In extends DataFrame = DataFrame, Out extends DataFrame = DataFrame> extends Node<
In,
Out
> {
protected declare options: ProcessingNodeOptions;
constructor(options?: ProcessingNodeOptions) {
super(options);
this.options.frameFilter = this.options.frameFilter || (() => true);
this.on('push', this._onPush.bind(this));
}
private _onPush(frame: In | In[], options?: PushOptions): Promise<void> {
return new Promise<void>((resolve, reject) => {
const processPromises: Array<Promise<Out>> = [];
if (Array.isArray(frame)) {
frame
.filter((frame) => this.options.frameFilter(frame))
.forEach((f) => {
processPromises.push(this.process(f, options));
});
frame
.filter((frame) => !this.options.frameFilter(frame))
.forEach((f) => {
processPromises.push(Promise.resolve(f as any));
});
} else if (this.options.frameFilter(frame)) {
processPromises.push(this.process(frame, options));
} else {
processPromises.push(Promise.resolve(frame as any));
}
Promise.all(processPromises)
.then((results) => {
const output = results.filter((res) => res !== undefined);
if (output.length > 0) {
this.outlets.forEach((outlet) =>
outlet.push(output.length === 1 ? output[0] : output, options),
);
}
resolve();
})
.catch((ex) => {
if (ex === undefined) {
this.logger('warn', `Exception thrown in processing node ${this.uid} but no exception given!`);
}
reject(ex);
});
});
}
protected findNodeDataService(): NodeDataService<NodeData> {
return this.model.findDataService(NodeData);
}
/**
* Get node data
*
* @param {DataObject} dataObject Data object to get node data from
* @param {any} [defaultData] Default data
* @returns {Promise<any>} Promise with node data
*/
protected getNodeData<T = any>(dataObject: DataObject, defaultData: T = undefined): Promise<any> {
return new Promise((resolve, reject) => {
this.findNodeDataService()
.findData(this.uid, dataObject)
.then((data) => {
if (!data) {
resolve(defaultData);
} else {
resolve(data);
}
})
.catch(reject);
});
}
/**
* Set node data
*
* @param {DataObject} dataObject Data object to store data for
* @param {any} data Data to store
* @returns {Promise<any>} Promise with stored node data
*/
protected setNodeData(dataObject: DataObject, data: any): Promise<NodeData> {
return new Promise((resolve, reject) => {
this.findNodeDataService().insertData(this.uid, dataObject, data).then(resolve).catch(reject);
});
}
public abstract process(frame: In, options?: PushOptions): Promise<Out>;
}
export interface ProcessingNodeOptions extends NodeOptions {
/**
* Frame filter to specify what frames are processed by this node
*/
frameFilter?: (frame: DataFrame) => boolean;
}