
Commit

Minor optimizations and cleanup
christkv committed Dec 19, 2011
1 parent edec7c8 commit 5b80ab8
Showing 9 changed files with 124 additions and 100 deletions.
22 changes: 15 additions & 7 deletions benchmark/bson_benchmark.js
@@ -6,11 +6,11 @@ var BSON = require('../lib/mongodb').BSONNative.BSON,
debug = require('util').debug,
inspect = require('util').inspect;

var BSON = require('../lib/mongodb').BSONPure.BSON,
ObjectID = require('../lib/mongodb').BSONPure.ObjectID,
Code = require('../lib/mongodb').BSONPure.Code,
Long = require('../lib/mongodb').BSONPure.Long,
Binary = require('../lib/mongodb').BSONPure.Binary;
// var BSON = require('../lib/mongodb').BSONPure.BSON,
// ObjectID = require('../lib/mongodb').BSONPure.ObjectID,
// Code = require('../lib/mongodb').BSONPure.Code,
// Long = require('../lib/mongodb').BSONPure.Long,
// Binary = require('../lib/mongodb').BSONPure.Binary;

var COUNT = 1000;
var COUNT = 100;
@@ -58,6 +58,9 @@ for(var i = 0; i < numberOfObjects; i++) {
var x, start, end, j
var objectBSON, objectJSON

// Allocate the return array (avoid concatinating everything)
var results = new Array(numberOfObjects);

console.log(COUNT + "x (objectBSON = BSON.serialize(object))")
start = new Date

@@ -68,11 +71,16 @@ start = new Date
// console.dir(objects)

for (j=COUNT; --j>=0; ) {
var objects = BSON.deserializeStream(data, 0, numberOfObjects);
var nextIndex = BSON.deserializeStream(data, 0, numberOfObjects, results, 0);
}

end = new Date
var opsprsecond = COUNT / ((end - start)/1000);
console.log("bson size (bytes): ", objectBSON.length);
console.log("time = ", end - start, "ms -", COUNT / ((end - start)/1000), " ops/sec");
console.log("MB/s = " + ((opsprsecond*objectBSON.length)/1024));
console.log("MB/s = " + ((opsprsecond*objectBSON.length)/1024));

// console.dir(nextIndex)
// console.dir(results)
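
For clarity, a minimal sketch of the calling-convention change the benchmark now exercises; it assumes `data` and `numberOfObjects` are prepared exactly as in the benchmark above and is not code from the commit itself:

// Old convention: the stream deserializer allocated and returned a wrapper
// object ({index: ..., documents: [...]}):
// var result = BSON.deserializeStream(data, 0, numberOfObjects);

// New convention: the caller pre-allocates the output array and passes a
// write offset; only the next byte index comes back.
var results = new Array(numberOfObjects);
var nextIndex = BSON.deserializeStream(data, 0, numberOfObjects, results, 0);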


3 changes: 2 additions & 1 deletion deps/nodeunit/bin/nodeunit
@@ -1,4 +1,5 @@
#!/usr/bin/env node
#!/usr/bin/env node
//--prof --trace-opt --trace-deopt --trace-bailout

var
fs = require('fs'),
26 changes: 15 additions & 11 deletions external-libs/bson/bson.cc
@@ -1154,24 +1154,27 @@ Handle<Value> BSON::BSONDeserializeStream(const Arguments &args) {
HandleScope scope;

// At least 3 arguments required
if(args.Length() < 3) VException("Arguments required (Buffer(data), Number(index in data), Number(number of documents to deserialize), Object(optional))");
if(args.Length() < 5) VException("Arguments required (Buffer(data), Number(index in data), Number(number of documents to deserialize), Array(results), Number(index in the array), Object(optional))");

// If the number of argumets equals 3
if(args.Length() >= 3) {
if(args.Length() >= 5) {
if(!Buffer::HasInstance(args[0])) return VException("First argument must be Buffer instance");
if(!args[1]->IsUint32()) return VException("Second argument must be a positive index number");
if(!args[2]->IsUint32()) return VException("Third argument must be a positive number of documents to deserialize");
if(!args[3]->IsArray()) return VException("Fourth argument must be an array the size of documents to deserialize");
if(!args[4]->IsUint32()) return VException("Sixth argument must be a positive index number");
}

// If we have 4 arguments
if(args.Length() == 4 && !args[3]->IsObject()) return VException("Fourth argument must be an object with options");
if(args.Length() == 6 && !args[5]->IsObject()) return VException("Fifth argument must be an object with options");

// Define pointer to data
char *data;
uint32_t length;
Local<Object> obj = args[0]->ToObject();
uint32_t numberOfDocuments = args[2]->ToUint32()->Value();
uint32_t index = args[1]->ToUint32()->Value();
uint32_t resultIndex = args[4]->ToUint32()->Value();

// Unpack the buffer variable
#if NODE_MAJOR_VERSION == 0 && NODE_MINOR_VERSION < 3
@@ -1183,10 +1186,11 @@ Handle<Value> BSON::BSONDeserializeStream(const Arguments &args) {
length = Buffer::Length(obj);
#endif

// Create return Object to wrap data in
Local<Object> resultObject = Object::New();
// Create an array for results
Local<Array> documents = Array::New(args[2]->ToUint32()->Value());
// // Create return Object to wrap data in
// Local<Object> resultObject = Object::New();
// // Create an array for results
// Local<Array> documents = Array::New(args[2]->ToUint32()->Value());
Local<Object> documents = args[3]->ToObject();

for(uint32_t i = 0; i < numberOfDocuments; i++) {
// Decode the size of the BSON data structure
@@ -1196,16 +1200,16 @@ Handle<Value> BSON::BSONDeserializeStream(const Arguments &args) {
Handle<Value> result = BSON::deserialize(data, index, NULL);

// Add result to array
documents->Set(i, result);
documents->Set(i + resultIndex, result);

// Adjust the index for next pass
index = index + size;
}

// Add objects to the result Object
resultObject->Set(String::New("index"), Uint32::New(index));
resultObject->Set(String::New("documents"), documents);
return scope.Close(resultObject);
// resultObject->Set(String::New("index"), Uint32::New(index));
// resultObject->Set(String::New("documents"), documents);
return scope.Close(Uint32::New(index));
}

Handle<Value> BSON::BSONDeserialize(const Arguments &args) {
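
Viewed from JavaScript, the reworked native entry point follows the contract sketched below; the argument names are descriptive placeholders rather than identifiers from the source, and the chained second call is only an assumption about how a caller could resume into the same array:

// BSONDeserializeStream argument order (per the argument checks above):
//   buffer         - Buffer holding concatenated BSON documents
//   startIndex     - byte offset at which to start reading
//   numberOfDocs   - how many documents to decode
//   documents      - pre-allocated Array the results are written into
//   docStartIndex  - slot in `documents` at which to start writing
//   [options]      - optional options object
var nextIndex = BSON.deserializeStream(buffer, 0, numberOfDocs, documents, 0);

// Because the output array and its write offset belong to the caller, a
// second buffer can be decoded into the same array with no concatenation:
// nextIndex = BSON.deserializeStream(buffer2, 0, moreDocs, documents, numberOfDocs);
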
8 changes: 4 additions & 4 deletions lib/mongodb/bson/bson.js
@@ -811,9 +811,9 @@ var crc32 = function(string, start, end) {
* @param {TODO} options
* @return {TODO}
*/
BSON.deserializeStream = function(data, startIndex, numberOfDocuments, options) {
BSON.deserializeStream = function(data, startIndex, numberOfDocuments, documents, docStartIndex, options) {
// if(numberOfDocuments !== documents.length) throw new Error("Number of expected results back is less than the number of documents");
options = options != null ? options : {};
var documents = new Array(numberOfDocuments);
var index = startIndex;
// Loop over all documents
for(var i = 0; i < numberOfDocuments; i++) {
@@ -822,13 +822,13 @@ BSON.deserializeStream = function(data, startIndex, numberOfDocuments, options)
// Update options with index
options['index'] = index;
// Parse the document at this point
documents[i] = BSON.deserialize(data, options);
documents[docStartIndex + i] = BSON.deserialize(data, options);
// Adjust index by the document size
index = index + size;
}

// Return object containing end index of parsing and list of documents
return {index:index, documents:documents};
return index;
}

/**
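
The `var size = ...` line hidden in the collapsed hunk above reads the document's length prefix. As a point of reference (an assumption about the elided code, consistent with the explicit byte reads in mongo_reply.js below), every BSON document starts with its total byte length as a little-endian int32, which is what lets the stream parser hop from document to document without fully decoding each one:

// Read the int32 little-endian length prefix of the BSON document at `index`.
function readDocumentSize(data, index) {
  return data[index] |
         (data[index + 1] << 8) |
         (data[index + 2] << 16) |
         (data[index + 3] << 24);
}

// Advancing to the next document is then just:
// index = index + readDocumentSize(data, index);
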
1 change: 0 additions & 1 deletion lib/mongodb/connection/connection.js
@@ -3,7 +3,6 @@ var utils = require('./connection_utils'),
net = require('net'),
debug = require('util').debug,
inspect = require('util').inspect,
// SimpleEmitter = require('./simple_emitter').SimpleEmitter,
EventEmitter = require('events').EventEmitter,
inherits = require('util').inherits,
binaryutils = require('../bson/binary_utils'),
20 changes: 10 additions & 10 deletions lib/mongodb/connection/repl_set_servers.js
@@ -630,7 +630,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 2")
// Initialize all the connections
for(var i = 0; i < serverConnections.length; i++) {
try {
// try {
// Set up the logger for the server connection
serverConnections[i].logger = replSetSelf.logger;
// Default empty socket options object
@@ -652,15 +652,15 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
serverConnections[i].socketOptions = socketOptions;
// Connect to server
serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, connectionHandler(serverConnections[i]));
} catch (err) {
numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1;
// Remove from list off addresses, close down and fire error
replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port]
// Close connections
replSetSelf.close();
// Add error message
replSetSelf._state.errorMessages.push(err);
}
// } catch (err) {
// numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1;
// // Remove from list off addresses, close down and fire error
// replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port]
// // Close connections
// replSetSelf.close();
// // Add error message
// replSetSelf._state.errorMessages.push(err);
// }
}

// console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 3")
1 change: 1 addition & 0 deletions lib/mongodb/db.js
@@ -8,6 +8,7 @@ var QueryCommand = require('./commands/query_command').QueryCommand,
ReplSetServers = require('./connection/repl_set_servers').ReplSetServers,
Cursor = require('./cursor').Cursor,
EventEmitter = require('events').EventEmitter,
// EventEmitter = require('eventemitter2').EventEmitter2,
inherits = require('util').inherits,
crypto = require('crypto'),
debug = require('util').debug,
51 changes: 31 additions & 20 deletions lib/mongodb/responses/mongo_reply.js
@@ -40,14 +40,16 @@ MongoReply.prototype.parseHeader = function(binary_reply, bson) {

MongoReply.prototype.parseBody = function(binary_reply, bson, raw, callback) {
raw = raw == null ? false : raw;
var docLimitSize = 1024*10;
// Just set a doc limit for deserializing
var docLimitSize = 1024*20;

// If our message length is very long, let's switch to process.nextTick for messages
if(this.messageLength > docLimitSize) {
var batchSize = this.numberReturned;
this.documents = new Array(this.numberReturned);

// Just walk down until we get a positive number >= 1
for(var i = 10; i > 0; i--) {
for(var i = 50; i > 0; i--) {
if((this.numberReturned/i) >= 1) {
batchSize = i;
break;
@@ -57,28 +59,37 @@ MongoReply.prototype.parseBody = function(binary_reply, bson, raw, callback) {
// Actual main creator of the processFunction setting internal state to control the flow
var parseFunction = function(_self, _binary_reply, _batchSize, _numberReturned) {
var object_index = 0;

// Internal loop process that will use nextTick to ensure we yield some time
var processFunction = function() {
// Iterate over the batch
for(var i = 0; i < _batchSize; i++) {
// Update number of docs parsed
object_index = object_index + 1;
if(object_index <= _numberReturned) {
// Read the size of the bson object
var bsonObjectSize = _binary_reply[_self.index] | _binary_reply[_self.index + 1] << 8 | _binary_reply[_self.index + 2] << 16 | _binary_reply[_self.index + 3] << 24;
// If we are storing the raw responses to pipe straight through
if(raw) {
// Deserialize the object and add to the documents array
_self.documents.push(binary_reply.slice(_self.index, _self.index + bsonObjectSize));
} else {
// Deserialize the object and add to the documents array
_self.documents.push(bson.BSON.deserialize(binary_reply.slice(_self.index, _self.index + bsonObjectSize)));
}
// Adjust binary index to point to next block of binary bson data
_self.index = _self.index + bsonObjectSize;
}
// Adjust batchSize if we have less results left than batchsize
if((_numberReturned - object_index) < _batchSize) {
_batchSize = _numberReturned - object_index;
}

// If raw just process the entries
if(raw) {
// Iterate over the batch
for(var i = 0; i < _batchSize; i++) {
// Are we done ?
if(object_index <= _numberReturned) {
// Read the size of the bson object
var bsonObjectSize = _binary_reply[_self.index] | _binary_reply[_self.index + 1] << 8 | _binary_reply[_self.index + 2] << 16 | _binary_reply[_self.index + 3] << 24;
// If we are storing the raw responses to pipe straight through
_self.documents[object_index] = binary_reply.slice(_self.index, _self.index + bsonObjectSize);
// Adjust binary index to point to next block of binary bson data
_self.index = _self.index + bsonObjectSize;
// Update number of docs parsed
object_index = object_index + 1;
}
}
} else {
// Parse documents
_self.index = bson.BSON.deserializeStream(binary_reply, _self.index, _batchSize, _self.documents, object_index);
// Adjust index
object_index = object_index + _batchSize;
}

// If we hav more documents process NextTick
if(object_index < _numberReturned) {
process.nextTick(processFunction);
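
To summarize the control flow above in one place, here is a condensed, illustrative rewrite (the wrapper function and its names are not from the file): large replies are decoded in batches of at most 50 documents, and the loop reschedules itself with process.nextTick between batches so a big reply does not starve the event loop.

// Illustrative sketch only: decode `numberReturned` BSON documents from
// `binaryReply` into `documents`, yielding to the event loop between batches.
function parseInBatches(bson, binaryReply, index, numberReturned, documents, done) {
  // The countdown loop in the real code effectively picks
  // batchSize = min(50, numberReturned).
  var batchSize = Math.min(50, numberReturned);
  var objectIndex = 0;

  var processBatch = function() {
    // Shrink the final batch so we never read past the last document.
    if ((numberReturned - objectIndex) < batchSize) {
      batchSize = numberReturned - objectIndex;
    }

    // deserializeStream fills `documents` starting at objectIndex and
    // returns the byte offset of the next unread document.
    index = bson.BSON.deserializeStream(binaryReply, index, batchSize, documents, objectIndex);
    objectIndex = objectIndex + batchSize;

    if (objectIndex < numberReturned) {
      process.nextTick(processBatch);   // yield, then do the next batch
    } else {
      done(index);
    }
  };

  processBatch();
}
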
92 changes: 46 additions & 46 deletions test/find_test.js
@@ -47,7 +47,7 @@ var tests = testCase({
},

// Test a simple find
shouldCorrectlyPerformSimpleFind : function(test) {
shouldCorrectlyPerformSimpleFind : function(test) {
client.createCollection('test_find_simple', function(err, r) {
var collection = client.collection('test_find_simple', function(err, collection) {
var doc1 = null;
@@ -1075,15 +1075,15 @@ var tests = testCase({
test.done();
});
},

shouldCorrectlyReturnErrorFromMongodbOnFindAndModifyForcedError : function(test) {
client.createCollection('shouldCorrectlyReturnErrorFromMongodbOnFindAndModifyForcedError', function(err, collection) {
var q = { x: 1 };
var set = { y:2, _id: new client.bson_serializer.ObjectID() };
var opts = { new: true, upsert: true };
// Original doc
var doc = {_id: new client.bson_serializer.ObjectID(), x:1};

// Insert original doc
collection.insert(doc, {safe:true}, function(err, result) {
collection.findAndModify(q, [], set, opts, function (err, res) {
@@ -1094,49 +1094,49 @@ var tests = testCase({
});
},

// shouldCorrectlyExecuteFindAndModifyUnderConcurrentLoad : function(test) {
// var p_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize:10}), {native_parser: (process.env['TEST_NATIVE'] != null)});
// p_client.bson_deserializer = client.bson_deserializer;
// p_client.bson_serializer = client.bson_serializer;
// p_client.pkFactory = client.pkFactory;
// var running = true;
//
// p_client.open(function(err, p_client) {
// // Create a collection
// p_client.collection("collection1", function(err, collection) {
// // Wait a bit and then execute something that will throw a duplicate error
// setTimeout(function() {
// var id = new p_client.bson_serializer.ObjectID();
//
// collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
// test.equal(null, err);
//
// collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
// running = false;
// test.done();
// p_client.close();
// });
// });
// }, 200);
// });
//
// p_client.collection("collection2", function(err, collection) {
// // Keep hammering in inserts
// var insert = function() {
// process.nextTick(function() {
// collection.insert({a:1});
// if(running) process.nextTick(insert);
// });
// }
//
// // while(running) {
// // process.nextTick(function() {
// // collection.insert({a:1});
// // })
// // }
// });
// });
// },
shouldCorrectlyExecuteFindAndModifyUnderConcurrentLoad : function(test) {
var p_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize:10}), {native_parser: (process.env['TEST_NATIVE'] != null)});
p_client.bson_deserializer = client.bson_deserializer;
p_client.bson_serializer = client.bson_serializer;
p_client.pkFactory = client.pkFactory;
var running = true;

p_client.open(function(err, p_client) {
// Create a collection
p_client.collection("collection1", function(err, collection) {
// Wait a bit and then execute something that will throw a duplicate error
setTimeout(function() {
var id = new p_client.bson_serializer.ObjectID();

collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
test.equal(null, err);

collection.insert({_id:id, a:1}, {safe:true}, function(err, result) {
running = false;
test.done();
p_client.close();
});
});
}, 200);
});

p_client.collection("collection2", function(err, collection) {
// Keep hammering in inserts
var insert = function() {
process.nextTick(function() {
collection.insert({a:1});
if(running) process.nextTick(insert);
});
}

// while(running) {
// process.nextTick(function() {
// collection.insert({a:1});
// })
// }
});
});
},

noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
