From 1d799522c4411bdc3017b3a58182ee92d9f268e4 Mon Sep 17 00:00:00 2001 From: Blaine Bublitz Date: Wed, 31 Aug 2022 16:40:11 -0700 Subject: [PATCH] feat!: Switch to streamx feat!: Avoid eagerly reading streams chore: Ensure project works with different streams chore: Update example and change description in README --- README.md | 96 +++++++------ index.js | 133 ++++++++++-------- package.json | 32 +++-- test/main.js | 391 +++++++++++++++++++++++++++++++++++++-------------- 4 files changed, 431 insertions(+), 221 deletions(-) diff --git a/README.md b/README.md index b22d33a..e5f7473 100644 --- a/README.md +++ b/README.md @@ -8,62 +8,72 @@ [![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url] -Combines array of streams into one read stream in strict order. - -## Overview - -`ordered-read-streams` handles all data/errors from input streams in parallel, but emits data/errors in strict order in which streams are passed to constructor. This is `objectMode = true` stream. +Combines array of streams into one Readable stream in strict order. ## Usage ```js -var through = require('through2'); -var Ordered = require('ordered-read-streams'); - -var s1 = through.obj(function (data, enc, next) { - var self = this; - setTimeout(function () { - self.push(data); - next(); - }, 200); +var { Readable } = require('streamx'); +var ordered = require('ordered-read-streams'); + +var s1 = new Readable({ + read: function (cb) { + var self = this; + if (self.called) { + self.push(null); + return cb(null); + } + setTimeout(function () { + self.called = true; + self.push('stream 1'); + cb(null); + }, 200); + }, }); -var s2 = through.obj(function (data, enc, next) { - var self = this; - setTimeout(function () { - self.push(data); - next(); - }, 30); +var s2 = new Readable({ + read: function (cb) { + var self = this; + if (self.called) { + self.push(null); + return cb(null); + } + setTimeout(function () { + self.called = true; + self.push('stream 2'); + cb(null); + }, 30); + }, }); -var s3 = through.obj(function (data, enc, next) { - var self = this; - setTimeout(function () { - self.push(data); - next(); - }, 100); +var s3 = new Readable({ + read: function (cb) { + var self = this; + if (self.called) { + self.push(null); + return cb(null); + } + setTimeout(function () { + self.called = true; + self.push('stream 3'); + cb(null); + }, 100); + }, }); -var streams = new Ordered([s1, s2, s3]); -streams.on('data', function (data) { +var readable = ordered([s1, s2, s3]); +readable.on('data', function (data) { console.log(data); + // Logs: + // stream 1 + // stream 2 + // stream 3 }); - -s1.write('stream 1'); -s1.end(); - -s2.write('stream 2'); -s2.end(); - -s3.write('stream 3'); -s3.end(); ``` -Ouput will be: +## API -``` -stream 1 -stream 2 -stream 3 -``` +### `ordered(streams, [options])` + +Takes an array of Readable streams and produces a single Readable stream that will consume the provided streams in strict order. The produced Readable stream respects backpressure on itself and any provided streams. ## License diff --git a/index.js b/index.js index 79ac0f4..12c8f34 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,4 @@ -var Readable = require('readable-stream/readable'); -var util = require('util'); +var Readable = require('streamx').Readable; function isReadable(stream) { if (typeof stream.pipe !== 'function') { @@ -10,92 +9,106 @@ function isReadable(stream) { return false; } - if (typeof stream._read !== 'function') { - return false; - } - - if (!stream._readableState) { + if (typeof stream.read !== 'function') { return false; } return true; } -function addStream(streams, stream) { - if (!isReadable(stream)) { - throw new Error('All input streams must be readable'); +function OrderedStreams(streams, options) { + streams = streams || []; + + if (!Array.isArray(streams)) { + streams = [streams]; } - var self = this; + streams = Array.prototype.concat.apply([], streams); - stream._buffer = []; + options = Object.assign({}, options, { + read: read, + }); - stream.on('readable', function () { - var chunk = stream.read(); - while (chunk) { - if (this === streams[0]) { - self.push(chunk); - } else { - this._buffer.push(chunk); - } - chunk = stream.read(); + streams.forEach(function (stream) { + if (!isReadable(stream)) { + throw new Error('All input streams must be readable'); } }); - stream.on('end', function () { - for ( - var stream = streams[0]; - stream && stream._readableState.ended; - stream = streams[0] - ) { - while (stream._buffer.length) { - self.push(stream._buffer.shift()); - } + var streamIdx = 0; - streams.shift(); - } + function read(cb) { + var self = this; - if (!streams.length) { + var activeStream = streams[streamIdx]; + if (!activeStream) { self.push(null); + return cb(null); } - }); - stream.on('error', this.emit.bind(this, 'error')); + function cleanup() { + activeStream.off('readable', onRead); + activeStream.off('error', onError); + activeStream.off('end', onEnd); + } - streams.push(stream); -} + function onError(err) { + cleanup(); + cb(err); + } -function OrderedStreams(streams, options) { - if (!(this instanceof OrderedStreams)) { - return new OrderedStreams(streams, options); - } + function onEnd() { + // When a stream ends, we want to increment index of the stream we are reading from + streamIdx++; + // Then we want to cleanup handlers on the previously active stream + cleanup(); + // Finally we recursively call this function to read from the next stream + read.call(self, cb); + } - streams = streams || []; - options = options || {}; + function onRead(chunk) { + var drained = true; - options.objectMode = true; + // If the chunk is null, we don't want to cleanup because the + // `end` event might be emitted afterwards + if (chunk === null) { + return; + } - Readable.call(this, options); + while (chunk !== null && drained) { + drained = self.push(chunk); + // If our stream is not backpressured, we want to process another chunk + if (drained) { + chunk = activeStream.read(); + } + } - if (!Array.isArray(streams)) { - streams = [streams]; - } - if (!streams.length) { - return this.push(null); // no streams, close - } + cleanup(); + cb(null); + } + + activeStream.once('error', onError); + activeStream.once('end', onEnd); - var addStreamBinded = addStream.bind(this, []); + // Try reading the first chunk + var chunk = activeStream.read(); - streams.forEach(function (item) { - if (Array.isArray(item)) { - item.forEach(addStreamBinded); + // If we backpressured the OrderedReadStream, we'll have a chunk + // and don't need to wait for a `readable` event + if (chunk) { + onRead(chunk); } else { - addStreamBinded(item); + // If the first chunk is null we want to wait for `readable` to handle both the first + // access and a backpressured stream + activeStream.once('readable', function () { + // Once `readable`, we need to grab the first chunk before passing it to onRead + var chunk = activeStream.read(); + onRead(chunk); + }); } - }); -} -util.inherits(OrderedStreams, Readable); + } -OrderedStreams.prototype._read = function () {}; + return new Readable(options); +} module.exports = OrderedStreams; diff --git a/package.json b/package.json index 6845ef7..d0696eb 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,18 @@ "name": "ordered-read-streams", "version": "1.0.1", "description": "Combines array of streams into one read stream in strict order", + "author": "Gulp Team (https://gulpjs.com/)", + "contributors": [ + "Blaine Bublitz ", + "Artem Medeu " + ], + "repository": "gulpjs/ordered-read-streams", + "license": "MIT", + "engines": { + "node": ">= 10.13.0" + }, "files": [ + "LICENSE", "index.js" ], "scripts": { @@ -10,24 +21,17 @@ "pretest": "npm run lint", "test": "nyc mocha --async-only" }, - "repository": "gulpjs/ordered-read-streams", - "author": "Gulp Team (https://gulpjs.com/)", - "license": "MIT", - "engines": { - "node": ">= 10.13.0" - }, "dependencies": { - "readable-stream": "^3.6.0" + "streamx": "^2.12.5" }, "devDependencies": { "eslint": "^7.32.0", "eslint-config-gulp": "^5.0.1", "eslint-plugin-node": "^11.1.0", "expect": "^27.4.2", - "mississippi": "^4.0.0", "mocha": "^8.4.0", "nyc": "^15.1.0", - "through2": "^4.0.2" + "readable-stream": "^3.6.0" }, "nyc": { "reporter": [ @@ -37,5 +41,13 @@ }, "prettier": { "singleQuote": true - } + }, + "keywords": [ + "streams", + "ordered", + "group", + "combine", + "streamx", + "readable" + ] } diff --git a/test/main.js b/test/main.js index 41e1f31..4285e05 100644 --- a/test/main.js +++ b/test/main.js @@ -1,145 +1,320 @@ var expect = require('expect'); -var miss = require('mississippi'); - var OrderedStreams = require('..'); -var to = miss.to; -var from = miss.from; -var pipe = miss.pipe; -var concat = miss.concat; - -function fromOnce(fn) { - var called = false; - return from.obj(function (size, next) { - if (called) { - return next(null, null); - } - called = true; - fn.apply(this, arguments); - }); -} +function suite(moduleName) { + var stream = require(moduleName); -describe('ordered-read-streams', function () { - it('ends if no streams are given', function (done) { - var streams = new OrderedStreams(); + function fromOnce(fn) { + var called = false; + return new stream.Readable({ + objectMode: true, + read: function (cb) { + var self = this; + if (called) { + this.push(null); + if (typeof cb === 'function') { + cb(null); + } + return; + } + fn(function (err, chunk) { + called = true; + if (!err) { + self.push(chunk); + } - pipe([streams, concat()], done); - }); + if (typeof cb === 'function') { + cb(err); + } else { + if (err) { + return self.destroy(err); + } + } + }); + }, + }); + } - it('throws an error if stream is not readable', function (done) { - var writable = to(); + function concat(fn, timeout) { + var items = []; + return new stream.Writable({ + objectMode: true, + write: function (chunk, enc, cb) { + if (typeof enc === 'function') { + cb = enc; + } + setTimeout(function () { + items.push(chunk); + cb(); + }, timeout || 1); + }, + final: function (cb) { + if (typeof fn === 'function') { + fn(items); + } - function withWritable() { - new OrderedStreams(writable); - } + cb(); + }, + }); + } - expect(withWritable).toThrow('All input streams must be readable'); + describe('ordered-read-streams (' + moduleName + ')', function () { + it('ends if no streams are given', function (done) { + var streams = new OrderedStreams(); - done(); - }); + stream.pipeline([streams, concat()], done); + }); - it('emits data from all streams', function (done) { - var s1 = from.obj([{ value: 'stream 1' }]); - var s2 = from.obj([{ value: 'stream 2' }]); - var s3 = from.obj([{ value: 'stream 3' }]); + it('throws an error if stream is not readable', function (done) { + var writable = new stream.Writable({ write: function () {} }); - var streams = new OrderedStreams([s1, s2, s3]); + function withWritable() { + new OrderedStreams(writable); + } - function assert(results) { - expect(results.length).toEqual(3); - expect(results[0]).toEqual({ value: 'stream 1' }); - expect(results[1]).toEqual({ value: 'stream 2' }); - expect(results[2]).toEqual({ value: 'stream 3' }); - } + expect(withWritable).toThrow('All input streams must be readable'); - pipe([streams, concat(assert)], done); - }); + done(); + }); - it('emits all data event from each stream', function (done) { - var s = from.obj([ - { value: 'data1' }, - { value: 'data2' }, - { value: 'data3' }, - ]); + it('throws an error if stream does not have a _read function', function (done) { + function FakeReadable() { + this.readable = true; + this.pipe = function () {}; + } - var streams = new OrderedStreams(s); + function withoutRead() { + new OrderedStreams(new FakeReadable()); + } - function assert(results) { - expect(results.length).toEqual(3); - } + expect(withoutRead).toThrow('All input streams must be readable'); - pipe([streams, concat(assert)], done); - }); + done(); + }); - it('preserves streams order', function (done) { - var s1 = fromOnce(function (size, next) { - setTimeout(function () { - next(null, { value: 'stream 1' }); - }, 200); + it('emits data from all streams', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var streams = new OrderedStreams([s1, s2, s3]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } + + stream.pipeline([streams, concat(assert)], done); }); - var s2 = fromOnce(function (size, next) { - setTimeout(function () { - next(null, { value: 'stream 2' }); - }, 30); + + it('works without new keyword', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var ordered = OrderedStreams; + + var streams = ordered([s1, s2, s3]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } + + stream.pipeline([streams, concat(assert)], done); }); - var s3 = fromOnce(function (size, next) { - setTimeout(function () { - next(null, { value: 'stream 3' }); - }, 100); + + it('flattens arrays of streams 1 levels deep', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var streams = new OrderedStreams([s1, [s2, s3]]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } + + stream.pipeline([streams, concat(assert)], done); }); - var streams = new OrderedStreams([s1, s2, s3]); + it('does not allow changing our read function', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); - function assert(results) { - expect(results.length).toEqual(3); - expect(results[0]).toEqual({ value: 'stream 1' }); - expect(results[1]).toEqual({ value: 'stream 2' }); - expect(results[2]).toEqual({ value: 'stream 3' }); - } + var streams = new OrderedStreams([s1, s2, s3], { + read: function () { + throw new Error('boom'); + }, + }); - pipe([streams, concat(assert)], done); - }); + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } - it('emits stream errors downstream', function (done) { - var s = fromOnce(function (size, next) { - setTimeout(function () { - next(new Error('stahp!')); - }, 500); + stream.pipeline([streams, concat(assert)], done); }); - var s2 = from.obj([{ value: 'Im ok!' }]); - var streams = new OrderedStreams([s, s2]); + it('emits all data event from each stream', function (done) { + var s = stream.Readable.from([ + { value: 'data1' }, + { value: 'data2' }, + { value: 'data3' }, + ]); - function assert(err) { - expect(err.message).toEqual('stahp!'); - done(); - } + var streams = new OrderedStreams(s); - pipe([streams, concat()], assert); - }); + function assert(results) { + expect(results.length).toEqual(3); + } - it('emits received data before a stream errors downstream', function (done) { - var s = fromOnce(function (size, next) { - setTimeout(function () { - next(new Error('stahp!')); - }, 500); + stream.pipeline([streams, concat(assert)], done); }); - var s2 = from.obj([{ value: 'Im ok!' }]); - // Invert the order to emit data first - var streams = new OrderedStreams([s2, s]); + it('respects highWaterMark', function (done) { + this.timeout(5000); - function assertData(chunk, enc, next) { - expect(chunk).toEqual({ value: 'Im ok!' }); - next(); - } + var s1 = stream.Readable.from( + [{ value: 'data1' }, { value: 'data2' }, { value: 'data3' }], + { highWaterMark: 1 } + ); + var s2 = stream.Readable.from( + [{ value: 'data4' }, { value: 'data5' }, { value: 'data6' }], + { highWaterMark: 1 } + ); + var s3 = stream.Readable.from( + [{ value: 'data7' }, { value: 'data8' }, { value: 'data9' }], + { highWaterMark: 1 } + ); - function assertErr(err) { - expect(err.message).toEqual('stahp!'); - done(); - } + var streams = new OrderedStreams([s1, s2, s3]); + + function assert(results) { + expect(results.length).toEqual(9); + } + + stream.pipeline([streams, concat(assert, 250)], done); + }); + + it('can set highWaterMark on self', function (done) { + this.timeout(5000); + + var s1 = stream.Readable.from([ + { value: 'data1' }, + { value: 'data2' }, + { value: 'data3' }, + ]); + var s2 = stream.Readable.from([ + { value: 'data4' }, + { value: 'data5' }, + { value: 'data6' }, + ]); + var s3 = stream.Readable.from([ + { value: 'data7' }, + { value: 'data8' }, + { value: 'data9' }, + ]); + + var streams = new OrderedStreams([s1, s2, s3], { highWaterMark: 1 }); + + function assert(results) { + expect(results.length).toEqual(9); + } + + stream.pipeline([streams, concat(assert, 250)], done); + }); + + it('preserves streams order', function (done) { + var s1 = fromOnce(function (next) { + setTimeout(function () { + next(null, { value: 'stream 1' }); + }, 200); + }); + var s2 = fromOnce(function (next) { + setTimeout(function () { + next(null, { value: 'stream 2' }); + }, 30); + }); + var s3 = fromOnce(function (next) { + setTimeout(function () { + next(null, { value: 'stream 3' }); + }, 100); + }); + + var streams = new OrderedStreams([s1, s2, s3]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } - pipe([streams, to.obj(assertData)], assertErr); + stream.pipeline([streams, concat(assert)], done); + }); + + it('emits stream errors downstream', function (done) { + var s = fromOnce(function (next) { + setTimeout(function () { + next(new Error('stahp!')); + }, 500); + }); + var s2 = stream.Readable.from([{ value: 'Im ok!' }]); + + var streams = new OrderedStreams([s, s2]); + + function assert(err) { + expect(err.message).toEqual('stahp!'); + done(); + } + + stream.pipeline([streams, concat()], assert); + }); + + it('emits received data before a stream errors downstream', function (done) { + var s = fromOnce(function (next) { + setTimeout(function () { + next(new Error('stahp!')); + }, 500); + }); + var s2 = stream.Readable.from([{ value: 'Im ok!' }]); + + // Invert the order to emit data first + var streams = new OrderedStreams([s2, s]); + + function assertData(chunk, enc, next) { + if (typeof enc === 'function') { + next = enc; + } + expect(chunk).toEqual({ value: 'Im ok!' }); + next(); + } + + function assertErr(err) { + expect(err.message).toEqual('stahp!'); + done(); + } + + stream.pipeline( + [streams, new stream.Writable({ objectMode: true, write: assertData })], + assertErr + ); + }); }); -}); +} + +suite('stream'); +suite('streamx'); +suite('readable-stream');