Skip to content

Commit

Permalink
feat: Provide an addSource API on the OrderedReadable stream
Browse files Browse the repository at this point in the history
  • Loading branch information
phated committed Oct 9, 2022
1 parent ceec805 commit be02204
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
6 changes: 5 additions & 1 deletion README.md
Expand Up @@ -73,7 +73,11 @@ readable.on('data', function (data) {

### `ordered(streams, [options])`

Takes an array of Readable streams and produces a single Readable stream that will consume the provided streams in strict order. The produced Readable stream respects backpressure on itself and any provided streams.
Takes an array of `Readable` streams and produces a single `OrderedReadable` stream that will consume the provided streams in strict order. The produced `Readable` stream respects backpressure on itself and any provided streams.

#### `orderedReadable.addSource(stream)`

The returned `Readable` stream has an `addSource` instance function that takes appends a `Readable` stream to the list of source streams that the `OrderedReadable` is reading from.

## License

Expand Down
9 changes: 9 additions & 0 deletions index.js
Expand Up @@ -126,6 +126,15 @@ function OrderedStreams(streams, options) {
cb();
}

function addSource(stream) {
assertReadableStream(stream);
var idx = streams.push(stream);
setup(stream, idx);
activeStream = streams[streamIdx];
}

readable.addSource = addSource;

return readable;
}

Expand Down
102 changes: 102 additions & 0 deletions test/main.js
Expand Up @@ -408,6 +408,108 @@ function suite(moduleName) {

s2.destroy();
});

describe('addSource', function () {
it('can add a stream to an empty readable before reading', function (done) {
var streams = new OrderedStreams();

streams.addSource(
stream.Readable.from([
{ value: 'data1' },
{ value: 'data2' },
{ value: 'data3' },
])
);

function assert(results) {
expect(results.length).toEqual(3);
}

stream.pipeline([streams, concat(assert)], done);
});

it('can add a stream at the end of the readable', function (done) {
var s = stream.Readable.from([
{ value: 'data1' },
{ value: 'data2' },
{ value: 'data3' },
]);

var streams = new OrderedStreams(s);

streams.addSource(
stream.Readable.from([
{ value: 'data4' },
{ value: 'data5' },
{ value: 'data6' },
])
);

function assert(results) {
expect(results.length).toEqual(6);
expect(results[0]).toEqual({ value: 'data1' });
expect(results[1]).toEqual({ value: 'data2' });
expect(results[2]).toEqual({ value: 'data3' });
expect(results[3]).toEqual({ value: 'data4' });
expect(results[4]).toEqual({ value: 'data5' });
expect(results[5]).toEqual({ value: 'data6' });
}

stream.pipeline([streams, concat(assert)], done);
});

it('can add a stream while the readable is already flowing', function (done) {
var data = [
{ value: 'data1' },
{ value: 'data2' },
{ value: 'data3' },
{ value: 'data4' },
{ value: 'data5' },
{ value: 'data6' },
];
var s = new stream.Readable({
objectMode: true,
read: function (cb) {
if (data.length > 1) {
this.push(data.shift());
} else {
streams.addSource(stream.Readable.from(data));
this.push(null);
}
if (typeof cb === 'function') {
cb(null);
}
},
});

var streams = new OrderedStreams(s);

function assert(results) {
expect(results.length).toEqual(6);
expect(results[0]).toEqual({ value: 'data1' });
expect(results[1]).toEqual({ value: 'data2' });
expect(results[2]).toEqual({ value: 'data3' });
expect(results[3]).toEqual({ value: 'data4' });
expect(results[4]).toEqual({ value: 'data5' });
expect(results[5]).toEqual({ value: 'data6' });
}

stream.pipeline([streams, concat(assert)], done);
});

it('throws an error if stream is not readable', function (done) {
var streams = new OrderedStreams();

function withWritable() {
var writable = new stream.Writable({ write: function () {} });
streams.addSource(writable);
}

expect(withWritable).toThrow('All input streams must be readable');

done();
});
});
});
}

Expand Down

0 comments on commit be02204

Please sign in to comment.