This repository has been archived by the owner on Mar 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
117 lines (87 loc) · 2.58 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
var ProxyAgent = require('proxy-agent');
var Request = require('request-stream');
var Stream = require('stream');
var Util = require('util');
module.exports = createStream;
/**
 * Start a GET request for `url` asking for `text/event-stream` content and
 * return a LogStream that the response body is piped into.
 *
 * The request goes through a ProxyAgent when the `HTTP_PROXY` or `http_proxy`
 * environment variable is set; otherwise no agent is supplied.
 *
 * Events emitted on the returned stream by this function:
 *   - 'open'  when the request emits 'socket';
 *   - 'error' with the request error when the response callback receives one;
 *   - 'error' ("Connection closed") followed by `destroy()` when the request
 *     emits 'close'.
 *
 * @param {string} url - Address to request (passed straight to `Request.get`).
 * @param {Object} [options] - Forwarded to the LogStream constructor.
 * @returns {LogStream} The stream the response is piped into.
 */
function createStream(url, options) {
  var streamOptions = options || {};
  var env = process.env;
  var proxyUri = env.HTTP_PROXY || env.http_proxy;

  // Leave the agent undefined unless a proxy is configured.
  var agent;
  if (proxyUri) {
    agent = new ProxyAgent(proxyUri);
  }

  var stream = new LogStream(streamOptions);

  // Response callback: forward a failure as an 'error' event, otherwise feed
  // the response body into the parser stream.
  var handleResponse = function (err, res) {
    if (!err) {
      res.pipe(stream);
      return;
    }
    stream.emit('error', err);
  };

  var requestOptions = {
    agent: agent,
    headers: {
      'Accept': 'text/event-stream',
    },
  };
  var req = Request.get(url, requestOptions, handleResponse);

  // Any close of the request is reported as an error, then the stream is
  // torn down.
  req.once('close', function () {
    var closedError = new Error('Connection closed');
    stream.emit('error', closedError);
    stream.destroy();
  });

  req.once('socket', function () {
    stream.emit('open');
  });

  return stream;
}
/**
 * Transform stream that reads a `text/event-stream` byte stream on its
 * writable side and emits, in object mode on its readable side, the
 * JSON-decoded `data` field of every event whose `event` field is "data"
 * (the default when no `event` field is present).
 *
 * @constructor
 * @param {Object} [options] - Accepted for interface compatibility; currently
 *   not used by the stream.
 */
function LogStream(options) {
  Stream.Transform.call(this, { readableObjectMode: true });
  // Bytes of a trailing partial line, carried over to the next chunk.
  this._buffer = null;
  // Fields of the event currently being assembled, or null between events.
  this._event = null;
}
Util.inherits(LogStream, Stream.Transform);

/**
 * Called when the writable side ends. If an event is still being assembled
 * (no terminating blank line was seen) its raw field object is pushed as-is.
 * Bytes of an unterminated final line are discarded.
 *
 * @param {Function} cb - Completion callback.
 */
LogStream.prototype._flush = function (cb) {
  if (this._event) this.push(this._event);
  return cb();
};

/**
 * Split incoming bytes into LF-terminated lines and feed each complete line
 * to the event parser. The unterminated tail is kept for the next call.
 *
 * @param {Buffer|string} chunk - Incoming data.
 * @param {string} encoding - Encoding reported by the stream machinery.
 * @param {Function} cb - Completion callback.
 */
LogStream.prototype._transform = function (chunk, encoding, cb) {
  if (!Buffer.isBuffer(chunk)) {
    chunk = Buffer.from(chunk);
  }
  if (this._buffer && this._buffer.length) {
    chunk = Buffer.concat([this._buffer, chunk]);
  }

  // Work on bytes and decode one whole line at a time: byte 0x0A never occurs
  // inside a multi-byte UTF-8 sequence, so characters split across chunks are
  // never decoded half-way.
  var start = 0;
  var newline = chunk.indexOf(10, start);
  while (newline !== -1) {
    this._handleLine(chunk.slice(start, newline).toString('utf8'));
    start = newline + 1;
    newline = chunk.indexOf(10, start);
  }

  this._buffer = chunk.slice(start);
  return cb();
};

/**
 * Interpret one complete line of the event stream.
 *
 * An empty line terminates the current event. A line containing ':' sets the
 * field named by the text before the first ':' to the text after it; a line
 * starting with ':' (empty field name) sets nothing. Other lines are reported
 * on stderr and ignored.
 *
 * @private
 * @param {string} line - Line content without its LF terminator.
 */
LogStream.prototype._handleLine = function (line) {
  // Accept CRLF line endings by dropping the CR left in front of the LF.
  if (line.charCodeAt(line.length - 1) === 13) {
    line = line.slice(0, -1);
  }

  // A blank line ends the event. Detecting it here instead of peeking at the
  // next byte keeps events intact when the blank line arrives in a later
  // chunk than the fields.
  if (line === '') {
    this._dispatchEvent();
    return;
  }

  var colon = line.indexOf(':');
  if (colon === -1) {
    console.error('Line failed to match', line);
    return;
  }

  if (!this._event) {
    this._event = { event: 'data', };
  }
  var field = line.slice(0, colon);
  if (field) {
    this._event[field] = line.slice(colon + 1);
  }
};

/**
 * Finish the event being assembled: when its `event` field is "data" and it
 * carries a `data` field, push the JSON-decoded payload. The pending event is
 * cleared in every case.
 *
 * @private
 */
LogStream.prototype._dispatchEvent = function () {
  var event = this._event;
  this._event = null;

  // Nothing pending (e.g. consecutive blank lines) or not a data event.
  if (!event || event.event !== 'data' || typeof event.data !== 'string') {
    return;
  }

  var payload;
  try {
    payload = JSON.parse(event.data);
  } catch (__) {
    // Best effort: events whose data is not valid JSON are dropped.
    return;
  }

  // push(null) would signal end-of-stream, so a literal JSON null is skipped.
  if (payload === null) {
    return;
  }
  this.push(payload);
};

/**
 * Emit 'close' and detach every destination piped from this stream.
 */
LogStream.prototype.destroy = function () {
  this.emit('close');
  this.unpipe();
};