// SinkNode.ts
import { DataFrame } from '../data/DataFrame';
import { DataObject } from '../data';
import { SerializableObject } from '../data/decorators';
import { v4 as uuidv4 } from 'uuid';
import { Node, NodeOptions } from '../Node';
import { PushOptions } from '../graph/options';
import { DataObjectService } from '../service';
/**
 * Sink node
 *
 * ## Usage
 *
 * ### Creating a SinkNode
 * When creating a sink node, you have to implement an ```onPush``` method that provides you with the pushed data
 * frame (or array of data frames). Sink nodes are the final nodes in the model and have no outlets. Once the
 * promise returned by ```onPush``` is resolved, the data objects in the pushed frame(s) are stored in a
 * [[DataObjectService]] (unless the ```persistence``` option is disabled) and a ```completed``` event is emitted
 * for every frame (unless the ```completedEvent``` option is disabled).
 * ```typescript
 * import { DataFrame, SinkNode } from '@openhps/core';
 *
 * export class CustomSink<In extends DataFrame> extends SinkNode<In> {
 *     // ...
 *     public onPush(data: In | In[], options?: PushOptions): Promise<void> {
 *         return new Promise<void>((resolve, reject) => {
 *
 *         });
 *     }
 * }
 * ```
 *
 * @category Sink node
 */
@SerializableObject()
export abstract class SinkNode<In extends DataFrame = DataFrame> extends Node<In, In> {
    protected declare options: SinkNodeOptions;

    /**
     * Create a new sink node.
     *
     * @param options Sink node options. ```persistence``` and ```completedEvent``` are set to ```true```
     * when they are left undefined.
     */
    constructor(options?: SinkNodeOptions) {
        super(options);
        // NOTE(review): relies on the base Node constructor initialising `this.options`
        // even when `options` is omitted - confirm against Node.
        // Only an undefined value is replaced; an explicit `false` is kept.
        if (this.options.completedEvent === undefined) {
            this.options.completedEvent = true;
        }
        if (this.options.persistence === undefined) {
            this.options.persistence = true;
        }
    }

    /**
     * Push one or more data frames to this sink.
     *
     * The frame(s) are first handed to ```onPush```. When that promise resolves, the data objects of every
     * frame are persisted (if ```persistence``` is enabled), the returned promise is resolved and a
     * ```completed``` event carrying ```{ frameUID }``` is emitted per frame (if ```completedEvent``` is enabled).
     *
     * @param data Data frame or array of data frames to push
     * @param options Push options, passed on to ```onPush```
     * @returns Promise that resolves once the frame(s) are processed and persisted. It rejects with an
     * ```Error``` when ```data``` is null or undefined, and with the original reason when ```onPush``` or the
     * persistence step fails.
     */
    public push(data: In | In[], options?: PushOptions): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            if (data === null || data === undefined) {
                // Always reject with a reason so callers do not receive `undefined` in their catch handler
                return reject(new Error('Sink node received a null or undefined data frame!'));
            }

            // Treat a single frame as a one-element list so both cases share one code path.
            // When an array was pushed this is the very same array instance that onPush receives.
            const frames: In[] = Array.isArray(data) ? data : [data];

            // Push the frame(s) to the sink node implementation
            this.onPush(data, options)
                .then(() => {
                    if (!this.options.persistence) {
                        return undefined;
                    }
                    return Promise.all(frames.map((frame: In) => this.persistDataObject(frame)));
                })
                .then(() => {
                    // The promise is settled before the events are emitted: an error thrown while
                    // emitting ends up in `reject`, which has no effect on an already resolved promise.
                    resolve();
                    // Fire a completed event for every frame
                    if (this.options.completedEvent) {
                        frames.forEach((frame: In) => {
                            this.emit('completed', {
                                frameUID: frame.uid,
                            });
                        });
                    }
                })
                .catch(reject);
        });
    }

    /**
     * Store all data objects of a frame in their data service.
     *
     * Objects without a UID (null or undefined) are assigned a freshly generated v4 UUID before they are
     * inserted. The service of each object is looked up with ```this.model.findDataService(object)```.
     *
     * @param frame Data frame whose objects should be persisted
     * @returns Promise that resolves when every insert has completed and rejects as soon as one fails
     */
    protected persistDataObject(frame: In): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            const servicePromises: Array<Promise<DataObject>> = frame.getObjects().map((object: DataObject) => {
                // Objects need an identifier to be stored under
                if (object.uid === null || object.uid === undefined) {
                    object.uid = uuidv4();
                }
                // Queue the storage of the object in a data service
                const service = this.model.findDataService(object) as DataObjectService<DataObject>;
                return service.insert(object.uid, object);
            });
            Promise.all(servicePromises)
                .then(() => resolve())
                .catch(reject);
        });
    }

    /**
     * Handle the frame(s) pushed to this sink. Called by ```push``` before any data object is persisted.
     *
     * @param frame Data frame or array of data frames that was pushed
     * @param options Push options given to ```push```
     * @returns Promise that resolves when the sink has finished handling the frame(s)
     */
    public abstract onPush(frame: In | In[], options?: PushOptions): Promise<void>;
}
/**
 * Options accepted by a sink node, in addition to the generic node options.
 */
export interface SinkNodeOptions extends NodeOptions {
    /**
     * Emit a ```completed``` event from this sink.
     *
     * @default true
     */
    completedEvent?: boolean;

    /**
     * Store data objects in data services.
     *
     * @default true
     */
    persistence?: boolean;
}