/
streamer.js
322 lines (300 loc) · 9.16 KB
/
streamer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
/* vim:set ts=2 sw=2 sts=2 expandtab */
/*jshint asi: true undef: true es5: true node: true devel: true
forin: false latedef: false */
/*global define: true */
(typeof define !== "function" ? function($){ $(require, exports, module); } : define)(function(require, exports, module, undefined) {
'use strict';
/**
 * Creates a stream of the given elements. A stream is a function that takes
 * a `next` callback, called once per element in order, and an optional `stop`
 * callback, called once with no arguments after the last element.
 * The stream can be read any number of times.
 * @examples
 *    list('a', 2, {})(console.log)
 *    // 'a'
 *    // 2
 *    // {}
 */
function list() {
  var elements = Array.prototype.slice.call(arguments, 0)
  return function stream(next, stop) {
    // `next` is wrapped so that it only ever receives the element itself;
    // passing it to `forEach` directly would also hand it the index and the
    // backing array.
    elements.forEach(function onElement(element) { next(element) })
    if (stop) stop()
  }
}
exports.list = list
/*
* Creates empty stream. This is equivalent of `list()`.
*/
exports.empty = function empty() {
return list()
}
/**
 * Returns a stream whose elements are the elements of `source` transformed
 * by `mapper`. The `stop` callback is handed to `source` untouched.
 * @param {Function} source
 *    source stream to be mapped
 * @param {Function} mapper
 *    function called with each element; its result is what gets yielded
 * @examples
 *    var stream = list({ name: 'foo' }, { name: 'bar' })
 *    var names = map(stream, function(value) { return value.name })
 *    names(console.log)
 *    // 'foo'
 *    // 'bar'
 *    var numbers = list(1, 2, 3)
 *    var mapped = map(numbers, function onEach(number) { return number * 2 })
 *    mapped(console.log)
 *    // 2
 *    // 4
 *    // 6
 */
function map(source, mapper) {
  return function stream(next, stop) {
    function onElement(element) {
      var mapped = mapper(element)
      next(mapped)
    }
    source(onElement, stop)
  }
}
exports.map = map
/**
 * Returns a stream containing only those elements of `source` for which
 * `filterer` returns a truthy value. The `stop` callback is handed to
 * `source` untouched.
 * @param {Function} source
 *    source stream to be filtered
 * @param {Function} filterer
 *    predicate called with each element
 * @examples
 *    var numbers = list(10, 23, 2, 7, 17)
 *    var digits = filter(numbers, function(value) {
 *      return value >= 0 && value <= 9
 *    })
 *    digits(console.log)
 *    // 2
 *    // 7
 */
function filter(source, filterer) {
  return function stream(next, stop) {
    function onElement(element) {
      var isAccepted = filterer(element)
      if (isAccepted) next(element)
    }
    source(onElement, stop)
  }
}
exports.filter = filter
/**
 * Returns a stream that yields a single element: the result of folding all
 * elements of `source` with `reducer`, starting from `initial`. The value is
 * yielded once `source` stops without an error; if `source` stops with an
 * error, nothing is yielded and the error is forwarded to `stop`.
 * @param {Function} source
 *    stream to reduce.
 * @param {Function} reducer
 *    function called with `(previous, current)`; its result becomes the
 *    `previous` value for the following element
 * @param initial
 *    initial value (`undefined` if omitted)
 * @examples
 *    var numbers = list(2, 3, 8)
 *    var sum = reduce(numbers, function onElement(previous, current) {
 *      return (previous || 0) + current
 *    })
 *    sum(console.log)
 *    // 13
 */
function reduce(source, reducer, initial) {
  return function stream(next, stop) {
    var value = initial
    source(function onElement(element) {
      value = reducer(value, element)
    }, function onStop(error) {
      // `stop` is optional, so it is guarded on the error path just like on
      // the success path below.
      if (error) return stop && stop(error)
      next(value)
      if (stop) stop()
    })
  }
}
exports.reduce = reduce
/**
 * Returns a stream of tuples (arrays), where the n-th tuple contains the
 * n-th element from each of the argument streams. The returned stream is
 * truncated in length to the length of the shortest argument stream: it is
 * stopped as soon as a source that has already stopped has no buffered
 * elements left. If that source stopped with an error, the error is passed
 * to `stop`.
 * @param {Function} ...
 *    source streams to be combined
 * @examples
 *    var a = list('a', 'b', 'c')
 *    var b = list(1, 2, 3, 4)
 *    var c = list('!', '@', '#', '$', '%')
 *    var abc = zip(a, b, c)
 *    abc(console.log)
 *    // [ 'a', 1, '!' ]
 *    // [ 'b', 2, '@' ]
 *    // [ 'c', 3, '#' ]
 */
var zip = exports.zip = (function Zip() {
  // Returns whether array is empty or not.
  function isEmpty(array) { return !array.length }
  // Utility function that checks if each array in the given array of arrays
  // has at least one element (in which case we do have a tuple).
  function hasTuple(array) { return !array.some(isEmpty) }
  // Utility function that creates a tuple by shifting the first element from
  // each array of the given array of arrays.
  function shiftTuple(array) {
    var index = array.length, tuple = []
    while (0 <= --index) tuple.unshift(array[index].shift())
    return tuple
  }
  return function zip() {
    var sources = Array.prototype.slice.call(arguments)
    return function stream(next, stop) {
      // `buffers[id]` holds elements of `sources[id]` not yet put in a tuple.
      // `shortest` is the buffer of the stopped source with the fewest
      // buffered elements and `reason` is the error (if any) it stopped with.
      var buffers = [], id, reason, isStopped = false, shortest
      // Stops the resulting stream and releases the captured buffers.
      function finish() {
        // Marking stream as stopped.
        isStopped = true
        // Stopping a stream (`stop` callback is optional).
        if (stop) stop(reason)
        // Setting all closure captured elements to `null` so that gc can
        // collect them.
        buffers = shortest = null
      }
      function onElement(id, element) {
        // If resulting stream is already stopped (we are in truncate mode)
        // we ignore the element.
        if (isStopped) return null
        // Otherwise we buffer an element.
        buffers[id].push(element)
        // If tuple is not ready yet there is nothing more to do.
        if (!hasTuple(buffers)) return
        next(shiftTuple(buffers))
        // Yielding a tuple may have drained the buffer of a source that has
        // already stopped. No further tuple can be formed in that case, so
        // we stop right away instead of buffering the longer sources until
        // one of them happens to stop.
        if (shortest && !shortest.length) finish()
      }
      function onStop(id, error) {
        // If shortest stream was already stopped then we are in truncate mode
        // which means we ignore all the following stream stops.
        if (isStopped) return null
        // If stream being stopped is the first one to be stopped or if it's
        // shorter than the shortest one stopped, we update stop reason and
        // shortest stopped stream reference.
        if (!shortest || shortest.length > buffers[id].length) {
          shortest = buffers[id]
          reason = error
        }
        // If shortest stream has no buffered elements, we stop resulting
        // stream & do some clean up.
        if (!shortest.length) finish()
      }
      // Initializing buffers.
      id = sources.length
      while (0 <= --id) buffers.push([])
      // Start reading streams (from the last source to the first).
      id = sources.length
      while (0 <= --id)
        sources[id](onElement.bind(null, id), onStop.bind(null, id))
    }
  }
})()
/**
 * Returns a stream containing only the first `number` elements of the given
 * `source` stream, or all of its elements if `source` has fewer than `number`
 * of them. The resulting stream is stopped as soon as `number` elements were
 * yielded; anything `source` delivers afterwards (elements, stop or error) is
 * ignored. If `source` stops earlier, its stop (and error, if any) is
 * forwarded.
 * @param {Function} source
 *    source stream
 * @param {Number} number=1
 *    number of elements to take from stream. Any falsy value, `0` included,
 *    is treated as `1`.
 */
function head(source, number) {
  return function stream(next, stop) {
    // Per-read counter, so that the returned stream can be read repeatedly
    // with the same result.
    var left = number || 1
    source(function onElement(element) {
      if (left <= 0) return null
      left -= 1
      next(element)
      if (left <= 0 && stop) stop()
    }, function onStop(error) {
      // Already stopped after yielding `number` elements.
      if (left <= 0) return null
      // Mark this read as stopped so that late elements are ignored.
      left = 0
      if (stop) stop(error)
    })
  }
}
exports.head = head
/**
 * Returns a stream equivalent to the given `source` stream with its first
 * `number` elements left out. If `source` has fewer than `number` elements
 * the resulting stream is empty. The `stop` callback is handed to `source`
 * untouched.
 * @param {Function} source
 *    source stream to return tail of.
 * @param {Number} number=1
 *    Number of elements that will be omitted. Any falsy value, `0` included,
 *    is treated as `1`.
 */
function tail(source, number) {
  return function stream(next, stop) {
    var toSkip = number || 1
    var skipped = 0
    function onElement(element) {
      // Still within the omitted prefix.
      if (skipped < toSkip) {
        skipped += 1
        return null
      }
      next(element)
    }
    source(onElement, stop)
  }
}
exports.tail = tail
/**
 * Returns a stream that contains all elements of each given stream, in the
 * order they appear in the original streams: a stream is read only after the
 * previous one has stopped. If any of the given streams is stopped with an
 * error then it propagates to the resulting stream, which gets stopped too,
 * and the remaining streams are not read.
 * @param {Function} ...
 *    streams to be appended one after another
 * @examples
 *    var stream = append(list(1, 2), list('a', 'b'))
 *    stream(console.log)
 *    // 1
 *    // 2
 *    // 'a'
 *    // 'b'
 */
function append() {
  var streams = Array.prototype.slice.call(arguments, 0)
  return function stream(next, stop) {
    // Work on a copy so that the resulting stream can be read repeatedly.
    var source, sources = streams.slice(0)
    function onStop(error) {
      if (error) return stop && stop(error)
      // Previous stream ended cleanly: move on to the following one, or stop
      // the resulting stream if there is none left.
      if ((source = sources.shift())) source(next, onStop)
      else return stop && stop()
    }
    // Kick off reading of the first stream.
    onStop()
  }
}
exports.append = append
/**
* Returns a stream that contains all elements of each stream of the given
* source stream. `source` is stream of streams whose elements will be contained
* by the resulting stream. Any error from any stream will propagate to the
* resulting stream. Stream is stopped when all streams from `source` and source
* itself is ended. Elements of the stream are position in order they are
* delivered so it could happen that elements from second stream will appear
* before or between elements of the first stream.
* @param {Function} source
* Stream of streams whose elements will be contained by resulting stream
* @examples
* function async(next, stop) {
* setTimeout(function() {
* next('async')
* stop()
* }, 10)
* }
* var stream = merge(list(async, list(1, 2, 3)))
* stream(console.log)
* // 1
* // 2
* // 3
* // 'async'
*/
exports.merge = function merge(source) {
return function stream(next, stop) {
var open = 1
function onStop(error) {
if (!open) return false
if (error) open = 0
else open --
if (!open) stop(error)
}
source(function onStream(stream) {
open ++
stream(function onNext(value) { if (open) next(value) }, onStop)
}, onStop)
}
}
/**
* Utility function to print streams.
*/
exports.print = function print(stream) {
console.log('>>')
stream(console.log.bind(console), function onStop(error) {
if (error) console.error(error)
else console.log('<<')
})
}
})