From be0220407339715d1ec66ddf3aa8b5d36093ae86 Mon Sep 17 00:00:00 2001 From: Blaine Bublitz Date: Sun, 9 Oct 2022 16:17:52 -0700 Subject: [PATCH] feat: Provide an addSource API on the OrderedReadable stream --- README.md | 6 ++- index.js | 9 +++++ test/main.js | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e5f7473..3c0be44 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,11 @@ readable.on('data', function (data) { ### `ordered(streams, [options])` -Takes an array of Readable streams and produces a single Readable stream that will consume the provided streams in strict order. The produced Readable stream respects backpressure on itself and any provided streams. +Takes an array of `Readable` streams and produces a single `OrderedReadable` stream that will consume the provided streams in strict order. The produced `Readable` stream respects backpressure on itself and any provided streams. + +#### `orderedReadable.addSource(stream)` + +The returned `Readable` stream has an `addSource` instance function that takes appends a `Readable` stream to the list of source streams that the `OrderedReadable` is reading from. ## License diff --git a/index.js b/index.js index 912f210..9c6177d 100644 --- a/index.js +++ b/index.js @@ -126,6 +126,15 @@ function OrderedStreams(streams, options) { cb(); } + function addSource(stream) { + assertReadableStream(stream); + var idx = streams.push(stream); + setup(stream, idx); + activeStream = streams[streamIdx]; + } + + readable.addSource = addSource; + return readable; } diff --git a/test/main.js b/test/main.js index a01a096..bd9be50 100644 --- a/test/main.js +++ b/test/main.js @@ -408,6 +408,108 @@ function suite(moduleName) { s2.destroy(); }); + + describe('addSource', function () { + it('can add a stream to an empty readable before reading', function (done) { + var streams = new OrderedStreams(); + + streams.addSource( + stream.Readable.from([ + { value: 'data1' }, + { value: 'data2' }, + { value: 'data3' }, + ]) + ); + + function assert(results) { + expect(results.length).toEqual(3); + } + + stream.pipeline([streams, concat(assert)], done); + }); + + it('can add a stream at the end of the readable', function (done) { + var s = stream.Readable.from([ + { value: 'data1' }, + { value: 'data2' }, + { value: 'data3' }, + ]); + + var streams = new OrderedStreams(s); + + streams.addSource( + stream.Readable.from([ + { value: 'data4' }, + { value: 'data5' }, + { value: 'data6' }, + ]) + ); + + function assert(results) { + expect(results.length).toEqual(6); + expect(results[0]).toEqual({ value: 'data1' }); + expect(results[1]).toEqual({ value: 'data2' }); + expect(results[2]).toEqual({ value: 'data3' }); + expect(results[3]).toEqual({ value: 'data4' }); + expect(results[4]).toEqual({ value: 'data5' }); + expect(results[5]).toEqual({ value: 'data6' }); + } + + stream.pipeline([streams, concat(assert)], done); + }); + + it('can add a stream while the readable is already flowing', function (done) { + var data = [ + { value: 'data1' }, + { value: 'data2' }, + { value: 'data3' }, + { value: 'data4' }, + { value: 'data5' }, + { value: 'data6' }, + ]; + var s = new stream.Readable({ + objectMode: true, + read: function (cb) { + if (data.length > 1) { + this.push(data.shift()); + } else { + streams.addSource(stream.Readable.from(data)); + this.push(null); + } + if (typeof cb === 'function') { + cb(null); + } + }, + }); + + var streams = new OrderedStreams(s); + + function assert(results) { + expect(results.length).toEqual(6); + expect(results[0]).toEqual({ value: 'data1' }); + expect(results[1]).toEqual({ value: 'data2' }); + expect(results[2]).toEqual({ value: 'data3' }); + expect(results[3]).toEqual({ value: 'data4' }); + expect(results[4]).toEqual({ value: 'data5' }); + expect(results[5]).toEqual({ value: 'data6' }); + } + + stream.pipeline([streams, concat(assert)], done); + }); + + it('throws an error if stream is not readable', function (done) { + var streams = new OrderedStreams(); + + function withWritable() { + var writable = new stream.Writable({ write: function () {} }); + streams.addSource(writable); + } + + expect(withWritable).toThrow('All input streams must be readable'); + + done(); + }); + }); }); }