Skip to content

Commit

Permalink
feat!: Switch to streamx
Browse files Browse the repository at this point in the history
feat!: Avoid eagerly reading streams

chore: Ensure project works with different streams

chore: Update example and change description in README
  • Loading branch information
phated committed Aug 31, 2022
1 parent f6e5671 commit 1d79952
Show file tree
Hide file tree
Showing 4 changed files with 431 additions and 221 deletions.
96 changes: 53 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,72 @@

[![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url]

Combines array of streams into one read stream in strict order.

## Overview

`ordered-read-streams` handles all data/errors from input streams in parallel, but emits data/errors in the strict order in which the streams are passed to the constructor. This is an `objectMode = true` stream.
Combines array of streams into one Readable stream in strict order.

## Usage

```js
var through = require('through2');
var Ordered = require('ordered-read-streams');

var s1 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 200);
var { Readable } = require('streamx');
var ordered = require('ordered-read-streams');

var s1 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 1');
cb(null);
}, 200);
},
});
var s2 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 30);
var s2 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 2');
cb(null);
}, 30);
},
});
var s3 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 100);
var s3 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 3');
cb(null);
}, 100);
},
});

var streams = new Ordered([s1, s2, s3]);
streams.on('data', function (data) {
var readable = ordered([s1, s2, s3]);
readable.on('data', function (data) {
console.log(data);
// Logs:
// stream 1
// stream 2
// stream 3
});

s1.write('stream 1');
s1.end();

s2.write('stream 2');
s2.end();

s3.write('stream 3');
s3.end();
```

Output will be:
## API

```
stream 1
stream 2
stream 3
```
### `ordered(streams, [options])`

Takes an array of Readable streams and produces a single Readable stream that will consume the provided streams in strict order. The produced Readable stream respects backpressure on itself and any provided streams.

## License

Expand Down
133 changes: 73 additions & 60 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
var Readable = require('readable-stream/readable');
var util = require('util');
var Readable = require('streamx').Readable;

function isReadable(stream) {
if (typeof stream.pipe !== 'function') {
Expand All @@ -10,92 +9,106 @@ function isReadable(stream) {
return false;
}

if (typeof stream._read !== 'function') {
return false;
}

if (!stream._readableState) {
if (typeof stream.read !== 'function') {
return false;
}

return true;
}

function addStream(streams, stream) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
function OrderedStreams(streams, options) {
streams = streams || [];

if (!Array.isArray(streams)) {
streams = [streams];
}

var self = this;
streams = Array.prototype.concat.apply([], streams);

stream._buffer = [];
options = Object.assign({}, options, {
read: read,
});

stream.on('readable', function () {
var chunk = stream.read();
while (chunk) {
if (this === streams[0]) {
self.push(chunk);
} else {
this._buffer.push(chunk);
}
chunk = stream.read();
streams.forEach(function (stream) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
}
});

stream.on('end', function () {
for (
var stream = streams[0];
stream && stream._readableState.ended;
stream = streams[0]
) {
while (stream._buffer.length) {
self.push(stream._buffer.shift());
}
var streamIdx = 0;

streams.shift();
}
function read(cb) {
var self = this;

if (!streams.length) {
var activeStream = streams[streamIdx];
if (!activeStream) {
self.push(null);
return cb(null);
}
});

stream.on('error', this.emit.bind(this, 'error'));
function cleanup() {
activeStream.off('readable', onRead);
activeStream.off('error', onError);
activeStream.off('end', onEnd);
}

streams.push(stream);
}
function onError(err) {
cleanup();
cb(err);
}

function OrderedStreams(streams, options) {
if (!(this instanceof OrderedStreams)) {
return new OrderedStreams(streams, options);
}
function onEnd() {
// When a stream ends, we want to increment index of the stream we are reading from
streamIdx++;
// Then we want to cleanup handlers on the previously active stream
cleanup();
// Finally we recursively call this function to read from the next stream
read.call(self, cb);
}

streams = streams || [];
options = options || {};
function onRead(chunk) {
var drained = true;

options.objectMode = true;
// If the chunk is null, we don't want to cleanup because the
// `end` event might be emitted afterwards
if (chunk === null) {
return;
}

Readable.call(this, options);
while (chunk !== null && drained) {
drained = self.push(chunk);
// If our stream is not backpressured, we want to process another chunk
if (drained) {
chunk = activeStream.read();
}
}

if (!Array.isArray(streams)) {
streams = [streams];
}
if (!streams.length) {
return this.push(null); // no streams, close
}
cleanup();
cb(null);
}

activeStream.once('error', onError);
activeStream.once('end', onEnd);

var addStreamBinded = addStream.bind(this, []);
// Try reading the first chunk
var chunk = activeStream.read();

streams.forEach(function (item) {
if (Array.isArray(item)) {
item.forEach(addStreamBinded);
// If we backpressured the OrderedReadStream, we'll have a chunk
// and don't need to wait for a `readable` event
if (chunk) {
onRead(chunk);
} else {
addStreamBinded(item);
// If the first chunk is null we want to wait for `readable` to handle both the first
// access and a backpressured stream
activeStream.once('readable', function () {
// Once `readable`, we need to grab the first chunk before passing it to onRead
var chunk = activeStream.read();
onRead(chunk);
});
}
});
}
util.inherits(OrderedStreams, Readable);
}

OrderedStreams.prototype._read = function () {};
return new Readable(options);
}

module.exports = OrderedStreams;
32 changes: 22 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,36 @@
"name": "ordered-read-streams",
"version": "1.0.1",
"description": "Combines array of streams into one read stream in strict order",
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)",
"contributors": [
"Blaine Bublitz <blaine.bublitz@gmail.com>",
"Artem Medeu <artem.medeusheyev@gmail.com>"
],
"repository": "gulpjs/ordered-read-streams",
"license": "MIT",
"engines": {
"node": ">= 10.13.0"
},
"files": [
"LICENSE",
"index.js"
],
"scripts": {
"lint": "eslint .",
"pretest": "npm run lint",
"test": "nyc mocha --async-only"
},
"repository": "gulpjs/ordered-read-streams",
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)",
"license": "MIT",
"engines": {
"node": ">= 10.13.0"
},
"dependencies": {
"readable-stream": "^3.6.0"
"streamx": "^2.12.5"
},
"devDependencies": {
"eslint": "^7.32.0",
"eslint-config-gulp": "^5.0.1",
"eslint-plugin-node": "^11.1.0",
"expect": "^27.4.2",
"mississippi": "^4.0.0",
"mocha": "^8.4.0",
"nyc": "^15.1.0",
"through2": "^4.0.2"
"readable-stream": "^3.6.0"
},
"nyc": {
"reporter": [
Expand All @@ -37,5 +41,13 @@
},
"prettier": {
"singleQuote": true
}
},
"keywords": [
"streams",
"ordered",
"group",
"combine",
"streamx",
"readable"
]
}

0 comments on commit 1d79952

Please sign in to comment.