Skip to content

Commit

Permalink
fixed; stream() issues ...
Browse files Browse the repository at this point in the history
fix streams prematurely dying
properly handles pause/resume within getMore
prevent data events emitted after close

closes #1092
  • Loading branch information
aheckmann committed Sep 7, 2012
1 parent e24b60d commit 857a22e
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions lib/querystream.js
Expand Up @@ -24,8 +24,9 @@ function QueryStream (query) {
this._cursor = null;
this._destroyed = null;
this._fields = null;
this._ticks = 0;
this._buffer = null;
this._inline = T_INIT;
this._waiting = false;

// give time to hook up events
var self = this;
Expand Down Expand Up @@ -95,7 +96,19 @@ QueryStream.prototype._init = function () {
* @api private
*/

QueryStream.prototype._next = function () {
QueryStream.prototype._next = function _next () {
if (this.paused || this._destroyed) return;

if (this._buffer && this._buffer.length) {
var arg;
while (!this.paused && !this._destroyed && (arg = this._buffer.shift())) {
this._onNextObject.apply(this, arg);
}
}

// account for possible nextObjects calling user code
if (this.paused || this._destroyed) return;

// avoid stack overflows with large result sets.
// trampoline instead of recursion.
var fn;
Expand All @@ -112,12 +125,13 @@ QueryStream.prototype._next = function () {
*/

QueryStream.prototype.__next = function () {
if (this.paused || this._destroyed) return;
if (this._waiting || this.paused || this._destroyed) return;

var self = this;
self._inline = T_INIT;

self._cursor.nextObject(function (err, doc) {
self._cursor.nextObject(function cursorcb (err, doc) {
self._waiting = false;
self._onNextObject(err, doc);
});

Expand All @@ -130,6 +144,7 @@ QueryStream.prototype.__next = function () {
// that its ok to call _next b/c we are not within
// the trampoline anymore.
this._inline = T_IDLE;
this._waiting = true;
}
}

Expand All @@ -141,8 +156,15 @@ QueryStream.prototype.__next = function () {
* @private
*/

QueryStream.prototype._onNextObject = function (err, doc) {
QueryStream.prototype._onNextObject = function _onNextObject (err, doc) {
if (this._destroyed) return;

if (this.paused) {
this._buffer || (this._buffer = []);
this._buffer.push([err, doc]);
return;
}

if (err) return this.destroy(err);

// when doc is null we hit the end of the cursor
Expand Down

0 comments on commit 857a22e

Please sign in to comment.