Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
phated committed Oct 1, 2022
1 parent f566a9f commit 4f7375b
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions index.js
Expand Up @@ -16,6 +16,12 @@ function isReadable(stream) {
return true;
}

function assertReadableStream(stream) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
}
}

function OrderedStreams(streams, options) {
streams = streams || [];

Expand All @@ -25,6 +31,8 @@ function OrderedStreams(streams, options) {

streams = Array.prototype.concat.apply([], streams);

streams.forEach(assertReadableStream);

options = Object.assign({}, options, {
read: read,
predestroy: predestroy,
Expand All @@ -43,11 +51,9 @@ function OrderedStreams(streams, options) {
var destroyedByError = false;
var readableClosed = false;

streams.forEach(function (stream, idx) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
}
streams.forEach(setup);

function setup(stream, idx) {
stream.on('data', onData);
stream.once('error', onError);
stream.once('end', onEnd);
Expand Down Expand Up @@ -85,23 +91,25 @@ function OrderedStreams(streams, options) {
cleanup();
readable.destroy();
}
});
}

function predestroy() {
streams.forEach(function (stream, idx) {
if (destroyedIdx === idx) {
return;
}
streams.forEach(destroyStream);
}

if (destroyedByError) {
return stream.destroy();
}
if (readableClosed) {
return stream.destroy();
}
function destroyStream(stream, idx) {
if (destroyedIdx === idx) {
return;
}

if (destroyedByError) {
return stream.destroy();
}
if (readableClosed) {
return stream.destroy();
}

stream.destroy(new Error('Wrapper destroyed'));
});
stream.destroy(new Error('Wrapper destroyed'));
}

function onData(chunk) {
Expand Down

0 comments on commit 4f7375b

Please sign in to comment.