Skip to content

Commit

Permalink
feat: Provide addSource function on the OrderedReadable stream (#28)
Browse files Browse the repository at this point in the history
feat: Ensure readable does not end until read from
chore: Swap order of a negated if statement
  • Loading branch information
phated committed Oct 10, 2022
1 parent edb9aa1 commit 87c871c
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 9 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ readable.on('data', function (data) {

### `ordered(streams, [options])`

Takes an array of Readable streams and produces a single Readable stream that will consume the provided streams in strict order. The produced Readable stream respects backpressure on itself and any provided streams.
Takes an array of `Readable` streams and produces a single `OrderedReadable` stream that will consume the provided streams in strict order. The produced `Readable` stream respects backpressure on itself and any provided streams.

#### `orderedReadable.addSource(stream)`

The returned `Readable` stream has an `addSource` instance function that takes appends a `Readable` stream to the list of source streams that the `OrderedReadable` is reading from.

## License

Expand Down
25 changes: 17 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ function OrderedStreams(streams, options) {
var streamIdx = 0;
var activeStream = streams[streamIdx];

if (!activeStream) {
readable.push(null);
}

var destroyedIdx = -1;
var destroyedByError = false;
var readableClosed = false;
Expand Down Expand Up @@ -78,10 +74,10 @@ function OrderedStreams(streams, options) {
streamIdx++;
activeStream = streams[streamIdx];
cleanup();
if (!activeStream) {
readable.push(null);
} else {
if (activeStream) {
activeStream.resume();
} else {
readable.push(null);
}
}

Expand Down Expand Up @@ -122,10 +118,23 @@ function OrderedStreams(streams, options) {
}

function read(cb) {
activeStream.resume();
if (activeStream) {
activeStream.resume();
} else {
readable.push(null);
}
cb();
}

function addSource(stream) {
assertReadableStream(stream);
var idx = streams.push(stream);
setup(stream, idx);
activeStream = streams[streamIdx];
}

readable.addSource = addSource;

return readable;
}

Expand Down
120 changes: 120 additions & 0 deletions test/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ function suite(moduleName) {
stream.pipeline([streams, concat()], done);
});

it('does not end until it is read if no streams are given', function (done) {
var streams = new OrderedStreams();

var ended = false;

streams.on('end', function () {
ended = true;
});
setTimeout(function () {
expect(ended).toEqual(false);

stream.pipeline([streams, concat()], function (err) {
expect(ended).toEqual(true);
done(err);
});
}, 250);
});

it('throws an error if stream is not readable', function (done) {
var writable = new stream.Writable({ write: function () {} });

Expand Down Expand Up @@ -390,6 +408,108 @@ function suite(moduleName) {

s2.destroy();
});

describe('addSource', function () {
it('can add a stream to an empty readable before reading', function (done) {
var streams = new OrderedStreams();

streams.addSource(
stream.Readable.from([
{ value: 'data1' },
{ value: 'data2' },
{ value: 'data3' },
])
);

function assert(results) {
expect(results.length).toEqual(3);
}

stream.pipeline([streams, concat(assert)], done);
});

it('can add a stream at the end of the readable', function (done) {
var s = stream.Readable.from([
{ value: 'data1' },
{ value: 'data2' },
{ value: 'data3' },
]);

var streams = new OrderedStreams(s);

streams.addSource(
stream.Readable.from([
{ value: 'data4' },
{ value: 'data5' },
{ value: 'data6' },
])
);

function assert(results) {
expect(results.length).toEqual(6);
expect(results[0]).toEqual({ value: 'data1' });
expect(results[1]).toEqual({ value: 'data2' });
expect(results[2]).toEqual({ value: 'data3' });
expect(results[3]).toEqual({ value: 'data4' });
expect(results[4]).toEqual({ value: 'data5' });
expect(results[5]).toEqual({ value: 'data6' });
}

stream.pipeline([streams, concat(assert)], done);
});

it('can add a stream while the readable is already flowing', function (done) {
var data = [
{ value: 'data1' },
{ value: 'data2' },
{ value: 'data3' },
{ value: 'data4' },
{ value: 'data5' },
{ value: 'data6' },
];
var s = new stream.Readable({
objectMode: true,
read: function (cb) {
if (data.length > 1) {
this.push(data.shift());
} else {
streams.addSource(stream.Readable.from(data));
this.push(null);
}
if (typeof cb === 'function') {
cb(null);
}
},
});

var streams = new OrderedStreams(s);

function assert(results) {
expect(results.length).toEqual(6);
expect(results[0]).toEqual({ value: 'data1' });
expect(results[1]).toEqual({ value: 'data2' });
expect(results[2]).toEqual({ value: 'data3' });
expect(results[3]).toEqual({ value: 'data4' });
expect(results[4]).toEqual({ value: 'data5' });
expect(results[5]).toEqual({ value: 'data6' });
}

stream.pipeline([streams, concat(assert)], done);
});

it('throws an error if stream is not readable', function (done) {
var streams = new OrderedStreams();

function withWritable() {
var writable = new stream.Writable({ write: function () {} });
streams.addSource(writable);
}

expect(withWritable).toThrow('All input streams must be readable');

done();
});
});
});
}

Expand Down

0 comments on commit 87c871c

Please sign in to comment.