/
linestream.js
82 lines (67 loc) · 1.87 KB
/
linestream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
module.exports = function(RED) {
const stream = require('node:stream')
/*
* From https://discourse.nodered.org/t/streaming-nodes/84422/37
*/
class ByLine extends stream.Transform {
constructor(options) {
super(options);
}
_transform(chunk, encoding, callback) {
let data = chunk.toString();
if (this._lastLineData) {
data = this._lastLineData + data;
}
let lines = data.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g);
this._lastLineData = lines.splice(lines.length - 1, 1)[0];
lines.forEach(this.push.bind(this));
callback();
}
_flush(done) {
if (this._lastLineData) {
this.push(this._lastLineData);
}
this._lastLineData = null;
done();
}
}
function CoreLineStreamFunctionality(config) {
RED.nodes.createNode(this,config);
var node = this;
var cfg = config;
node.createStream = (opts, msg, snd, dne, stNde) => {
return [
new ByLine({ objectMode: true }),
stream.Transform({
objectMode: true,
transform: function (entry, e, cb) {
let m = RED.util.cloneMessage(msg);
m.payload = entry
snd(m, false)
cb();
}
})
]
}
node.on('close', function() {
node.status({});
});
/* msg handler, in this case pass the message on unchanged */
node.on("input", function (msg, send, done) {
// Send a message and how to handle errors.
try {
(msg._streamPipeline || []).push({
id: node.id,
})
send(msg);
done();
} catch (err) {
// use done if the node won't send anymore messages for the
// message it received.
msg.error = err
done(err.message, msg)
}
});
}
RED.nodes.registerType("LineStream", CoreLineStreamFunctionality);
}