Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream should unpipe from Readable on destroy. #361

Merged
merged 2 commits into from
Aug 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,22 @@ _.seq = function () {
};
};

function pipeReadable(xs, stream) {
// write any errors into the stream
xs.on('error', writeStreamError);
xs.pipe(stream);

// TODO: Replace with onDestroy in v3.
stream._destructors.push(function () {
xs.unpipe(stream);
xs.removeListener('error', writeStreamError);
});

function writeStreamError(err) {
stream.write(new StreamError(err));
}
}

function promiseGenerator(promise) {
return _(function (push) {
promise.then(function (value) {
Expand Down Expand Up @@ -561,11 +577,7 @@ function Stream(/*optional*/xs, /*optional*/ee, /*optional*/mappingHint) {
else if (_.isObject(xs)) {
// check to see if we have a readable stream
if (_.isFunction(xs.on) && _.isFunction(xs.pipe)) {
// write any errors into the stream
xs.on('error', function (err) {
self.write(new StreamError(err));
});
xs.pipe(self);
pipeReadable(xs, self);
}
else if (_.isFunction(xs.then)) {
//probably a promise
Expand Down
21 changes: 20 additions & 1 deletion test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ exports['constructor'] = {
test.strictEqual(s, _(s));
test.done();
},
'from Readable stream with next function - issue #303': function (test) {
'from Readable with next function - issue #303': function (test) {
var Readable = Stream.Readable;

var rs = new Readable;
Expand All @@ -424,6 +424,25 @@ exports['constructor'] = {
.toArray(this.tester(['a', 'b', 'c'], test));
test.done();
},
'from Readable - unpipes on destroy': function (test) {
var rs = streamify([1, 2, 3]);

var s = _(rs);
s.pull(valueEquals(test, 1));
s.destroy();

var writtenTo = false;
var write = s.write;
s.write = function () {
console.log('writtenTo');
writtenTo = true;
write.call(s, arguments);
};
s.emit('drain');

test.ok(!writtenTo, 'Drain should not cause write to be called.');
test.done();
},
'throws error for unsupported object': function (test) {
test.throws(function () {
_({}).done(function () {});
Expand Down