Skip to content

Commit

Permalink
feat!: Switch to streamx (#27)
Browse files Browse the repository at this point in the history
feat!: Avoid eagerly reading streams
chore: Ensure project works with different streams
chore: Update example and change description in README
chore: Update license & description
fix!: Destroy all streams correctly when another is destroyed
  • Loading branch information
phated committed Oct 9, 2022
1 parent f6e5671 commit edb9aa1
Show file tree
Hide file tree
Showing 5 changed files with 528 additions and 222 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2014 Artem Medeusheyev & 2020-2021 Blaine Bublitz <blaine.bublitz@gmail.com> and Eric Schoffstall <yo@contra.io>
Copyright (c) 2014 Artem Medeusheyev & 2020-2022 Blaine Bublitz <blaine.bublitz@gmail.com> and Eric Schoffstall <yo@contra.io>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
96 changes: 53 additions & 43 deletions README.md
Expand Up @@ -8,62 +8,72 @@

[![NPM version][npm-image]][npm-url] [![Downloads][downloads-image]][npm-url] [![Build Status][ci-image]][ci-url] [![Coveralls Status][coveralls-image]][coveralls-url]

Combines array of streams into one read stream in strict order.

## Overview

`ordered-read-streams` handles all data/errors from input streams in parallel, but emits data/errors in strict order in which streams are passed to constructor. This is `objectMode = true` stream.
Combines array of streams into one Readable stream in strict order.

## Usage

```js
var through = require('through2');
var Ordered = require('ordered-read-streams');

var s1 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 200);
var { Readable } = require('streamx');
var ordered = require('ordered-read-streams');

var s1 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 1');
cb(null);
}, 200);
},
});
var s2 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 30);
var s2 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 2');
cb(null);
}, 30);
},
});
var s3 = through.obj(function (data, enc, next) {
var self = this;
setTimeout(function () {
self.push(data);
next();
}, 100);
var s3 = new Readable({
read: function (cb) {
var self = this;
if (self.called) {
self.push(null);
return cb(null);
}
setTimeout(function () {
self.called = true;
self.push('stream 3');
cb(null);
}, 100);
},
});

var streams = new Ordered([s1, s2, s3]);
streams.on('data', function (data) {
var readable = ordered([s1, s2, s3]);
readable.on('data', function (data) {
console.log(data);
// Logs:
// stream 1
// stream 2
// stream 3
});

s1.write('stream 1');
s1.end();

s2.write('stream 2');
s2.end();

s3.write('stream 3');
s3.end();
```

Ouput will be:
## API

```
stream 1
stream 2
stream 3
```
### `ordered(streams, [options])`

Takes an array of Readable streams and produces a single Readable stream that will consume the provided streams in strict order. The produced Readable stream respects backpressure on itself and any provided streams.

## License

Expand Down
149 changes: 90 additions & 59 deletions index.js
@@ -1,5 +1,4 @@
var Readable = require('readable-stream/readable');
var util = require('util');
var Readable = require('streamx').Readable;

function isReadable(stream) {
if (typeof stream.pipe !== 'function') {
Expand All @@ -10,92 +9,124 @@ function isReadable(stream) {
return false;
}

if (typeof stream._read !== 'function') {
return false;
}

if (!stream._readableState) {
if (typeof stream.read !== 'function') {
return false;
}

return true;
}

function addStream(streams, stream) {
function assertReadableStream(stream) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
}
}

var self = this;
function OrderedStreams(streams, options) {
streams = streams || [];

stream._buffer = [];
if (!Array.isArray(streams)) {
streams = [streams];
}

stream.on('readable', function () {
var chunk = stream.read();
while (chunk) {
if (this === streams[0]) {
self.push(chunk);
} else {
this._buffer.push(chunk);
}
chunk = stream.read();
}
streams = Array.prototype.concat.apply([], streams);

streams.forEach(assertReadableStream);

options = Object.assign({}, options, {
read: read,
predestroy: predestroy,
});

stream.on('end', function () {
for (
var stream = streams[0];
stream && stream._readableState.ended;
stream = streams[0]
) {
while (stream._buffer.length) {
self.push(stream._buffer.shift());
}
var readable = new Readable(options);

streams.shift();
}
var streamIdx = 0;
var activeStream = streams[streamIdx];

if (!activeStream) {
readable.push(null);
}

var destroyedIdx = -1;
var destroyedByError = false;
var readableClosed = false;

streams.forEach(setup);

function setup(stream, idx) {
stream.on('data', onData);
stream.once('error', onError);
stream.once('end', onEnd);
stream.once('close', onClose);

stream.pause();

if (!streams.length) {
self.push(null);
function cleanup() {
stream.off('data', onData);
stream.off('error', onError);
stream.off('end', onEnd);
stream.off('close', onClose);
}
});

stream.on('error', this.emit.bind(this, 'error'));
function onError(err) {
destroyedByError = true;
cleanup();
readable.destroy(err);
}

streams.push(stream);
}
function onEnd() {
streamIdx++;
activeStream = streams[streamIdx];
cleanup();
if (!activeStream) {
readable.push(null);
} else {
activeStream.resume();
}
}

function OrderedStreams(streams, options) {
if (!(this instanceof OrderedStreams)) {
return new OrderedStreams(streams, options);
function onClose() {
destroyedIdx = idx;
readableClosed = true;
cleanup();
readable.destroy();
}
}

streams = streams || [];
options = options || {};
function predestroy() {
streams.forEach(destroyStream);
}

options.objectMode = true;
function destroyStream(stream, idx) {
if (destroyedIdx === idx) {
return;
}

Readable.call(this, options);
if (destroyedByError) {
return stream.destroy();
}
if (readableClosed) {
return stream.destroy();
}

if (!Array.isArray(streams)) {
streams = [streams];
stream.destroy(new Error('Wrapper destroyed'));
}
if (!streams.length) {
return this.push(null); // no streams, close

function onData(chunk) {
var drained = readable.push(chunk);
// If the stream is not drained, we pause the activeStream
// The activeStream will be resumed on the next call to `read`
if (!drained) {
activeStream.pause();
}
}

var addStreamBinded = addStream.bind(this, []);
function read(cb) {
activeStream.resume();
cb();
}

streams.forEach(function (item) {
if (Array.isArray(item)) {
item.forEach(addStreamBinded);
} else {
addStreamBinded(item);
}
});
return readable;
}
util.inherits(OrderedStreams, Readable);

OrderedStreams.prototype._read = function () {};

module.exports = OrderedStreams;
34 changes: 23 additions & 11 deletions package.json
@@ -1,33 +1,37 @@
{
"name": "ordered-read-streams",
"version": "1.0.1",
"description": "Combines array of streams into one read stream in strict order",
"description": "Combines array of streams into one Readable stream in strict order.",
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)",
"contributors": [
"Blaine Bublitz <blaine.bublitz@gmail.com>",
"Artem Medeu <artem.medeusheyev@gmail.com>"
],
"repository": "gulpjs/ordered-read-streams",
"license": "MIT",
"engines": {
"node": ">= 10.13.0"
},
"files": [
"LICENSE",
"index.js"
],
"scripts": {
"lint": "eslint .",
"pretest": "npm run lint",
"test": "nyc mocha --async-only"
},
"repository": "gulpjs/ordered-read-streams",
"author": "Gulp Team <team@gulpjs.com> (https://gulpjs.com/)",
"license": "MIT",
"engines": {
"node": ">= 10.13.0"
},
"dependencies": {
"readable-stream": "^3.6.0"
"streamx": "^2.12.5"
},
"devDependencies": {
"eslint": "^7.32.0",
"eslint-config-gulp": "^5.0.1",
"eslint-plugin-node": "^11.1.0",
"expect": "^27.4.2",
"mississippi": "^4.0.0",
"mocha": "^8.4.0",
"nyc": "^15.1.0",
"through2": "^4.0.2"
"readable-stream": "^3.6.0"
},
"nyc": {
"reporter": [
Expand All @@ -37,5 +41,13 @@
},
"prettier": {
"singleQuote": true
}
},
"keywords": [
"streams",
"ordered",
"group",
"combine",
"streamx",
"readable"
]
}

0 comments on commit edb9aa1

Please sign in to comment.