Skip to content

Commit

Permalink
stream interface for cursors
Browse files Browse the repository at this point in the history
start of #614

need more tests
  • Loading branch information
aheckmann committed Nov 17, 2011
1 parent ded03a8 commit 2c3ae79
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 0 deletions.
158 changes: 158 additions & 0 deletions lib/cursorstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@

/**
* Module dependencies.
*/

var Stream = require('stream').Stream
var utils = require('./utils')


/**
* CursorStream
*
* Returns a stream interface for the `query`.
*
* @param {Query} query
* @return {Stream}
*/

function CursorStream (query) {
Stream.call(this);

this.query = query;
this.readable = true;
this._cursor = null;
this._destroyed = null;

// give time to hook up events
var self = this;
process.nextTick(function () {
self._init();
});
}

/**
* Inherit from Stream
* @private
*/

CursorStream.prototype.__proto__ = Stream.prototype;

/**
* Initialize the query.
* @private
*/

CursorStream.prototype._init = function () {
var query = this.query
, model = query.model
, options = query._optionsForExec(model)
, self = this

try {
query.cast(model);
} catch (err) {
return self.destroy(err);
}

self._fields = utils.clone(options.fields = query._fields);

model.collection.find(query._conditions, options, function (err, cursor) {
if (err) return self.destroy(err);
self._cursor = cursor;
self._next();
});
}

/**
* Pull the next document from the cursor.
* @private
*/

CursorStream.prototype._next = function () {
if (this._paused || this._destroyed) return;

var self = this;

// nextTick is necessary to avoid stack overflows when
// dealing with large result sets.
process.nextTick(function () {
self._cursor.nextObject(function (err, doc) {
self._onNextObject(err, doc);
});
});
}

/**
* Handle each document as its returned from the cursor
* transforming the raw `doc` from -native into a model
* instance.
*
* @private
*/

CursorStream.prototype._onNextObject = function (err, doc) {
if (err) return this.destroy(err);

// when doc is null we hit the end of the cursor
if (!doc) {
return this.destroy();
}

var instance = new this.query.model(undefined, this._fields);

// skip _id for pre-init hooks
delete instance._doc._id;

var self = this;
instance.init(doc, this.query, function (err) {
if (err) return self.destroy(err);
self.emit('data', instance);
self._next();
});
}

/**
* Pauses this stream.
*/

CursorStream.prototype.pause = function () {
this._paused = true;
}

/**
* Resumes this stream.
*/

CursorStream.prototype.resume = function () {
this._paused = false;
this._next();
}

/**
* Destroys the stream, closing the underlying
* cursor. No more events will be emitted.
*/

CursorStream.prototype.destroy = function (err) {
if (this._destroyed) return;
this._destroyed = true;
this.readable = false;

if (this._cursor) {
this._cursor.close();
}

if (err) {
this.emit('error', err);
}

this.emit('close');
}

// TODO - maybe implement the -native raw option to pass binary?
//CursorStream.prototype.setEncoding = function () {
//}


module.exports = exports = CursorStream;
15 changes: 15 additions & 0 deletions lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var utils = require('./utils')
, Document = require('./document')
, inGroupsOf = utils.inGroupsOf
, tick = utils.tick
, CursorStream = require('./cursorstream')

/**
* Query constructor
Expand Down Expand Up @@ -1234,6 +1235,7 @@ Query.prototype.remove = function (callback) {

/**
* Sets population options.
* @api public
*/

Query.prototype.populate = function (path, fields, conditions) {
Expand All @@ -1247,6 +1249,19 @@ Query.prototype.populate = function (path, fields, conditions) {
return this;
};

/**
* Returns a stream interface
*
* Example:
* Thing.find({ name: /^hello/ }).stream().pipe(res)
*
* @api public
*/

Query.prototype.stream = function stream () {
return new CursorStream(this);
}

/**
* Exports.
*/
Expand Down
76 changes: 76 additions & 0 deletions test/model.stream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@

/**
* Test dependencies.
*/

var start = require('./common')
, should = require('should')
, mongoose = start.mongoose
, random = require('../lib/utils').random
, Query = require('../lib/query')
, Schema = mongoose.Schema
, SchemaType = mongoose.SchemaType
, CastError = SchemaType.CastError
, ObjectId = Schema.ObjectId
, MongooseBuffer = mongoose.Types.Buffer
, DocumentObjectId = mongoose.Types.ObjectId
, fs = require('fs')

var names = fs.readFileSync(__dirname + '/testnames.txt', 'utf8').split(' ')

/**
* Setup.
*/

var Person = new Schema({
name: String
});

mongoose.model('PersonForStream', Person);
var collection = 'personforstream_' + random();

;(function setup () {
var db = start()
, P = db.model('PersonForStream', collection)

var people = names.map(function (name) {
return { name: name };
});

P.create(people, function (err) {
should.strictEqual(null, err);
db.close();
assignExports();
});
})()

function assignExports () {
exports['cursor stream'] = function () {
var db = start()
, P = db.model('PersonForStream', collection)
, i = 0

var stream = P.find({}).stream();
stream.on('data', function (doc) {
console.error(doc);
if (!(++i % 100)) {
console.error('pausing...');
stream.pause();
setTimeout(function () {
console.error('... resuming');
stream.resume();
}, 1000);
}
});
stream.on('error', function (err) {
console.error('ERROR!');
console.error(err.stack);
});
stream.on('close', function () {
console.error('closed');
P.remove(function () {
db.close();
});
});
}
}
1 change: 1 addition & 0 deletions test/testnames.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Jacob Ethan Michael Jayden William Alexander Noah Daniel Aiden Anthony Joshua Mason Christopher Andrew David Matthew Logan Elijah James Joseph Gabriel Benjamin Ryan Samuel Jackson John Nathan Jonathan Christian Liam Dylan Landon Caleb Tyler Lucas Evan Gavin Nicholas Isaac Brayden Luke Angel Brandon Jack Isaiah Jordan Owen Carter Connor Justin Jose Jeremiah Julian Robert Aaron Adrian Wyatt Kevin Hunter Cameron Zachary Thomas Charles Austin Eli Chase Henry Sebastian Jason Levi Xavier Ian Colton Dominic Juan Cooper Josiah Luis Ayden Carson Adam Nathaniel Brody Tristan Diego Parker Blake Oliver Cole Carlos Jaden Jesus Alex Aidan Eric Hayden Bryan Max Jaxon Brian Bentley Alejandro Sean Nolan Riley Kaden Kyle Micah Vincent Antonio Colin Bryce Miguel Giovanni Timothy Jake Kaleb Steven Caden Bryson Damian Grayson Kayden Jesse Brady Ashton Richard Victor Patrick Marcus Preston Joel Santiago Maxwell Ryder Edward Miles Hudson Asher Devin Elias Jeremy Ivan Jonah Easton Jace Oscar Collin Peyton Leonardo Cayden Gage Eduardo Emmanuel Grant Alan Conner Cody Wesley Kenneth Mark Nicolas Malachi George Seth Kaiden Trevor Jorge Derek Jude Braxton Jaxson Sawyer Jaiden Omar Tanner Travis Paul Camden Maddox Andres Cristian Rylan Josue Roman Bradley Axel Fernando Garrett Javier Damien Peter Leo Abraham Ricardo Francisco Lincoln Erick Drake Shane Cesar Stephen Jaylen Tucker Kai Landen Braden Mario Edwin Avery Manuel Trenton Ezekiel Kingston Calvin Edgar Johnathan Donovan Alexis Israel Mateo Silas Jeffrey Weston Raymond Hector Spencer Andre Brendan Zion Griffin Lukas Maximus Harrison Andy Braylon Tyson Shawn Sergio Zane Emiliano Jared Ezra Charlie Keegan Chance Drew Troy Greyson Corbin Simon Clayton Myles Xander Dante Erik Rafael Martin Dominick Dalton Cash Skyler Theodore Marco Caiden Johnny Ty Gregory Kyler Roberto Brennan Luca Emmett Kameron Declan Quinn Jameson Amir Bennett Colby Pedro Emanuel Malik Graham Dean Jasper Everett Aden Dawson Angelo Reid Abel Dakota Zander Paxton Ruben Judah Jayce Jakob Finn Elliot Frank Lane Fabian Dillon Brock Derrick Emilio Joaquin Marcos Ryker Anderson Grady Devon Elliott Holden Amari Dallas Corey Danny Cruz Lorenzo Allen Trey Leland Armando Rowan Taylor Cade Colt Felix Adan Jayson Tristen Julius Raul Braydon Zayden Julio Nehemiah Darius Ronald Louis Trent Keith Payton Enrique Jax Randy Scott Desmond Gerardo Jett Dustin Phillip Beckett Ali Romeo Kellen Cohen Pablo Ismael Jaime Brycen Larry Kellan Keaton Gunner Braylen Brayan Landyn Walter Jimmy Marshall Beau Saul Donald Esteban Karson Reed Phoenix Brenden Tony Kade Jamari Jerry Mitchell Colten Arthur Brett Dennis Rocco Jalen Tate Chris Quentin Titus Casey Brooks Izaiah Mathew King Philip Zackary Darren Russell Gael Albert Braeden Dane Gustavo Kolton Cullen Jay Rodrigo Alberto Leon Alec Damon Arturo Waylon Milo Davis Walker Moises Kobe Curtis Matteo August Mauricio Marvin Emerson Maximilian Reece Orlando River Bryant Issac Yahir Uriel Hugo Mohamed Enzo Karter Lance Porter Maurice Leonel Zachariah Ricky Joe Johan Nikolas Dexter Jonas Justice Knox Lawrence Salvador Alfredo Gideon Maximiliano Nickolas Talon Byron Orion Solomon Braiden Alijah Kristopher Rhys Gary Jacoby Davion Jamarion Pierce Sam Cason Noel Ramon Kason Mekhi Shaun Warren Douglas Ernesto Ibrahim Armani Cyrus Quinton Isaias Reese Jaydon Ryland Terry Frederick Chandler Jamison Deandre Dorian Khalil Ari Franklin Maverick Amare Muhammad Ronan London Eddie Moses Roger Aldo Nasir Demetrius Adriel Brodie Kelvin Morgan Tobias Ahmad Keagan Prince Trace Alvin Giovani Kendrick Malcolm Skylar Conor Camron Abram Jonathon Bruce Noe Quincy Rohan Ahmed Nathanael Barrett Remington Kamari Kristian Kieran Finnegan Boston Xzavier Chad Guillermo Uriah Archer Rodney Gunnar Micheal Ulises Bobby Aaden Kamden Roy Kane Kasen Julien Ezequiel Lucian Atticus Javon Melvin Jeffery Terrance Nelson Aarav Carl Malakai Jadon Triston Harley Jon Kian Alonzo Cory Marc Moshe Gianni Kole Dayton Jermaine Asa Wilson Felipe Kale Terrence Nico Dominik Tommy Kendall Cristopher Isiah Finley Tristin Cannon Mohammed Wade Kash Marlon Ariel Madden Rhett Jase Layne Memphis Allan Jamal Nash Jessie Joey Reginald Giovanny Lawson Zaiden Ace Korbin Rashad Will Urijah Billy Aron Brennen Branden Leonard Rene Kenny Tomas Willie Darian Kody Brendon Aydan Alonso Blaine Arjun Raiden Layton Marquis Sincere Terrell Channing Chace Iker Mohammad Jordyn Messiah Omari Santino Sullivan Brent Raphael Deshawn Elisha Harry Luciano Jefferson Jaylin Ray Yandel Aydin Craig Tristian Zechariah Bently Francis Toby Tripp Kylan Semaj Alessandro Alexzander Lee Ronnie Gerald Dwayne Jadiel Javion Markus Kolby Neil Stanley Makai Davin Teagan Cale Harper Callen Ben Kaeden Clark Jamie Damarion Davian Deacon Jairo Kareem Damion Jamir Aidyn Lamar Duncan Matias Rex Jaeden Jasiah Jorden Vicente Aryan Case Tyrone Yusuf Gavyn Lewis Rogelio Zayne Giancarlo Osvaldo Rolando Camren Luka Rylee Cedric Jensen Soren Darwin Draven Maxim Ellis Nikolai Bradyn Mathias Zackery Zavier Emery Brantley Rudy Trevon Alfonso Beckham Darrell Harold Jerome Daxton Royce Jaylon Rory Rodolfo Tatum Bruno Sterling Gauge Van Hamza Ayaan Rayan Zachery Keenan Jagger Heath Jovani Killian Dax Junior Misael Roland Ramiro Vance Alvaro Bode Conrad Eugene Augustus Carmelo Adrien Kamron Gilberto Johnathon Kolten Wayne Zain Quintin Steve Tyrell Niko Antoine Hassan Jean Coleman Elian Frankie Valentin Adonis Jamar Jaxton Kymani Bronson Clay Freddy Jeramiah Kayson Hank Abdiel Efrain Leandro Yosef Aditya Ean Konnor Sage Samir Todd Lyric Deven Derick Jovanni Valentino Demarcus Ishaan Konner Kyson Deangelo Matthias Maximo Sidney Benson Dilan Gilbert Kyron Xavi Bo Sylas Fisher Marcel Franco Jaron Alden Agustin Bentlee Malaki Westin Cael Jerimiah Randall Blaze Branson Brogan Callum Dominique Justus Krish Rey Marcelo Ronin Odin Camryn Jair Izayah Brice Jabari Mayson Isai Tyree Mike Samson Stefan Devan Emmitt Fletcher Jaidyn Remy Casen Houston Santos Seamus Jedidiah Major Vincenzo Gaige Winston Aedan Deon Jaycob Kamryn Quinten Darnell Jaxen Deegan Landry Humberto Jadyn Salvatore Aarush Edison Kadyn Abdullah Alfred Ameer Carsen Jaydin Lionel Howard Davon Eden Trystan Zaire Johann Antwan Bodhi Jayvion Marley Theo Bridger Donte Lennon Irvin Yael Jencarlos Arnav Devyn Ernest Ignacio Leighton Leonidas Octavio Rayden Hezekiah Ross Hayes Lennox Nigel Vaughn Anders Keon Dario Leroy Cortez Darryl Jakobe Koen Darien Haiden Legend Tyrese Zaid Dangelo Maxx Pierre Camdyn Chaim Damari Sonny Antony Blaise Cain Pranav Roderick Yadiel Eliot Hugh Broderick Lathan Makhi Ronaldo Ralph Zack Kael Keyon Kingsley Talan Yair Demarion Gibson Reagan Cristofer Daylen Jordon Dashawn

0 comments on commit 2c3ae79

Please sign in to comment.