Skip to content

Commit

Permalink
Convert pullstream to new stream API
Browse files Browse the repository at this point in the history
Update pullstream for the new stream API in isaacs/readable-stream for
improved speed and conciseness. PullStream now extends PassThrough
stream and uses PassThrough stream's internal buffer to replace the
WritableStreamBuffer.
  • Loading branch information
Evan Oxfeld committed Oct 29, 2012
1 parent 43bdcc2 commit 1449f1f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 298 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -88,24 +88,25 @@ ps.pipe(100, out).on('end', function() {
``` ```


<a name="pullStreamWrite" /> <a name="pullStreamWrite" />
### ps.write(data) ### ps.write(data, [encoding])


Writes data to input side of a pull stream. Writes data to input side of a pull stream.


__Arguments__ __Arguments__


* data - Buffer to write to the input side of the pull stream. * data - Buffer or string to write to the input side of the pull stream.
* encoding (optional) - Encoding to use if data is a string. If not specified 'utf8' is used.


__Example__ __Example__


```javascript ```javascript
var ps = new PullStream(); var ps = new PullStream();


ps.pull(5, function(err, data) { ps.pull(5, function(err, data) {
console.log(data.toString('utf8')); console.log(data.toString('ascii'));
}); });


ps.write(new Buffer('Hello World', 'utf8')); ps.write('Hello World', 'ascii');
``` ```


<a name="pullStreamEnd" /> <a name="pullStreamEnd" />
Expand All @@ -122,7 +123,7 @@ ps.pull(5, function(err, data) {
console.log(data.toString('utf8')); console.log(data.toString('utf8'));
}); });


ps.write(new Buffer('Hello World', 'utf8')); ps.write('Hello World');
ps.end(); ps.end();
``` ```


Expand Down
6 changes: 4 additions & 2 deletions package.json
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
"pull" "pull"
], ],
"devDependencies": { "devDependencies": {
"nodeunit": "~0.7.4" "nodeunit": "~0.7.4",
"stream-buffers": "~0.2.3",
"async": "~0.1.22"
}, },
"dependencies": { "dependencies": {
"over": "~0.0.5", "over": "~0.0.5",
"stream-buffers": "~0.2.3" "readable-stream": "0.0.3"
} }
} }
183 changes: 44 additions & 139 deletions pullstream.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,78 +3,23 @@
module.exports = PullStream; module.exports = PullStream;


var inherits = require("util").inherits; var inherits = require("util").inherits;
var Stream = require('stream').Stream; var PassThrough = require('readable-stream/passthrough');
var over = require('over'); var over = require('over');
var streamBuffers = require("stream-buffers");


function PullStream(opts) { function PullStream(opts) {
var self = this; var self = this;
Stream.apply(this);
this.opts = opts || {}; this.opts = opts || {};
this.opts.minBufferSize = this.opts.minBufferSize | (1 * 1024 * 1024); PassThrough.call(this, opts);
this.opts.maxBufferSize = this.opts.maxBufferSize | (10 * 1024 * 1024); this._flushed = false;
this.readable = false; this._writesFinished = false;
this.writable = true; this.once('finish', function() {
this._buffer = new streamBuffers.WritableStreamBuffer(); self._writesFinished = true;
this.paused = false; if (self._flushed) {
this._positionInStream = 0; process.nextTick(self._finish.bind(self));
this._recvEnd = false;
this._serviceRequests = null;
this.eof = false;
this._srcStream = null;
this.on('pipe', function (srcStream) {
self._srcStream = srcStream;
});
}
inherits(PullStream, Stream);

PullStream.prototype._sendPauseBuffer = function () {
this.process();
};

PullStream.prototype.write = function (data) {
this._buffer.write(data);
if (this._buffer.maxSize() > this.opts.maxBufferSize && this._srcStream) {
this._srcStream.pause();
}
this.process();
return true;
};

PullStream.prototype.end = function (data) {
this.data = function () {
throw new Error("End already called");
};
this.end = function () {
throw new Error("End already called");
};

this._recvEnd = true;
if (data) {
this._buffer.write(data);
}
this.process();
return true;
};

PullStream.prototype.process = function () {
if (this._recvEnd && this._serviceRequests === null) {
this._finish();
} else {
if (this._serviceRequests) {
this._serviceRequests();
} }
}
};

PullStream.prototype._finish = function () {
var self = this;
process.nextTick(function () {
self.emit('end');
self.emit('close');
}); });
this._finish = function () {}; }
}; inherits(PullStream, PassThrough);


PullStream.prototype.pull = over([ PullStream.prototype.pull = over([
[over.numberOptionalWithDefault(null), over.func, function (len, callback) { [over.numberOptionalWithDefault(null), over.func, function (len, callback) {
Expand All @@ -83,112 +28,72 @@ PullStream.prototype.pull = over([
} }


var self = this; var self = this;
this._serviceRequests = pullServiceRequest;
pullServiceRequest(); pullServiceRequest();


function pullServiceRequest() { function pullServiceRequest() {
if (self.paused) { if (self._flushed) {
return; return callback(new Error('End of Stream'));
} }
self._serviceRequests = null;


if ((len !== null && self._buffer.size() >= len) || (len === null && self._recvEnd)) { var data = self.read(len || undefined);
self._serviceRequests = null; if (data) {
var results = self._buffer.getContents(len); process.nextTick(callback.bind(null, null, data));
self._resumeSrcStream();
results.posInStream = self._positionInStream;
self._positionInStream += results.length;
process.nextTick(function () {
callback(null, results);

if (self._recvEnd && self._buffer.size() === 0) {
self._finish();
}
});
} else if (self._recvEnd && self._buffer.size() === 0) {
callback(new Error('End of Stream'));
self._finish();
} else { } else {
self._resumeSrcStream(); self._serviceRequests = pullServiceRequest;
self.once('readable', self._serviceRequests);
} }
} }
}] }]
]); ]);


PullStream.prototype.pipe = over([ PullStream.prototype.pipe = over([
[over.numberOptionalWithDefault(null), over.object, function (len, destStream) { [over.numberOptionalWithDefault(null), over.object, function (len, destStream) {
if (!len) {
return PassThrough.prototype.pipe.call(this, destStream);
}

if (len === 0) { if (len === 0) {
return destStream.end(); return destStream.end();
} }


var self = this; var self = this;
var lenLeft = len;
this._serviceRequests = pipeServiceRequest;
pipeServiceRequest(); pipeServiceRequest();


function pipeServiceRequest() { function pipeServiceRequest() {
if (self.paused) { self._serviceRequests = null;
return; var data = self.read(len);
} if (data) {

destStream.write(data);
var lenToRemove; destStream.end();
if (lenLeft === null) {
lenToRemove = self._buffer.size();
} else { } else {
lenToRemove = Math.min(self._buffer.size(), lenLeft); self._serviceRequests = pipeServiceRequest;
} self.once('readable', self._serviceRequests);
if (lenToRemove > 0) {
var results = self._buffer.getContents(lenToRemove);
self._resumeSrcStream();
results.posInStream = self._positionInStream;
self._positionInStream += results.length;
if (lenLeft !== null) {
lenLeft -= lenToRemove;
if (lenLeft === 0) {
self._serviceRequests = null;
}
}
destStream.write(results);
if (lenLeft === 0) {
destStream.end();
destStream = null;
}
} else {
self._resumeSrcStream();
}

if (self._recvEnd && self._buffer.size() === 0) {
if (destStream) {
destStream.end();
destStream = null;
}
self._finish();
} }
} }


return destStream; return destStream;
}] }]
]); ]);


PullStream.prototype._resumeSrcStream = function () { PullStream.prototype._flush = function (outputFn, callback) {
if (this._srcStream && this._buffer.size() < this.opts.minBufferSize) { var self = this;
this._srcStream.resume();
if (this._readableState.length > 0) {
return process.nextTick(self._flush.bind(self, outputFn, callback));
} }
};


PullStream.prototype.pause = function () { this._flushed = true;
this.paused = true; if (this._writesFinished) {
if (this._srcStream && this._srcStream.pause) { process.nextTick(self._finish.bind(self));
this._srcStream.pause();
} }
}; };


PullStream.prototype.resume = function () { PullStream.prototype._finish = function () {
var self = this; var self = this;
process.nextTick(function () { if (this._serviceRequests) {
self.paused = false; this._serviceRequests();
self._sendPauseBuffer(); } else {
if (self._srcStream && self._srcStream.resume) { process.nextTick(self.emit.bind(self, 'end'));
self._srcStream.resume(); }
} };
});
};
Loading

0 comments on commit 1449f1f

Please sign in to comment.