Skip to content

Commit

Permalink
added; QueryStream transform option
Browse files Browse the repository at this point in the history
closes #1346
  • Loading branch information
aheckmann committed Mar 14, 2013
1 parent 87fa412 commit 6eb5783
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
15 changes: 13 additions & 2 deletions lib/query.js
Expand Up @@ -2350,13 +2350,24 @@ PopulateOptions.prototype.constructor = Object;
* // the stream is closed
* });
*
* ####Valid options
*
* - transform: optional function which accepts a mongoose document. The return value of the function will be emitted.
*
* ####Example
*
* // JSON.stringify all documents before emitting
* var stream = Thing.find().stream({ transform: JSON.stringify });
* stream.pipe(writeStream);
*
* @return {QueryStream}
* @param {Object} [options]
* @see QueryStream
* @api public
*/

Query.prototype.stream = function stream () {
return new QueryStream(this);
Query.prototype.stream = function stream (opts) {
return new QueryStream(this, opts);
}

// helpers
Expand Down
10 changes: 7 additions & 3 deletions lib/querystream.js
Expand Up @@ -5,6 +5,7 @@

var Stream = require('stream').Stream
var utils = require('./utils')
var K = function(k){ return k }

/**
* Provides a Node.js 0.8 style [ReadStream](http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream) interface for Queries.
Expand Down Expand Up @@ -36,7 +37,7 @@ var utils = require('./utils')
* @api public
*/

function QueryStream (query) {
function QueryStream (query, options) {
Stream.call(this);

this.query = query;
Expand All @@ -48,6 +49,9 @@ function QueryStream (query) {
this._buffer = null;
this._inline = T_INIT;
this._running = false;
this._transform = options && 'function' == typeof options.transform
? options.transform
: K;

// give time to hook up events
var self = this;
Expand Down Expand Up @@ -197,7 +201,7 @@ QueryStream.prototype._onNextObject = function _onNextObject (err, doc) {
}

if (this.query.options && true === this.query.options.lean) {
this.emit('data', doc);
this.emit('data', this._transform(doc));

// trampoline management
if (T_IDLE === this._inline) {
Expand All @@ -216,7 +220,7 @@ QueryStream.prototype._onNextObject = function _onNextObject (err, doc) {
var self = this;
instance.init(doc, this.query, function (err) {
if (err) return self.destroy(err);
self.emit('data', instance);
self.emit('data', self._transform(instance));

// trampoline management
if (T_IDLE === self._inline) {
Expand Down
5 changes: 3 additions & 2 deletions test/model.stream.test.js
Expand Up @@ -223,11 +223,12 @@ describe('query stream:', function(){
, filename = '/tmp/_mongoose_stream_out.txt'
, out = fs.createWriteStream(filename)

var stream = P.find().sort('name').limit(20).stream();
var opts = { transform: JSON.stringify }
var stream = P.find().sort('name').limit(20).stream(opts);
stream.pipe(out);

stream.on('error', cb);
stream.on('close', cb);
out.on('close', cb);

function cb (err) {
db.close();
Expand Down

0 comments on commit 6eb5783

Please sign in to comment.