diff --git a/lib/query.js b/lib/query.js index fe7f8509a71..5033f6220b4 100644 --- a/lib/query.js +++ b/lib/query.js @@ -2350,13 +2350,24 @@ PopulateOptions.prototype.constructor = Object; * // the stream is closed * }); * + * ####Valid options + * + * - transform: optional function which accepts a mongoose document. The return value of the function will be emitted. + * + * ####Example + * + * // JSON.stringify all documents before emitting + * var stream = Thing.find().stream({ transform: JSON.stringify }); + * stream.pipe(writeStream); + * * @return {QueryStream} + * @param {Object} [options] * @see QueryStream * @api public */ -Query.prototype.stream = function stream () { - return new QueryStream(this); +Query.prototype.stream = function stream (opts) { + return new QueryStream(this, opts); } // helpers diff --git a/lib/querystream.js b/lib/querystream.js index 1d5214b2758..064bd7a31ec 100644 --- a/lib/querystream.js +++ b/lib/querystream.js @@ -5,6 +5,7 @@ var Stream = require('stream').Stream var utils = require('./utils') +var K = function(k){ return k } /** * Provides a Node.js 0.8 style [ReadStream](http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream) interface for Queries. @@ -36,7 +37,7 @@ var utils = require('./utils') * @api public */ -function QueryStream (query) { +function QueryStream (query, options) { Stream.call(this); this.query = query; @@ -48,6 +49,9 @@ function QueryStream (query) { this._buffer = null; this._inline = T_INIT; this._running = false; + this._transform = options && 'function' == typeof options.transform + ? options.transform + : K; // give time to hook up events var self = this; @@ -197,7 +201,7 @@ QueryStream.prototype._onNextObject = function _onNextObject (err, doc) { } if (this.query.options && true === this.query.options.lean) { - this.emit('data', doc); + this.emit('data', this._transform(doc)); // trampoline management if (T_IDLE === this._inline) { @@ -216,7 +220,7 @@ QueryStream.prototype._onNextObject = function _onNextObject (err, doc) { var self = this; instance.init(doc, this.query, function (err) { if (err) return self.destroy(err); - self.emit('data', instance); + self.emit('data', self._transform(instance)); // trampoline management if (T_IDLE === self._inline) { diff --git a/test/model.stream.test.js b/test/model.stream.test.js index f97d249fa81..606a9d23e6a 100644 --- a/test/model.stream.test.js +++ b/test/model.stream.test.js @@ -223,11 +223,12 @@ describe('query stream:', function(){ , filename = '/tmp/_mongoose_stream_out.txt' , out = fs.createWriteStream(filename) - var stream = P.find().sort('name').limit(20).stream(); + var opts = { transform: JSON.stringify } + var stream = P.find().sort('name').limit(20).stream(opts); stream.pipe(out); stream.on('error', cb); - stream.on('close', cb); + out.on('close', cb); function cb (err) { db.close();