Skip to content

Commit

Permalink
Merge pull request #361 from vqvu/unpipe-on-destroy
Browse files Browse the repository at this point in the history
Stream should unpipe from Readable on destroy.
  • Loading branch information
vqvu committed Aug 29, 2015
2 parents efef260 + fa6acb3 commit 746b1fa
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
22 changes: 17 additions & 5 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,22 @@ _.seq = function () {
};
};

function pipeReadable(xs, stream) {
// write any errors into the stream
xs.on('error', writeStreamError);
xs.pipe(stream);

// TODO: Replace with onDestroy in v3.
stream._destructors.push(function () {
xs.unpipe(stream);
xs.removeListener('error', writeStreamError);
});

function writeStreamError(err) {
stream.write(new StreamError(err));
}
}

function promiseGenerator(promise) {
return _(function (push) {
promise.then(function (value) {
Expand Down Expand Up @@ -561,11 +577,7 @@ function Stream(/*optional*/xs, /*optional*/ee, /*optional*/mappingHint) {
else if (_.isObject(xs)) {
// check to see if we have a readable stream
if (_.isFunction(xs.on) && _.isFunction(xs.pipe)) {
// write any errors into the stream
xs.on('error', function (err) {
self.write(new StreamError(err));
});
xs.pipe(self);
pipeReadable(xs, self);
}
else if (_.isFunction(xs.then)) {
//probably a promise
Expand Down
21 changes: 20 additions & 1 deletion test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ exports['constructor'] = {
test.strictEqual(s, _(s));
test.done();
},
'from Readable stream with next function - issue #303': function (test) {
'from Readable with next function - issue #303': function (test) {
var Readable = Stream.Readable;

var rs = new Readable;
Expand All @@ -424,6 +424,25 @@ exports['constructor'] = {
.toArray(this.tester(['a', 'b', 'c'], test));
test.done();
},
'from Readable - unpipes on destroy': function (test) {
var rs = streamify([1, 2, 3]);

var s = _(rs);
s.pull(valueEquals(test, 1));
s.destroy();

var writtenTo = false;
var write = s.write;
s.write = function () {
console.log('writtenTo');
writtenTo = true;
write.call(s, arguments);
};
s.emit('drain');

test.ok(!writtenTo, 'Drain should not cause write to be called.');
test.done();
},
'throws error for unsupported object': function (test) {
test.throws(function () {
_({}).done(function () {});
Expand Down

0 comments on commit 746b1fa

Please sign in to comment.