-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
121 lines (100 loc) · 2.84 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// The export of 'readable-stream' is used below as the base constructor for Next.
var Readable = require('readable-stream'),
inherits = require('inherits');
// Debug logger registered under the 'next-stream' namespace.
var debug = require('debug')('next-stream');
// The module's only export is the Next constructor (also callable without `new`).
module.exports = Next;
// Next is a function declaration, so it is hoisted and can be wired to Readable
// before its definition appears in the file.
inherits(Next, Readable);
/**
 * A readable stream that emits the content of a queue of sources, one after
 * the other. May be called with or without `new`.
 *
 * @param {*} streams - A single source or an array of sources. A source is a
 *   readable stream, a function returning a source (applied lazily when its
 *   turn comes), or a plain chunk.
 * @param {Object} [opts]
 * @param {boolean} [opts.open=true] - While truthy, an empty queue does not
 *   end the stream, so more sources can still be pushed later.
 */
function Next(streams, opts) {
  if (this instanceof Next === false) {
    return new Next(streams, opts);
  }

  // The base class is initialised without options, so its defaults apply.
  Readable.call(this);

  var options = opts || {};
  var owner = this;

  if (typeof options.open === 'undefined') {
    this._open = true;
  } else {
    this._open = options.open;
  }
  this._current = null;
  // Copy the input so the caller's array is never mutated by the queue.
  this._next = [].concat(streams);

  // One shared handler re-emits inner stream errors on this stream; sharing
  // it lets it be detached again by reference.
  this._onInnerError = function(err) {
    owner.emit('error', err);
  };
  this._next.forEach(function(source) {
    owner._propagateErrors(source);
  });

  // Start consuming the first queued source right away.
  this._shift();
}
/**
 * Appends a source to the end of the queue. Note that this replaces
 * Readable's own `push`; chunks are emitted through `_push` instead.
 *
 * @param {*} stream - A readable stream, a function returning a source, or a
 *   plain chunk.
 */
Next.prototype.push = function(stream) {
  debug('PUSH');
  // Sources handed to the constructor get their errors forwarded there, and
  // thunk results get it when they are applied. Do the same for sources that
  // arrive later, otherwise an 'error' on them would go unhandled.
  // (_propagateErrors ignores anything that is not a readable stream.)
  this._propagateErrors(stream);
  this._next.push(stream);
  // Nothing is being consumed right now, so start on the new source at once.
  if (!this._current) {
    this._shift();
  }
};
/**
 * Marks the stream as closed: once the queue is drained, it ends.
 * If no source is active at the moment, the end is signalled immediately.
 */
Next.prototype.close = function() {
  this._open = false;

  // An active source will trigger the end itself when the queue runs dry.
  if (this._current) {
    return;
  }
  this._push(null);
};
/**
 * Marks the stream as open again, so an empty queue no longer ends it.
 */
Next.prototype.open = function() {
  // TODO: check if already ended.
  this._open = true;
};
// Keep Readable's original push reachable as `_push`: Next.prototype.push is
// redefined to enqueue sources, so the code in this file emits chunks and the
// end-of-stream marker (null) through `_push`.
Next.prototype._push = Readable.prototype.push;
/**
 * Readable `_read` hook: moves whatever the current source has available
 * into this stream.
 *
 * - Current source is a stream: everything it can deliver right now is
 *   forwarded; if it had nothing, reading resumes on its next 'readable'.
 * - Current source is a batch (array) of plain chunks: the batch is emptied
 *   into this stream.
 * - Current batch is already empty: advance to the next queued source.
 * - No current source: nothing to do.
 *
 * @param {number} [n] - Size hint; only handed on to the deferred re-read.
 */
Next.prototype._read = function(n) {
  var current = this._current;

  if (isReadableStream(current)) {
    var count = 0;
    var data;
    while ((data = current.read()) !== null) {
      this._push(data);
      debug('_read', data.toString());
      count++;
    }

    // Nothing available yet: wait for the inner stream. _read is called both
    // by Readable and directly from _shift, so only arm one wake-up per
    // stream instead of stacking a new listener on every empty read.
    if (count === 0 && this._awaitingReadable !== current) {
      var self = this;
      this._awaitingReadable = current;
      current.once('readable', function() {
        if (self._awaitingReadable === current) {
          self._awaitingReadable = null;
        }
        self._read(n);
      });
    }
  }
  else if (current && current.length > 0) {
    debug('_read non-stream', current);
    // As before, a falsy entry stops the loop for this call.
    var chunk;
    while ((chunk = current.shift())) {
      this._push(chunk);
    }
  }
  else if (current && current.length === 0) {
    debug('_read non-stream end');
    this._shift();
  }
};
/**
 * Advances to the next queued source and starts reading from it.
 *
 * - Queue empty (or next entry falsy): there is no current source; if the
 *   stream has been closed, end-of-stream is signalled.
 * - Function entry: invoked as a thunk; its result becomes the source and
 *   gets error forwarding.
 * - Stream: the queue advances again when it emits 'end'.
 * - Anything else: treated as a plain chunk and batched together with the
 *   plain chunks that directly follow it in the queue.
 */
Next.prototype._shift = function() {
  var self = this;
  debug('SHIFT', this._next.length);

  this._current = this._next.shift();
  if (!this._current) {
    debug('_next empty, ending stream.');
    if (!this._open) this._push(null);
    return;
  }

  // apply the thunk
  if (typeof this._current === 'function') {
    this._current = this._current();
    this._propagateErrors(this._current);
  }

  if (isReadableStream(this._current)) {
    debug('(stream)');
    this._current.once('end', function() {
      debug('end stream');
      self._shift();
    });
    this._read();
  }
  else {
    debug('(non-stream)');
    var nonStreams = [this._current];
    // Stop batching at streams AND at thunks: a function left in the batch
    // would be pushed as a data chunk instead of being invoked when its
    // turn comes.
    while (this._next[0] &&
           typeof this._next[0] !== 'function' &&
           !isReadableStream(this._next[0])) {
      nonStreams.push(this._next.shift());
    }
    this._current = nonStreams;
    this._read();
  }
};
/**
 * Forwards 'error' events of an inner stream to this stream until the inner
 * stream emits 'end'. Anything that is not a readable stream is ignored.
 *
 * @param {*} stream - Candidate source.
 */
Next.prototype._propagateErrors = function(stream) {
  var owner = this;

  if (isReadableStream(stream)) {
    stream.on('error', owner._onInnerError);
    // Detach once the inner stream is done.
    stream.once('end', function detach() {
      stream.removeListener('error', owner._onInnerError);
    });
  }
};
// Duck-typing check: a value counts as a readable stream when it offers the
// four methods this module calls on inner streams.
// Returns the value itself when it is falsy, otherwise true or false.
function isReadableStream(s) {
  if (!s) {
    return s;
  }
  var required = ['once', 'on', 'removeListener', 'read'];
  return required.every(function(method) {
    return typeof s[method] === 'function';
  });
}