Skip to content

Commit

Permalink
implement destroy logic - needs a spot check
Browse files Browse the repository at this point in the history
  • Loading branch information
phated committed Sep 7, 2022
1 parent b51c1d6 commit 7572a7f
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 3 deletions.
52 changes: 49 additions & 3 deletions index.js
Expand Up @@ -27,15 +27,61 @@ function OrderedStreams(streams, options) {

options = Object.assign({}, options, {
read: read,
predestroy: predestroy,
});

streams.forEach(function (stream) {
var readable = new Readable(options);

var streamIdx = 0;

var destroyedIdx = -1;
var destroyedByError = false;
var readableClosed = false;

streams.forEach(function (stream, idx) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
}

var readableEnded = false;

stream.once('error', onError);
stream.once('end', onEnd);
stream.once('close', onClose);

function onError() {
destroyedByError = true;
}

function onEnd() {
readableEnded = true;
}

function onClose() {
destroyedIdx = idx;
readableClosed = true;
if (!readableEnded) {
readable.destroy();
}
}
});

var streamIdx = 0;
function predestroy() {
streams.forEach(function (stream, idx) {
if (destroyedIdx === idx) {
return;
}

if (destroyedByError) {
return stream.destroy();
}
if (readableClosed) {
return stream.destroy();
}

stream.destroy(new Error('Wrapper destroyed'));
});
}

function read(cb) {
var self = this;
Expand Down Expand Up @@ -84,7 +130,7 @@ function OrderedStreams(streams, options) {
activeStream.resume();
}

return new Readable(options);
return readable;
}

module.exports = OrderedStreams;
76 changes: 76 additions & 0 deletions test/main.js
Expand Up @@ -312,6 +312,82 @@ function suite(moduleName) {
assertErr
);
});

it('destroys all readable streams if the wrapper is destroyed', function (done) {
var s1 = stream.Readable.from([{ value: 'stream 1' }]);
var s2 = stream.Readable.from([{ value: 'stream 2' }]);
var s3 = stream.Readable.from([{ value: 'stream 3' }]);

var streams = new OrderedStreams([s1, s2, s3]);

var errors = [];

s1.on('error', function (err) {
errors.push(err);
assertErr();
});
s2.on('error', function (err) {
errors.push(err);
assertErr();
});
s3.on('error', function (err) {
errors.push(err);
assertErr();
});

function assertErr() {
if (errors.length === 3) {
expect(errors[0].message).toEqual('Wrapper destroyed');
expect(errors[1].message).toEqual('Wrapper destroyed');
expect(errors[2].message).toEqual('Wrapper destroyed');
done();
}
}

streams.destroy();
});

it('destroys the wrapper and other streams if any readable stream is destroyed', function (done) {
var s1 = stream.Readable.from([{ value: 'stream 1' }]);
var s2 = stream.Readable.from([{ value: 'stream 2' }]);
var s3 = stream.Readable.from([{ value: 'stream 3' }]);

var streams = new OrderedStreams([s1, s2, s3]);

var closed = [];

s1.on('close', function () {
closed.push('s1');
assert();
});
s2.on('close', function () {
closed.push('s2');
assert();
});
s3.on('close', function () {
closed.push('s3');
assert();
});
streams.on('close', function () {
closed.push('wrapper');
});

function assert() {
if (closed.length === 4) {
// `destroy` called upon
expect(closed[0]).toEqual('s2');
// Wrapper destroyed first
expect(closed[1]).toEqual('wrapper');
// Then the other 2 streams get destroyed
expect(closed[2]).toEqual('s1');
expect(closed[3]).toEqual('s3');

done();
}
}

s2.destroy();
});
});
}

Expand Down

0 comments on commit 7572a7f

Please sign in to comment.