Permalink
Browse files

Run outstanding service requests on drain

Long running streams piped to pullstream may have too much data
buffered internally to emit readable again, so we have to run
outstanding service requests on both 'readable' and 'drain'.
  • Loading branch information...
1 parent 1449f1f commit d726614eeefd7d4dd416526f9f82aeef817c84a4 Evan Oxfeld committed Oct 30, 2012
Showing with 62 additions and 13 deletions.
  1. +23 −13 pullstream.js
  2. +39 −0 test/pullStreamTest.js
View
@@ -10,14 +10,18 @@ function PullStream(opts) {
var self = this;
this.opts = opts || {};
PassThrough.call(this, opts);
- this._flushed = false;
- this._writesFinished = false;
this.once('finish', function() {
self._writesFinished = true;
if (self._flushed) {
process.nextTick(self._finish.bind(self));
}
});
+ this.on('readable', function() {
+ self._process();
+ });
+ this.on('drain', function() {
+ self._process();
+ });
}
inherits(PullStream, PassThrough);
@@ -31,17 +35,16 @@ PullStream.prototype.pull = over([
pullServiceRequest();
function pullServiceRequest() {
+ self._serviceRequests = null;
if (self._flushed) {
return callback(new Error('End of Stream'));
}
- self._serviceRequests = null;
var data = self.read(len || undefined);
if (data) {
process.nextTick(callback.bind(null, null, data));
} else {
self._serviceRequests = pullServiceRequest;
- self.once('readable', self._serviceRequests);
}
}
}]
@@ -68,32 +71,39 @@ PullStream.prototype.pipe = over([
destStream.end();
} else {
self._serviceRequests = pipeServiceRequest;
- self.once('readable', self._serviceRequests);
}
}
return destStream;
}]
]);
+PullStream.prototype._process = function () {
+ if (this._serviceRequests) {
+ this._serviceRequests();
+ }
+};
+
PullStream.prototype._flush = function (outputFn, callback) {
var self = this;
-
if (this._readableState.length > 0) {
return process.nextTick(self._flush.bind(self, outputFn, callback));
}
this._flushed = true;
- if (this._writesFinished) {
- process.nextTick(self._finish.bind(self));
- }
+ return process.nextTick(function() {
+ if (self._writesFinished) {
+ self._finish(callback);
+ } else {
+ callback();
+ }
+ });
};
-PullStream.prototype._finish = function () {
- var self = this;
+PullStream.prototype._finish = function (callback) {
+ callback = callback || function () {};
if (this._serviceRequests) {
this._serviceRequests();
- } else {
- process.nextTick(self.emit.bind(self, 'end'));
}
+ process.nextTick(callback);
};
View
@@ -280,5 +280,44 @@ module.exports = {
}
t.done();
+ },
+
+ "pipe more bytes than the pullstream buffer size": function (t) {
+ t.expect(1);
+ var ps = new PullStream();
+ ps.on('end', function() {
+ sourceStream.destroy();
+ });
+
+ var aVals = "", bVals = "";
+ for (var i = 0; i < 20 * 1000; i++) {
+ aVals += 'a';
+ }
+ for (var i = 0; i < 180 * 1000; i++) {
+ bVals += 'b';
+ }
+ var combined = aVals + bVals;
+
+ var sourceStream = new streamBuffers.ReadableStreamBuffer({
+ frequency: 0,
+ chunkSize: 40 * 1024
+ });
+ sourceStream.put(aVals);
+
+ sourceStream.pipe(ps);
+
+ var writableStream = new streamBuffers.WritableStreamBuffer({
+ initialSize: 200 * 1000
+ });
+ writableStream.on('close', function () {
+ var str = writableStream.getContentsAsString('utf8');
+ t.equal(combined, str);
+ t.done();
+ });
+
+ ps.once('drain', function () {
+ ps.pipe(200 * 1000, writableStream);
+ process.nextTick(sourceStream.put.bind(null, bVals));
+ });
}
};

0 comments on commit d726614

Please sign in to comment.