Permalink
Browse files

CursorStream: No stack overflow on any size result

Consider configuring the number of docs to process at a time
through the query#batchSize option / method
which defaults to 1000.

    T.find().batchSize(2000).stream();

closes #929
  • Loading branch information...
1 parent dfdac5e commit 6f5feff80cc05fd9d2aa1d95a1949a13d0567d5b @aheckmann aheckmann committed Jun 6, 2012
Showing with 50 additions and 14 deletions.
  1. +50 −14 lib/querystream.js
View
@@ -25,6 +25,7 @@ function QueryStream (query) {
this._destroyed = null;
this._fields = null;
this._ticks = 0;
+ this._inline = T_INIT;
// give time to hook up events
var self = this;
@@ -52,6 +53,11 @@ QueryStream.prototype.readable;
QueryStream.prototype.paused;
+// trampoline flags
+var T_INIT = 0;
+var T_IDLE = 1;
+var T_CONT = 2;
+
/**
* Initialize the query.
* @private
@@ -81,27 +87,49 @@ QueryStream.prototype._init = function () {
}
/**
- * Pull the next document from the cursor.
- * @private
+ * _next
+ *
+ * Trampoline for pulling the next doc from cursor.
+ *
+ * @see __next
+ * @api private
*/
QueryStream.prototype._next = function () {
+ // avoid stack overflows with large result sets.
+ // trampoline instead of recursion.
+ var fn;
+ while (fn = this.__next()) fn.call(this);
+}
+
+/**
+ * __next
+ *
+ * Pull the next doc from the cursor.
+ *
+ * @see _next
+ * @api private
+ */
+
+QueryStream.prototype.__next = function () {
if (this.paused || this._destroyed) return;
var self = this;
+ self._inline = T_INIT;
- // nextTick is necessary to avoid stack overflows when
- // dealing with large result sets. yield occasionally.
- if (!(++this._ticks % 20)) {
- process.nextTick(function () {
- self._cursor.nextObject(function (err, doc) {
- self._onNextObject(err, doc);
- });
- });
+ self._cursor.nextObject(function (err, doc) {
+ self._onNextObject(err, doc);
+ });
+
+ // if onNextObject() was already called in this tick
+ // return ourselves to the trampoline.
+ if (T_CONT === this._inline) {
+ return this.__next;
} else {
- self._cursor.nextObject(function (err, doc) {
- self._onNextObject(err, doc);
- });
+ // onNextObject() hasn't fired yet. tell onNextObject
+ // that its ok to call _next b/c we are not within
+ // the trampoline anymore.
+ this._inline = T_IDLE;
}
}
@@ -130,7 +158,15 @@ QueryStream.prototype._onNextObject = function (err, doc) {
instance.init(doc, this.query, function (err) {
if (err) return self.destroy(err);
self.emit('data', instance);
- self._next();
+
+ // trampoline management
+ if (T_IDLE === self._inline) {
+ // no longer in trampoline. restart it.
+ self._next();
+ } else
+ // in a trampoline. tell __next that its
+ // ok to continue jumping.
+ self._inline = T_CONT;
});
}

0 comments on commit 6f5feff

Please sign in to comment.