diff --git a/index.js b/index.js index 260beb8..89e51f2 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,12 @@ function isReadable(stream) { return true; } +function assertReadableStream(stream) { + if (!isReadable(stream)) { + throw new Error('All input streams must be readable'); + } +} + function OrderedStreams(streams, options) { streams = streams || []; @@ -25,6 +31,8 @@ function OrderedStreams(streams, options) { streams = Array.prototype.concat.apply([], streams); + streams.forEach(assertReadableStream); + options = Object.assign({}, options, { read: read, predestroy: predestroy, @@ -43,11 +51,9 @@ function OrderedStreams(streams, options) { var destroyedByError = false; var readableClosed = false; - streams.forEach(function (stream, idx) { - if (!isReadable(stream)) { - throw new Error('All input streams must be readable'); - } + streams.forEach(setup); + function setup(stream, idx) { stream.on('data', onData); stream.once('error', onError); stream.once('end', onEnd); @@ -85,23 +91,25 @@ function OrderedStreams(streams, options) { cleanup(); readable.destroy(); } - }); + } function predestroy() { - streams.forEach(function (stream, idx) { - if (destroyedIdx === idx) { - return; - } + streams.forEach(destroyStream); + } - if (destroyedByError) { - return stream.destroy(); - } - if (readableClosed) { - return stream.destroy(); - } + function destroyStream(stream, idx) { + if (destroyedIdx === idx) { + return; + } + + if (destroyedByError) { + return stream.destroy(); + } + if (readableClosed) { + return stream.destroy(); + } - stream.destroy(new Error('Wrapper destroyed')); - }); + stream.destroy(new Error('Wrapper destroyed')); } function onData(chunk) {