From edb9aa1e1937e6be80ec022aa56ff21817eb7515 Mon Sep 17 00:00:00 2001 From: Blaine Bublitz Date: Sun, 9 Oct 2022 15:44:40 -0700 Subject: [PATCH] feat!: Switch to streamx (#27) feat!: Avoid eagerly reading streams chore: Ensure project works with different streams chore: Update example and change description in README chore: Update license & description fix!: Destroy all streams correctly when another is destroyed --- LICENSE | 2 +- README.md | 96 ++++++----- index.js | 149 +++++++++------- package.json | 34 ++-- test/main.js | 469 +++++++++++++++++++++++++++++++++++++++------------ 5 files changed, 528 insertions(+), 222 deletions(-) diff --git a/LICENSE b/LICENSE index 0c953f6..748b6bb 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2014 Artem Medeusheyev & 2020-2021 Blaine Bublitz and Eric Schoffstall +Copyright (c) 2014 Artem Medeusheyev & 2020-2022 Blaine Bublitz and Eric Schoffstall Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index b22d33a..e5f7473 100644 --- a/README.md +++ b/README.md @@ -8,62 +8,72 @@ [![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url] -Combines array of streams into one read stream in strict order. - -## Overview - -`ordered-read-streams` handles all data/errors from input streams in parallel, but emits data/errors in strict order in which streams are passed to constructor. This is `objectMode = true` stream. +Combines array of streams into one Readable stream in strict order. ## Usage ```js -var through = require('through2'); -var Ordered = require('ordered-read-streams'); - -var s1 = through.obj(function (data, enc, next) { - var self = this; - setTimeout(function () { - self.push(data); - next(); - }, 200); +var { Readable } = require('streamx'); +var ordered = require('ordered-read-streams'); + +var s1 = new Readable({ + read: function (cb) { + var self = this; + if (self.called) { + self.push(null); + return cb(null); + } + setTimeout(function () { + self.called = true; + self.push('stream 1'); + cb(null); + }, 200); + }, }); -var s2 = through.obj(function (data, enc, next) { - var self = this; - setTimeout(function () { - self.push(data); - next(); - }, 30); +var s2 = new Readable({ + read: function (cb) { + var self = this; + if (self.called) { + self.push(null); + return cb(null); + } + setTimeout(function () { + self.called = true; + self.push('stream 2'); + cb(null); + }, 30); + }, }); -var s3 = through.obj(function (data, enc, next) { - var self = this; - setTimeout(function () { - self.push(data); - next(); - }, 100); +var s3 = new Readable({ + read: function (cb) { + var self = this; + if (self.called) { + self.push(null); + return cb(null); + } + setTimeout(function () { + self.called = true; + self.push('stream 3'); + cb(null); + }, 100); + }, }); -var streams = new Ordered([s1, s2, s3]); -streams.on('data', function (data) { +var readable = ordered([s1, s2, s3]); +readable.on('data', function (data) { console.log(data); + // Logs: + // stream 1 + // stream 2 + // stream 3 }); - -s1.write('stream 1'); -s1.end(); - -s2.write('stream 2'); -s2.end(); - -s3.write('stream 3'); -s3.end(); ``` -Ouput will be: +## API -``` -stream 1 -stream 2 -stream 3 -``` +### `ordered(streams, [options])` + +Takes an array of Readable streams and produces a single Readable stream that will consume the provided streams in strict order. The produced Readable stream respects backpressure on itself and any provided streams. ## License diff --git a/index.js b/index.js index 79ac0f4..89e51f2 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,4 @@ -var Readable = require('readable-stream/readable'); -var util = require('util'); +var Readable = require('streamx').Readable; function isReadable(stream) { if (typeof stream.pipe !== 'function') { @@ -10,92 +9,124 @@ function isReadable(stream) { return false; } - if (typeof stream._read !== 'function') { - return false; - } - - if (!stream._readableState) { + if (typeof stream.read !== 'function') { return false; } return true; } -function addStream(streams, stream) { +function assertReadableStream(stream) { if (!isReadable(stream)) { throw new Error('All input streams must be readable'); } +} - var self = this; +function OrderedStreams(streams, options) { + streams = streams || []; - stream._buffer = []; + if (!Array.isArray(streams)) { + streams = [streams]; + } - stream.on('readable', function () { - var chunk = stream.read(); - while (chunk) { - if (this === streams[0]) { - self.push(chunk); - } else { - this._buffer.push(chunk); - } - chunk = stream.read(); - } + streams = Array.prototype.concat.apply([], streams); + + streams.forEach(assertReadableStream); + + options = Object.assign({}, options, { + read: read, + predestroy: predestroy, }); - stream.on('end', function () { - for ( - var stream = streams[0]; - stream && stream._readableState.ended; - stream = streams[0] - ) { - while (stream._buffer.length) { - self.push(stream._buffer.shift()); - } + var readable = new Readable(options); - streams.shift(); - } + var streamIdx = 0; + var activeStream = streams[streamIdx]; + + if (!activeStream) { + readable.push(null); + } + + var destroyedIdx = -1; + var destroyedByError = false; + var readableClosed = false; + + streams.forEach(setup); + + function setup(stream, idx) { + stream.on('data', onData); + stream.once('error', onError); + stream.once('end', onEnd); + stream.once('close', onClose); + + stream.pause(); - if (!streams.length) { - self.push(null); + function cleanup() { + stream.off('data', onData); + stream.off('error', onError); + stream.off('end', onEnd); + stream.off('close', onClose); } - }); - stream.on('error', this.emit.bind(this, 'error')); + function onError(err) { + destroyedByError = true; + cleanup(); + readable.destroy(err); + } - streams.push(stream); -} + function onEnd() { + streamIdx++; + activeStream = streams[streamIdx]; + cleanup(); + if (!activeStream) { + readable.push(null); + } else { + activeStream.resume(); + } + } -function OrderedStreams(streams, options) { - if (!(this instanceof OrderedStreams)) { - return new OrderedStreams(streams, options); + function onClose() { + destroyedIdx = idx; + readableClosed = true; + cleanup(); + readable.destroy(); + } } - streams = streams || []; - options = options || {}; + function predestroy() { + streams.forEach(destroyStream); + } - options.objectMode = true; + function destroyStream(stream, idx) { + if (destroyedIdx === idx) { + return; + } - Readable.call(this, options); + if (destroyedByError) { + return stream.destroy(); + } + if (readableClosed) { + return stream.destroy(); + } - if (!Array.isArray(streams)) { - streams = [streams]; + stream.destroy(new Error('Wrapper destroyed')); } - if (!streams.length) { - return this.push(null); // no streams, close + + function onData(chunk) { + var drained = readable.push(chunk); + // If the stream is not drained, we pause the activeStream + // The activeStream will be resumed on the next call to `read` + if (!drained) { + activeStream.pause(); + } } - var addStreamBinded = addStream.bind(this, []); + function read(cb) { + activeStream.resume(); + cb(); + } - streams.forEach(function (item) { - if (Array.isArray(item)) { - item.forEach(addStreamBinded); - } else { - addStreamBinded(item); - } - }); + return readable; } -util.inherits(OrderedStreams, Readable); - -OrderedStreams.prototype._read = function () {}; module.exports = OrderedStreams; diff --git a/package.json b/package.json index 6845ef7..222b257 100644 --- a/package.json +++ b/package.json @@ -1,8 +1,19 @@ { "name": "ordered-read-streams", "version": "1.0.1", - "description": "Combines array of streams into one read stream in strict order", + "description": "Combines array of streams into one Readable stream in strict order.", + "author": "Gulp Team (https://gulpjs.com/)", + "contributors": [ + "Blaine Bublitz ", + "Artem Medeu " + ], + "repository": "gulpjs/ordered-read-streams", + "license": "MIT", + "engines": { + "node": ">= 10.13.0" + }, "files": [ + "LICENSE", "index.js" ], "scripts": { @@ -10,24 +21,17 @@ "pretest": "npm run lint", "test": "nyc mocha --async-only" }, - "repository": "gulpjs/ordered-read-streams", - "author": "Gulp Team (https://gulpjs.com/)", - "license": "MIT", - "engines": { - "node": ">= 10.13.0" - }, "dependencies": { - "readable-stream": "^3.6.0" + "streamx": "^2.12.5" }, "devDependencies": { "eslint": "^7.32.0", "eslint-config-gulp": "^5.0.1", "eslint-plugin-node": "^11.1.0", "expect": "^27.4.2", - "mississippi": "^4.0.0", "mocha": "^8.4.0", "nyc": "^15.1.0", - "through2": "^4.0.2" + "readable-stream": "^3.6.0" }, "nyc": { "reporter": [ @@ -37,5 +41,13 @@ }, "prettier": { "singleQuote": true - } + }, + "keywords": [ + "streams", + "ordered", + "group", + "combine", + "streamx", + "readable" + ] } diff --git a/test/main.js b/test/main.js index 41e1f31..69edb4b 100644 --- a/test/main.js +++ b/test/main.js @@ -1,145 +1,398 @@ var expect = require('expect'); -var miss = require('mississippi'); - var OrderedStreams = require('..'); -var to = miss.to; -var from = miss.from; -var pipe = miss.pipe; -var concat = miss.concat; - -function fromOnce(fn) { - var called = false; - return from.obj(function (size, next) { - if (called) { - return next(null, null); - } - called = true; - fn.apply(this, arguments); - }); -} +function suite(moduleName) { + var stream = require(moduleName); -describe('ordered-read-streams', function () { - it('ends if no streams are given', function (done) { - var streams = new OrderedStreams(); + function fromOnce(fn) { + var called = false; + return new stream.Readable({ + objectMode: true, + read: function (cb) { + var self = this; + if (called) { + this.push(null); + if (typeof cb === 'function') { + cb(null); + } + return; + } + fn(function (err, chunk) { + called = true; + if (!err) { + self.push(chunk); + } - pipe([streams, concat()], done); - }); + if (typeof cb === 'function') { + cb(err); + } else { + if (err) { + return self.destroy(err); + } + } + }); + }, + }); + } - it('throws an error if stream is not readable', function (done) { - var writable = to(); + function concat(fn, timeout) { + var items = []; + return new stream.Writable({ + objectMode: true, + write: function (chunk, enc, cb) { + if (typeof enc === 'function') { + cb = enc; + } + setTimeout(function () { + items.push(chunk); + cb(); + }, timeout || 1); + }, + final: function (cb) { + if (typeof fn === 'function') { + fn(items); + } - function withWritable() { - new OrderedStreams(writable); - } + cb(); + }, + }); + } - expect(withWritable).toThrow('All input streams must be readable'); + describe('ordered-read-streams (' + moduleName + ')', function () { + it('ends if no streams are given', function (done) { + var streams = new OrderedStreams(); - done(); - }); + stream.pipeline([streams, concat()], done); + }); - it('emits data from all streams', function (done) { - var s1 = from.obj([{ value: 'stream 1' }]); - var s2 = from.obj([{ value: 'stream 2' }]); - var s3 = from.obj([{ value: 'stream 3' }]); + it('throws an error if stream is not readable', function (done) { + var writable = new stream.Writable({ write: function () {} }); - var streams = new OrderedStreams([s1, s2, s3]); + function withWritable() { + new OrderedStreams(writable); + } - function assert(results) { - expect(results.length).toEqual(3); - expect(results[0]).toEqual({ value: 'stream 1' }); - expect(results[1]).toEqual({ value: 'stream 2' }); - expect(results[2]).toEqual({ value: 'stream 3' }); - } + expect(withWritable).toThrow('All input streams must be readable'); - pipe([streams, concat(assert)], done); - }); + done(); + }); - it('emits all data event from each stream', function (done) { - var s = from.obj([ - { value: 'data1' }, - { value: 'data2' }, - { value: 'data3' }, - ]); + it('throws an error if stream does not have a _read function', function (done) { + function FakeReadable() { + this.readable = true; + this.pipe = function () {}; + } - var streams = new OrderedStreams(s); + function withoutRead() { + new OrderedStreams(new FakeReadable()); + } - function assert(results) { - expect(results.length).toEqual(3); - } + expect(withoutRead).toThrow('All input streams must be readable'); - pipe([streams, concat(assert)], done); - }); + done(); + }); + + it('emits data from all streams', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var streams = new OrderedStreams([s1, s2, s3]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } + + stream.pipeline([streams, concat(assert)], done); + }); + + it('works without new keyword', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var ordered = OrderedStreams; + + var streams = ordered([s1, s2, s3]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } - it('preserves streams order', function (done) { - var s1 = fromOnce(function (size, next) { - setTimeout(function () { - next(null, { value: 'stream 1' }); - }, 200); + stream.pipeline([streams, concat(assert)], done); }); - var s2 = fromOnce(function (size, next) { - setTimeout(function () { - next(null, { value: 'stream 2' }); - }, 30); + + it('flattens arrays of streams 1 levels deep', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var streams = new OrderedStreams([s1, [s2, s3]]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } + + stream.pipeline([streams, concat(assert)], done); }); - var s3 = fromOnce(function (size, next) { - setTimeout(function () { - next(null, { value: 'stream 3' }); - }, 100); + + it('does not allow changing our read function', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var streams = new OrderedStreams([s1, s2, s3], { + read: function () { + throw new Error('boom'); + }, + }); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } + + stream.pipeline([streams, concat(assert)], done); }); - var streams = new OrderedStreams([s1, s2, s3]); + it('emits all data event from each stream', function (done) { + var s = stream.Readable.from([ + { value: 'data1' }, + { value: 'data2' }, + { value: 'data3' }, + ]); - function assert(results) { - expect(results.length).toEqual(3); - expect(results[0]).toEqual({ value: 'stream 1' }); - expect(results[1]).toEqual({ value: 'stream 2' }); - expect(results[2]).toEqual({ value: 'stream 3' }); - } + var streams = new OrderedStreams(s); - pipe([streams, concat(assert)], done); - }); + function assert(results) { + expect(results.length).toEqual(3); + } - it('emits stream errors downstream', function (done) { - var s = fromOnce(function (size, next) { - setTimeout(function () { - next(new Error('stahp!')); - }, 500); + stream.pipeline([streams, concat(assert)], done); }); - var s2 = from.obj([{ value: 'Im ok!' }]); - var streams = new OrderedStreams([s, s2]); + it('respects highWaterMark', function (done) { + this.timeout(5000); - function assert(err) { - expect(err.message).toEqual('stahp!'); - done(); - } + var s1 = stream.Readable.from( + [{ value: 'data1' }, { value: 'data2' }, { value: 'data3' }], + { highWaterMark: 1 } + ); + var s2 = stream.Readable.from( + [{ value: 'data4' }, { value: 'data5' }, { value: 'data6' }], + { highWaterMark: 1 } + ); + var s3 = stream.Readable.from( + [{ value: 'data7' }, { value: 'data8' }, { value: 'data9' }], + { highWaterMark: 1 } + ); - pipe([streams, concat()], assert); - }); + var streams = new OrderedStreams([s1, s2, s3]); - it('emits received data before a stream errors downstream', function (done) { - var s = fromOnce(function (size, next) { - setTimeout(function () { - next(new Error('stahp!')); - }, 500); + function assert(results) { + expect(results.length).toEqual(9); + } + + stream.pipeline([streams, concat(assert, 250)], done); }); - var s2 = from.obj([{ value: 'Im ok!' }]); - // Invert the order to emit data first - var streams = new OrderedStreams([s2, s]); + it('can set highWaterMark on self', function (done) { + this.timeout(5000); - function assertData(chunk, enc, next) { - expect(chunk).toEqual({ value: 'Im ok!' }); - next(); - } + var s1 = stream.Readable.from([ + { value: 'data1' }, + { value: 'data2' }, + { value: 'data3' }, + ]); + var s2 = stream.Readable.from([ + { value: 'data4' }, + { value: 'data5' }, + { value: 'data6' }, + ]); + var s3 = stream.Readable.from([ + { value: 'data7' }, + { value: 'data8' }, + { value: 'data9' }, + ]); - function assertErr(err) { - expect(err.message).toEqual('stahp!'); - done(); - } + var highWaterMark = moduleName === 'streamx' ? 1024 : 1; - pipe([streams, to.obj(assertData)], assertErr); + var streams = new OrderedStreams([s1, s2, s3], { + highWaterMark: highWaterMark, + }); + + function assert(results) { + expect(results.length).toEqual(9); + } + + stream.pipeline([streams, concat(assert, 250)], done); + }); + + it('preserves streams order', function (done) { + var s1 = fromOnce(function (next) { + setTimeout(function () { + next(null, { value: 'stream 1' }); + }, 200); + }); + var s2 = fromOnce(function (next) { + setTimeout(function () { + next(null, { value: 'stream 2' }); + }, 30); + }); + var s3 = fromOnce(function (next) { + setTimeout(function () { + next(null, { value: 'stream 3' }); + }, 100); + }); + + var streams = new OrderedStreams([s1, s2, s3]); + + function assert(results) { + expect(results.length).toEqual(3); + expect(results[0]).toEqual({ value: 'stream 1' }); + expect(results[1]).toEqual({ value: 'stream 2' }); + expect(results[2]).toEqual({ value: 'stream 3' }); + } + + stream.pipeline([streams, concat(assert)], done); + }); + + it('emits stream errors downstream', function (done) { + var s = fromOnce(function (next) { + setTimeout(function () { + next(new Error('stahp!')); + }, 500); + }); + var s2 = stream.Readable.from([{ value: 'Im ok!' }]); + + var streams = new OrderedStreams([s, s2]); + + function assert(err) { + expect(err.message).toEqual('stahp!'); + done(); + } + + stream.pipeline([streams, concat()], assert); + }); + + it('emits received data before a stream errors downstream', function (done) { + var s = fromOnce(function (next) { + setTimeout(function () { + next(new Error('stahp!')); + }, 500); + }); + var s2 = stream.Readable.from([{ value: 'Im ok!' }]); + + // Invert the order to emit data first + var streams = new OrderedStreams([s2, s]); + + function assertData(chunk, enc, next) { + if (typeof enc === 'function') { + next = enc; + } + expect(chunk).toEqual({ value: 'Im ok!' }); + next(); + } + + function assertErr(err) { + expect(err.message).toEqual('stahp!'); + done(); + } + + stream.pipeline( + [streams, new stream.Writable({ objectMode: true, write: assertData })], + assertErr + ); + }); + + it('destroys all readable streams if the wrapper is destroyed', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var streams = new OrderedStreams([s1, s2, s3]); + + var errors = []; + + s1.on('error', function (err) { + errors.push(err); + assertErr(); + }); + s2.on('error', function (err) { + errors.push(err); + assertErr(); + }); + s3.on('error', function (err) { + errors.push(err); + assertErr(); + }); + + function assertErr() { + if (errors.length === 3) { + expect(errors[0].message).toEqual('Wrapper destroyed'); + expect(errors[1].message).toEqual('Wrapper destroyed'); + expect(errors[2].message).toEqual('Wrapper destroyed'); + done(); + } + } + + streams.destroy(); + }); + + it('destroys the wrapper and other streams if any readable stream is destroyed', function (done) { + var s1 = stream.Readable.from([{ value: 'stream 1' }]); + var s2 = stream.Readable.from([{ value: 'stream 2' }]); + var s3 = stream.Readable.from([{ value: 'stream 3' }]); + + var streams = new OrderedStreams([s1, s2, s3]); + + var closed = []; + + s1.on('close', function () { + closed.push('s1'); + assert(); + }); + s2.on('close', function () { + closed.push('s2'); + assert(); + }); + s3.on('close', function () { + closed.push('s3'); + assert(); + }); + streams.on('close', function () { + closed.push('wrapper'); + assert(); + }); + + function assert() { + if (closed.length === 4) { + expect(closed).toContain('s2'); + expect(closed).toContain('wrapper'); + expect(closed).toContain('s1'); + expect(closed).toContain('s3'); + + done(); + } + } + + s2.destroy(); + }); }); -}); +} + +suite('stream'); +suite('streamx'); +suite('readable-stream');