diff --git a/benchmark/bson_benchmark.js b/benchmark/bson_benchmark.js
index 4161df2084..884f9ea724 100644
--- a/benchmark/bson_benchmark.js
+++ b/benchmark/bson_benchmark.js
@@ -6,11 +6,11 @@ var BSON = require('../lib/mongodb').BSONNative.BSON,
   debug = require('util').debug,
   inspect = require('util').inspect;
 
-var BSON = require('../lib/mongodb').BSONPure.BSON,
-  ObjectID = require('../lib/mongodb').BSONPure.ObjectID,
-  Code = require('../lib/mongodb').BSONPure.Code,
-  Long = require('../lib/mongodb').BSONPure.Long,
-  Binary = require('../lib/mongodb').BSONPure.Binary;
+// var BSON = require('../lib/mongodb').BSONPure.BSON,
+//   ObjectID = require('../lib/mongodb').BSONPure.ObjectID,
+//   Code = require('../lib/mongodb').BSONPure.Code,
+//   Long = require('../lib/mongodb').BSONPure.Long,
+//   Binary = require('../lib/mongodb').BSONPure.Binary;
 
 var COUNT = 1000;
 var COUNT = 100;
@@ -58,6 +58,9 @@ for(var i = 0; i < numberOfObjects; i++) {
 var x, start, end, j
 var objectBSON, objectJSON
 
+// Allocate the return array (avoid concatenating everything)
+var results = new Array(numberOfObjects);
+
 console.log(COUNT + "x (objectBSON = BSON.serialize(object))")
 start = new Date
@@ -68,11 +71,16 @@ start = new Date
 
 // console.dir(objects)
 for (j=COUNT; --j>=0; ) {
-  var objects = BSON.deserializeStream(data, 0, numberOfObjects);
+  var nextIndex = BSON.deserializeStream(data, 0, numberOfObjects, results, 0);
 }
 end = new Date
 var opsprsecond = COUNT / ((end - start)/1000);
 console.log("bson size (bytes): ", objectBSON.length);
 console.log("time = ", end - start, "ms -", COUNT / ((end - start)/1000), " ops/sec");
-console.log("MB/s = " + ((opsprsecond*objectBSON.length)/1024));
\ No newline at end of file
+console.log("MB/s = " + ((opsprsecond*objectBSON.length)/1024));
+
+// console.dir(nextIndex)
+// console.dir(results)
+
+
diff --git a/deps/nodeunit/bin/nodeunit b/deps/nodeunit/bin/nodeunit
index 94ad0fb4e0..8db586ba97 100755
--- a/deps/nodeunit/bin/nodeunit
+++ b/deps/nodeunit/bin/nodeunit
@@ -1,4 +1,5 @@
-#!/usr/bin/env node
+#!/usr/bin/env node
+//--prof --trace-opt --trace-deopt --trace-bailout
 
 var fs = require('fs'),
diff --git a/external-libs/bson/bson.cc b/external-libs/bson/bson.cc
index eb854655fe..71b851ef79 100644
--- a/external-libs/bson/bson.cc
+++ b/external-libs/bson/bson.cc
@@ -1154,17 +1154,19 @@ Handle<Value> BSON::BSONDeserializeStream(const Arguments &args) {
   HandleScope scope;
 
   // At least 3 arguments required
-  if(args.Length() < 3) VException("Arguments required (Buffer(data), Number(index in data), Number(number of documents to deserialize), Object(optional))");
+  if(args.Length() < 5) return VException("Arguments required (Buffer(data), Number(index in data), Number(number of documents to deserialize), Array(results), Number(index in the array), Object(optional))");
 
   // If the number of argumets equals 3
-  if(args.Length() >= 3) {
+  if(args.Length() >= 5) {
     if(!Buffer::HasInstance(args[0])) return VException("First argument must be Buffer instance");
     if(!args[1]->IsUint32()) return VException("Second argument must be a positive index number");
     if(!args[2]->IsUint32()) return VException("Third argument must be a positive number of documents to deserialize");
+    if(!args[3]->IsArray()) return VException("Fourth argument must be an array the size of documents to deserialize");
+    if(!args[4]->IsUint32()) return VException("Fifth argument must be a positive index number");
   }
 
   // If we have 4 arguments
-  if(args.Length() == 4 && !args[3]->IsObject()) return VException("Fourth argument must be an object with options");
object with options"); + if(args.Length() == 6 && !args[5]->IsObject()) return VException("Fifth argument must be an object with options"); // Define pointer to data char *data; @@ -1172,6 +1174,7 @@ Handle BSON::BSONDeserializeStream(const Arguments &args) { Local obj = args[0]->ToObject(); uint32_t numberOfDocuments = args[2]->ToUint32()->Value(); uint32_t index = args[1]->ToUint32()->Value(); + uint32_t resultIndex = args[4]->ToUint32()->Value(); // Unpack the buffer variable #if NODE_MAJOR_VERSION == 0 && NODE_MINOR_VERSION < 3 @@ -1183,10 +1186,11 @@ Handle BSON::BSONDeserializeStream(const Arguments &args) { length = Buffer::Length(obj); #endif - // Create return Object to wrap data in - Local resultObject = Object::New(); - // Create an array for results - Local documents = Array::New(args[2]->ToUint32()->Value()); + // // Create return Object to wrap data in + // Local resultObject = Object::New(); + // // Create an array for results + // Local documents = Array::New(args[2]->ToUint32()->Value()); + Local documents = args[3]->ToObject(); for(uint32_t i = 0; i < numberOfDocuments; i++) { // Decode the size of the BSON data structure @@ -1196,16 +1200,16 @@ Handle BSON::BSONDeserializeStream(const Arguments &args) { Handle result = BSON::deserialize(data, index, NULL); // Add result to array - documents->Set(i, result); + documents->Set(i + resultIndex, result); // Adjust the index for next pass index = index + size; } // Add objects to the result Object - resultObject->Set(String::New("index"), Uint32::New(index)); - resultObject->Set(String::New("documents"), documents); - return scope.Close(resultObject); + // resultObject->Set(String::New("index"), Uint32::New(index)); + // resultObject->Set(String::New("documents"), documents); + return scope.Close(Uint32::New(index)); } Handle BSON::BSONDeserialize(const Arguments &args) { diff --git a/lib/mongodb/bson/bson.js b/lib/mongodb/bson/bson.js index afdf21d3c0..977fcbeba0 100644 --- a/lib/mongodb/bson/bson.js +++ b/lib/mongodb/bson/bson.js @@ -811,9 +811,9 @@ var crc32 = function(string, start, end) { * @param {TODO} options * @return {TODO} */ -BSON.deserializeStream = function(data, startIndex, numberOfDocuments, options) { +BSON.deserializeStream = function(data, startIndex, numberOfDocuments, documents, docStartIndex, options) { + // if(numberOfDocuments !== documents.length) throw new Error("Number of expected results back is less than the number of documents"); options = options != null ? 
-  var documents = new Array(numberOfDocuments);
   var index = startIndex;
   // Loop over all documents
   for(var i = 0; i < numberOfDocuments; i++) {
@@ -822,13 +822,13 @@ BSON.deserializeStream = function(data, startIndex, numberOfDocuments, options)
     // Update options with index
     options['index'] = index;
     // Parse the document at this point
-    documents[i] = BSON.deserialize(data, options);
+    documents[docStartIndex + i] = BSON.deserialize(data, options);
     // Adjust index by the document size
     index = index + size;
   }
 
   // Return object containing end index of parsing and list of documents
-  return {index:index, documents:documents};
+  return index;
 }
 
 /**
diff --git a/lib/mongodb/connection/connection.js b/lib/mongodb/connection/connection.js
index c8a04fa0df..8e719fcedd 100644
--- a/lib/mongodb/connection/connection.js
+++ b/lib/mongodb/connection/connection.js
@@ -3,7 +3,6 @@ var utils = require('./connection_utils'),
   net = require('net'),
   debug = require('util').debug,
   inspect = require('util').inspect,
-  // SimpleEmitter = require('./simple_emitter').SimpleEmitter,
   EventEmitter = require('events').EventEmitter,
   inherits = require('util').inherits,
   binaryutils = require('../bson/binary_utils'),
diff --git a/lib/mongodb/connection/repl_set_servers.js b/lib/mongodb/connection/repl_set_servers.js
index 9135958f59..978be91c18 100644
--- a/lib/mongodb/connection/repl_set_servers.js
+++ b/lib/mongodb/connection/repl_set_servers.js
@@ -630,7 +630,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
   // console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 2")
   // Initialize all the connections
   for(var i = 0; i < serverConnections.length; i++) {
-    try {
+    // try {
       // Set up the logger for the server connection
       serverConnections[i].logger = replSetSelf.logger;
       // Default empty socket options object
@@ -652,15 +652,15 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
       serverConnections[i].socketOptions = socketOptions;
       // Connect to server
       serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, connectionHandler(serverConnections[i]));
-    } catch (err) {
-      numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1;
-      // Remove from list off addresses, close down and fire error
-      replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port]
-      // Close connections
-      replSetSelf.close();
-      // Add error message
-      replSetSelf._state.errorMessages.push(err);
-    }
+    // } catch (err) {
+    //   numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1;
+    //   // Remove from list off addresses, close down and fire error
+    //   replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port]
+    //   // Close connections
+    //   replSetSelf.close();
+    //   // Add error message
+    //   replSetSelf._state.errorMessages.push(err);
+    // }
   }
 
   // console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 3")
diff --git a/lib/mongodb/db.js b/lib/mongodb/db.js
index 3783329225..145f4a6dd2 100644
--- a/lib/mongodb/db.js
+++ b/lib/mongodb/db.js
@@ -8,6 +8,7 @@ var QueryCommand = require('./commands/query_command').QueryCommand,
   ReplSetServers = require('./connection/repl_set_servers').ReplSetServers,
   Cursor = require('./cursor').Cursor,
   EventEmitter = require('events').EventEmitter,
+  // EventEmitter = require('eventemitter2').EventEmitter2,
   inherits = require('util').inherits,
   crypto = require('crypto'),
   debug = require('util').debug,
diff --git a/lib/mongodb/responses/mongo_reply.js b/lib/mongodb/responses/mongo_reply.js
index 0f48c3c3ff..4bb9ac8ea1 100644
--- a/lib/mongodb/responses/mongo_reply.js
+++ b/lib/mongodb/responses/mongo_reply.js
@@ -40,14 +40,16 @@ MongoReply.prototype.parseHeader = function(binary_reply, bson) {
 MongoReply.prototype.parseBody = function(binary_reply, bson, raw, callback) {
   raw = raw == null ? false : raw;
 
-  var docLimitSize = 1024*10;
+  // Just set a doc limit for deserializing
+  var docLimitSize = 1024*20;
 
   // If our message length is very long, let's switch to process.nextTick for messages
   if(this.messageLength > docLimitSize) {
     var batchSize = this.numberReturned;
+    this.documents = new Array(this.numberReturned);
 
     // Just walk down until we get a positive number >= 1
-    for(var i = 10; i > 0; i--) {
+    for(var i = 50; i > 0; i--) {
       if((this.numberReturned/i) >= 1) {
         batchSize = i;
         break;
@@ -57,28 +59,37 @@ MongoReply.prototype.parseBody = function(binary_reply, bson, raw, callback) {
 
     // Actual main creator of the processFunction setting internal state to control the flow
     var parseFunction = function(_self, _binary_reply, _batchSize, _numberReturned) {
       var object_index = 0;
+      // Internal loop process that will use nextTick to ensure we yield some time
       var processFunction = function() {
-        // Iterate over the batch
-        for(var i = 0; i < _batchSize; i++) {
-          // Update number of docs parsed
-          object_index = object_index + 1;
-          if(object_index <= _numberReturned) {
-            // Read the size of the bson object
-            var bsonObjectSize = _binary_reply[_self.index] | _binary_reply[_self.index + 1] << 8 | _binary_reply[_self.index + 2] << 16 | _binary_reply[_self.index + 3] << 24;
-            // If we are storing the raw responses to pipe straight through
-            if(raw) {
-              // Deserialize the object and add to the documents array
-              _self.documents.push(binary_reply.slice(_self.index, _self.index + bsonObjectSize));
-            } else {
-              // Deserialize the object and add to the documents array
-              _self.documents.push(bson.BSON.deserialize(binary_reply.slice(_self.index, _self.index + bsonObjectSize)));
-            }
-            // Adjust binary index to point to next block of binary bson data
-            _self.index = _self.index + bsonObjectSize;
-          }
-        }
+        // Adjust batchSize if we have fewer results left than batchSize
+        if((_numberReturned - object_index) < _batchSize) {
+          _batchSize = _numberReturned - object_index;
+        }
+        // If raw just process the entries
+        if(raw) {
+          // Iterate over the batch
+          for(var i = 0; i < _batchSize; i++) {
+            // Are we done ?
+            if(object_index <= _numberReturned) {
+              // Read the size of the bson object
+              var bsonObjectSize = _binary_reply[_self.index] | _binary_reply[_self.index + 1] << 8 | _binary_reply[_self.index + 2] << 16 | _binary_reply[_self.index + 3] << 24;
+              // If we are storing the raw responses to pipe straight through
+              _self.documents[object_index] = binary_reply.slice(_self.index, _self.index + bsonObjectSize);
+              // Adjust binary index to point to next block of binary bson data
+              _self.index = _self.index + bsonObjectSize;
+              // Update number of docs parsed
+              object_index = object_index + 1;
+            }
+          }
+        } else {
+          // Parse documents
+          _self.index = bson.BSON.deserializeStream(binary_reply, _self.index, _batchSize, _self.documents, object_index);
+          // Adjust index
+          object_index = object_index + _batchSize;
+        }
 
+        // If we have more documents, process them on nextTick
         if(object_index < _numberReturned) {
           process.nextTick(processFunction);
diff --git a/test/find_test.js b/test/find_test.js
index 37f872d4dc..d3601d930a 100644
--- a/test/find_test.js
+++ b/test/find_test.js
@@ -47,7 +47,7 @@ var tests = testCase({
   },
 
   // Test a simple find
-  shouldCorrectlyPerformSimpleFind : function(test) {
+  shouldCorrectlyPerformSimpleFind : function(test) {
     client.createCollection('test_find_simple', function(err, r) {
       var collection = client.collection('test_find_simple', function(err, collection) {
         var doc1 = null;
@@ -1075,7 +1075,7 @@ var tests = testCase({
       test.done();
     });
   },
-
+
   shouldCorrectlyReturnErrorFromMongodbOnFindAndModifyForcedError : function(test) {
     client.createCollection('shouldCorrectlyReturnErrorFromMongodbOnFindAndModifyForcedError', function(err, collection) {
       var q = { x: 1 };
@@ -1083,7 +1083,7 @@ var tests = testCase({
       var opts = { new: true, upsert: true };
       // Original doc
      var doc = {_id: new client.bson_serializer.ObjectID(), x:1};
-
+
       // Insert original doc
       collection.insert(doc, {safe:true}, function(err, result) {
         collection.findAndModify(q, [], set, opts, function (err, res) {
@@ -1094,49 +1094,49 @@ var tests = testCase({
       });
     });
   },
 
-  // shouldCorrectlyExecuteFindAndModifyUnderConcurrentLoad : function(test) {
-  //   var p_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize:10}), {native_parser: (process.env['TEST_NATIVE'] != null)});
-  //   p_client.bson_deserializer = client.bson_deserializer;
-  //   p_client.bson_serializer = client.bson_serializer;
-  //   p_client.pkFactory = client.pkFactory;
-  //   var running = true;
-  //
-  //   p_client.open(function(err, p_client) {
-  //     // Create a collection
-  //     p_client.collection("collection1", function(err, collection) {
-  //       // Wait a bit and then execute something that will throw a duplicate error
-  //       setTimeout(function() {
-  //         var id = new p_client.bson_serializer.ObjectID();
-  //
-  //         collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
-  //           test.equal(null, err);
-  //
-  //           collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
-  //             running = false;
-  //             test.done();
-  //             p_client.close();
-  //           });
-  //         });
-  //       }, 200);
-  //     });
-  //
-  //     p_client.collection("collection2", function(err, collection) {
-  //       // Keep hammering in inserts
-  //       var insert = function() {
-  //         process.nextTick(function() {
-  //           collection.insert({a:1});
-  //           if(running) process.nextTick(insert);
-  //         });
-  //       }
-  //
-  //       // while(running) {
-  //       //   process.nextTick(function() {
-  //       //     collection.insert({a:1});
-  //       //   })
-  //       // }
-  //     });
-  //   });
-  // },
+  shouldCorrectlyExecuteFindAndModifyUnderConcurrentLoad : function(test) {
+    var p_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize:10}), {native_parser: (process.env['TEST_NATIVE'] != null)});
+    p_client.bson_deserializer = client.bson_deserializer;
+    p_client.bson_serializer = client.bson_serializer;
+    p_client.pkFactory = client.pkFactory;
+    var running = true;
+
+    p_client.open(function(err, p_client) {
+      // Create a collection
+      p_client.collection("collection1", function(err, collection) {
+        // Wait a bit and then execute something that will throw a duplicate error
+        setTimeout(function() {
+          var id = new p_client.bson_serializer.ObjectID();
+
+          collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
+            test.equal(null, err);
+
+            collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
+              running = false;
+              test.done();
+              p_client.close();
+            });
+          });
+        }, 200);
+      });
+
+      p_client.collection("collection2", function(err, collection) {
+        // Keep hammering in inserts
+        var insert = function() {
+          process.nextTick(function() {
+            collection.insert({a:1});
+            if(running) process.nextTick(insert);
+          });
+        }
+
+        // while(running) {
+        //   process.nextTick(function() {
+        //     collection.insert({a:1});
+        //   })
+        // }
+      });
+    });
+  },
 
   noGlobalsLeaked : function(test) {
     var leaks = gleak.detectNew();
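
Note on the API change in this patch: BSON.deserializeStream (both the native bson.cc binding and the pure JS implementation in bson.js) no longer allocates and returns an {index, documents} wrapper; the caller now supplies the target array and a start slot, and the function returns only the byte offset after the last parsed document. A minimal usage sketch of the new contract, lifted from the benchmark above; `data` and `numberOfObjects` come from that benchmark, while the batch variables in the commented follow-up call are illustrative only:

    var BSON = require('../lib/mongodb').BSONNative.BSON;

    // Pre-allocate the results array once, instead of paying for a fresh
    // array plus a wrapper object on every call.
    var results = new Array(numberOfObjects);

    // Parse numberOfObjects documents from offset 0 of `data` into
    // results[0..numberOfObjects-1]; the return value is the byte offset
    // just past the last document parsed.
    var nextIndex = BSON.deserializeStream(data, 0, numberOfObjects, results, 0);

    // A later batch can resume at that offset and append after the slots
    // already filled, which is how MongoReply.parseBody walks large replies
    // (batchSize and alreadyParsed are hypothetical names):
    // nextIndex = BSON.deserializeStream(data, nextIndex, batchSize, results, alreadyParsed);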