From d22a3377e67cc42ce23d7e941e3448034de7fda4 Mon Sep 17 00:00:00 2001 From: Christian Amor Kvalheim Date: Thu, 7 Jul 2011 16:51:33 +0200 Subject: [PATCH] Single buffer pass update command --- lib/mongodb/commands/insert_command.js | 150 ++++++++++-------------- lib/mongodb/commands/query_command.js | 126 +++++++++++++++----- lib/mongodb/commands/update_command.js | 153 ++++++++++++++++++++++++- test/find_test.js | 8 +- 4 files changed, 314 insertions(+), 123 deletions(-) diff --git a/lib/mongodb/commands/insert_command.js b/lib/mongodb/commands/insert_command.js index 8b0fa64ea8..352eaeb7d5 100644 --- a/lib/mongodb/commands/insert_command.js +++ b/lib/mongodb/commands/insert_command.js @@ -17,8 +17,6 @@ var InsertCommand = exports.InsertCommand = function(db, collectionName, checkKe // OpCodes InsertCommand.OP_INSERT = 2002; -// inherits(InsertCommand, InsertCommand); - InsertCommand.prototype.add = function(document) { this.documents.push(document); return this; @@ -32,37 +30,16 @@ struct { BSON[] documents; // one or more documents to insert into the collection } */ -InsertCommand.prototype.getCommandAsBuffers = function(buffers) { - var collectionNameBuffers = InsertCommand.encodeCString(this.collectionName); - // Add command to buffers - buffers.push(InsertCommand.encodeInt(0), collectionNameBuffers[0], collectionNameBuffers[1]); - // Basic command length - var commandLength = 4 + collectionNameBuffers[0].length + 1; - - for(var i = 0; i < this.documents.length; i++) { - var command = this.db.bson_serializer.BSON.serialize(this.documents[i], this.checkKeys, true); - commandLength += command.length; - buffers.push(command); - } - - return commandLength; -} - InsertCommand.prototype.toBinary = function() { + ////////////////////////////////////////////////////////////////////////////////////// // Calculate total length of the document var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + (4 * 4); - var documentLengths = []; // var docLength = 0 for(var i = 0; i < this.documents.length; i++) { // Calculate size of document - var documentLength = this.db.bson_serializer.BSON.calculateObjectSize(this.documents[i]); - // Save the size for writing the command - documentLengths.push(documentLength); - // Add to total of command - totalLengthOfCommand += documentLength + totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.documents[i]); } - - ////////////////////////////////////////////////////////////////////////////////////// + // Let's build the single pass buffer command var _index = 0; var _command = new Buffer(totalLengthOfCommand); @@ -99,13 +76,12 @@ InsertCommand.prototype.toBinary = function() { _command[_index++] = 0; // Write the collection name to the command _index = _index + _command.write(this.collectionName, _index, 'utf8') + 1; + _command[_index - 1] = 0; // Write all the bson documents to the buffer at the index offset for(var i = 0; i < this.documents.length; i++) { // Serialize the document straight to the buffer var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.documents[i], this.checkKeys, _command, _index) - _index + 1; - // debug("============= documentLength = " + (documentLength - _index)) - // Write the length to the document _command[_index + 3] = (documentLength >> 24) & 0xff; _command[_index + 2] = (documentLength >> 16) & 0xff; @@ -117,71 +93,71 @@ InsertCommand.prototype.toBinary = function() { _command[_index - 1] = 0; } - // return _command; + return _command; ////////////////////////////////////////////////////////////////////////////////////// - // Build list of Buffer objects to write out - var buffers = []; - - // Get the command op code - var op_code = InsertCommand.OP_INSERT; - var commandBuffers = []; - - // Get the command data structure - var commandLength = this.getCommandAsBuffers(commandBuffers); - - - // Total Size of command - var totalSize = 4 * 4 + commandLength; - // Encode totalSize, requestId, responseId and opcode - buffers.push(InsertCommand.encodeInt(totalSize), InsertCommand.encodeInt(this.requestId), InsertCommand.encodeInt(0), InsertCommand.encodeInt(op_code)); - - // Add the command items - buffers = buffers.concat(commandBuffers); - // Allocate single buffer for write - var finalBuffer = new Buffer(totalSize); - - var index = 0; - - for(var i = 0; i < buffers.length; i++) { - buffers[i].copy(finalBuffer, index); - index = index + buffers[i].length; - } - - // debug("==================== finalBuffer.length = " + finalBuffer.length) - // debug("==================== totalLengthOfCommand = " + totalLengthOfCommand) - // debug("==================== totalSize = " + totalSize) - // debug("==================== docLength = " + docLength) - // debug("==================== commandLength = " + commandLength) + // // Build list of Buffer objects to write out + // var buffers = []; + // + // // Get the command op code + // var op_code = InsertCommand.OP_INSERT; + // var commandBuffers = []; + // + // // Get the command data structure + // var commandLength = this.getCommandAsBuffers(commandBuffers); + // + // + // // Total Size of command + // var totalSize = 4 * 4 + commandLength; + // // Encode totalSize, requestId, responseId and opcode + // buffers.push(InsertCommand.encodeInt(totalSize), InsertCommand.encodeInt(this.requestId), InsertCommand.encodeInt(0), InsertCommand.encodeInt(op_code)); + // + // // Add the command items + // buffers = buffers.concat(commandBuffers); + // // Allocate single buffer for write + // var finalBuffer = new Buffer(totalSize); + // + // var index = 0; // - // for(var i = 0; i < finalBuffer.length; i++) { - // debug(i + " :: [" + _command[i] + "] = [" + finalBuffer[i] + "]" + (_command[i] != finalBuffer[i] ? " = FALSE" : "")) + // for(var i = 0; i < buffers.length; i++) { + // buffers[i].copy(finalBuffer, index); + // index = index + buffers[i].length; // } - - return finalBuffer; + // + // // debug("==================== finalBuffer.length = " + finalBuffer.length) + // // debug("==================== totalLengthOfCommand = " + totalLengthOfCommand) + // // debug("==================== totalSize = " + totalSize) + // // debug("==================== docLength = " + docLength) + // // debug("==================== commandLength = " + commandLength) + // // + // // for(var i = 0; i < finalBuffer.length; i++) { + // // debug(i + " :: [" + _command[i] + "] = [" + finalBuffer[i] + "]" + (_command[i] != finalBuffer[i] ? " = FALSE" : "")) + // // } + // + // return finalBuffer; }; - -InsertCommand.encodeInt = function(value) { - var buffer = new Buffer(4); - buffer[3] = (value >> 24) & 0xff; - buffer[2] = (value >> 16) & 0xff; - buffer[1] = (value >> 8) & 0xff; - buffer[0] = value & 0xff; - return buffer; -} - -InsertCommand.encodeIntInPlace = function(value, buffer, index) { - buffer[index + 3] = (value >> 24) & 0xff; - buffer[index + 2] = (value >> 16) & 0xff; - buffer[index + 1] = (value >> 8) & 0xff; - buffer[index] = value & 0xff; -} - -InsertCommand.encodeCString = function(string) { - var buf = new Buffer(string, 'utf8'); - return [buf, new Buffer([0])]; -} +// +// InsertCommand.encodeInt = function(value) { +// var buffer = new Buffer(4); +// buffer[3] = (value >> 24) & 0xff; +// buffer[2] = (value >> 16) & 0xff; +// buffer[1] = (value >> 8) & 0xff; +// buffer[0] = value & 0xff; +// return buffer; +// } +// +// InsertCommand.encodeIntInPlace = function(value, buffer, index) { +// buffer[index + 3] = (value >> 24) & 0xff; +// buffer[index + 2] = (value >> 16) & 0xff; +// buffer[index + 1] = (value >> 8) & 0xff; +// buffer[index] = value & 0xff; +// } +// +// InsertCommand.encodeCString = function(string) { +// var buf = new Buffer(string, 'utf8'); +// return [buf, new Buffer([0])]; +// } diff --git a/lib/mongodb/commands/query_command.js b/lib/mongodb/commands/query_command.js index ac57c561aa..4f441ebfc8 100644 --- a/lib/mongodb/commands/query_command.js +++ b/lib/mongodb/commands/query_command.js @@ -8,8 +8,6 @@ var BaseCommand = require('./base_command').BaseCommand, Insert Document Command **/ var QueryCommand = exports.QueryCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector) { - BaseCommand.call(this); - this.collectionName = collectionName; this.queryOptions = queryOptions; this.numberToSkip = numberToSkip; @@ -19,11 +17,7 @@ var QueryCommand = exports.QueryCommand = function(db, collectionName, queryOpti this.db = db; }; -inherits(QueryCommand, BaseCommand); - -QueryCommand.prototype.getOpCode = function() { - return BaseCommand.OP_QUERY; -}; +QueryCommand.OP_QUERY = 2004; /* struct { @@ -36,32 +30,106 @@ struct { [ BSON returnFieldSelector; ] // OPTIONAL : selector indicating the fields to return. See below for details. } */ -QueryCommand.prototype.getCommandAsBuffers = function(buffers) { - var collectionNameBuffers = BaseCommand.encodeCString(this.collectionName); - var queryCommand = this.db.bson_serializer.BSON.serialize(this.query, false, true); - var totalObjectLength = 0; +QueryCommand.prototype.toBinary = function() { + ////////////////////////////////////////////////////////////////////////////////////// + // Calculate total length of the document + var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.query) + (4 * 4); + // Calculate extra fields size + if(this.returnFieldSelector != null) { + if(Object.keys(this.returnFieldSelector).length > 0) { + totalLengthOfCommand += this.db.bson_serializer.BSON.calculateObjectSize(this.returnFieldSelector); + } + } + + // Let's build the single pass buffer command + var _index = 0; + var _command = new Buffer(totalLengthOfCommand); + // Write the header information to the buffer + _command[_index + 3] = (totalLengthOfCommand >> 24) & 0xff; + _command[_index + 2] = (totalLengthOfCommand >> 16) & 0xff; + _command[_index + 1] = (totalLengthOfCommand >> 8) & 0xff; + _command[_index] = totalLengthOfCommand & 0xff; + // Adjust index + _index = _index + 4; + // Write the request ID + _command[_index + 3] = (this.requestId >> 24) & 0xff; + _command[_index + 2] = (this.requestId >> 16) & 0xff; + _command[_index + 1] = (this.requestId >> 8) & 0xff; + _command[_index] = this.requestId & 0xff; + // Adjust index + _index = _index + 4; + // Write zero + _command[_index++] = 0; + _command[_index++] = 0; + _command[_index++] = 0; + _command[_index++] = 0; + // Write the op_code for the command + _command[_index + 3] = (QueryCommand.OP_QUERY >> 24) & 0xff; + _command[_index + 2] = (QueryCommand.OP_QUERY >> 16) & 0xff; + _command[_index + 1] = (QueryCommand.OP_QUERY >> 8) & 0xff; + _command[_index] = QueryCommand.OP_QUERY & 0xff; + // Adjust index + _index = _index + 4; - // Push basic options + query - buffers.push(BaseCommand.encodeInt(this.queryOptions), collectionNameBuffers[0], collectionNameBuffers[1], - BaseCommand.encodeInt(this.numberToSkip), BaseCommand.encodeInt(this.numberToReturn), - queryCommand); + // Write the query options + _command[_index + 3] = (this.queryOptions >> 24) & 0xff; + _command[_index + 2] = (this.queryOptions >> 16) & 0xff; + _command[_index + 1] = (this.queryOptions >> 8) & 0xff; + _command[_index] = this.queryOptions & 0xff; + // Adjust index + _index = _index + 4; + + // Write the collection name to the command + _index = _index + _command.write(this.collectionName, _index, 'utf8') + 1; + _command[_index - 1] = 0; + + // Write the number of documents to skip + _command[_index + 3] = (this.numberToSkip >> 24) & 0xff; + _command[_index + 2] = (this.numberToSkip >> 16) & 0xff; + _command[_index + 1] = (this.numberToSkip >> 8) & 0xff; + _command[_index] = this.numberToSkip & 0xff; + // Adjust index + _index = _index + 4; + + // Write the number of documents to return + _command[_index + 3] = (this.numberToReturn >> 24) & 0xff; + _command[_index + 2] = (this.numberToReturn >> 16) & 0xff; + _command[_index + 1] = (this.numberToReturn >> 8) & 0xff; + _command[_index] = this.numberToReturn & 0xff; + // Adjust index + _index = _index + 4; - // Add up total length - totalObjectLength += 4 + collectionNameBuffers[0].length + 1 + 4 + 4 + queryCommand.length; + // Serialize the query document straight to the buffer + var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.query, this.checkKeys, _command, _index) - _index + 1; + // Write the length to the document + _command[_index + 3] = (documentLength >> 24) & 0xff; + _command[_index + 2] = (documentLength >> 16) & 0xff; + _command[_index + 1] = (documentLength >> 8) & 0xff; + _command[_index] = documentLength & 0xff; + // Update index in buffer + _index = _index + documentLength; + // Add terminating 0 for the object + _command[_index - 1] = 0; // Push field selector if available if(this.returnFieldSelector != null) { - var count = 0; for(var name in this.returnFieldSelector) { count += 1; } - if(count > 0) { - var fieldsCommand = this.db.bson_serializer.BSON.serialize(this.returnFieldSelector, false, true); - totalObjectLength += fieldsCommand.length; - buffers.push(fieldsCommand); + if(Object.keys(this.returnFieldSelector).length > 0) { + var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.returnFieldSelector, this.checkKeys, _command, _index) - _index + 1; + // Write the length to the document + _command[_index + 3] = (documentLength >> 24) & 0xff; + _command[_index + 2] = (documentLength >> 16) & 0xff; + _command[_index + 1] = (documentLength >> 8) & 0xff; + _command[_index] = documentLength & 0xff; + // Update index in buffer + _index = _index + documentLength; + // Add terminating 0 for the object + _command[_index - 1] = 0; } } - // Return value - return totalObjectLength -} + return _command; + ////////////////////////////////////////////////////////////////////////////////////// +}; // Constants QueryCommand.OPTS_NONE = 0; @@ -70,4 +138,10 @@ QueryCommand.OPTS_SLAVE = 4; QueryCommand.OPTS_OPLOG_REPLY = 8; QueryCommand.OPTS_NO_CURSOR_TIMEOUT = 16; QueryCommand.OPTS_AWAIT_DATA = 32; -QueryCommand.OPTS_EXHAUST = 64; \ No newline at end of file +QueryCommand.OPTS_EXHAUST = 64; + +var id = 1; +QueryCommand.prototype.getRequestId = function() { + if (!this.requestId) this.requestId = id++; + return this.requestId; +}; \ No newline at end of file diff --git a/lib/mongodb/commands/update_command.js b/lib/mongodb/commands/update_command.js index f209598372..9f36ddc17c 100644 --- a/lib/mongodb/commands/update_command.js +++ b/lib/mongodb/commands/update_command.js @@ -8,8 +8,6 @@ var BaseCommand = require('./base_command').BaseCommand, Update Document Command **/ var UpdateCommand = exports.UpdateCommand = function(db, collectionName, spec, document, options) { - BaseCommand.apply(this); - this.collectionName = collectionName; this.spec = spec; this.document = document; @@ -25,11 +23,10 @@ var UpdateCommand = exports.UpdateCommand = function(db, collectionName, spec, d this.flags = parseInt(db_multi_update.toString() + db_upsert.toString(), 2); }; -inherits(UpdateCommand, BaseCommand); - +UpdateCommand.OP_UPDATE = 2001; UpdateCommand.prototype.getOpCode = function() { - return BaseCommand.OP_UPDATE; + return UpdateCommand.OP_UPDATE; }; /* @@ -53,6 +50,150 @@ UpdateCommand.prototype.getCommandAsBuffers = function(buffers) { return totalObjectLength; } +UpdateCommand.prototype.toBinary = function() { + ////////////////////////////////////////////////////////////////////////////////////// + // Calculate total length of the document + var totalLengthOfCommand = 4 + Buffer.byteLength(this.collectionName) + 1 + 4 + this.db.bson_serializer.BSON.calculateObjectSize(this.spec) + + this.db.bson_serializer.BSON.calculateObjectSize(this.document) + (4 * 4); + + // Let's build the single pass buffer command + var _index = 0; + var _command = new Buffer(totalLengthOfCommand); + // Write the header information to the buffer + _command[_index + 3] = (totalLengthOfCommand >> 24) & 0xff; + _command[_index + 2] = (totalLengthOfCommand >> 16) & 0xff; + _command[_index + 1] = (totalLengthOfCommand >> 8) & 0xff; + _command[_index] = totalLengthOfCommand & 0xff; + // Adjust index + _index = _index + 4; + // Write the request ID + _command[_index + 3] = (this.requestId >> 24) & 0xff; + _command[_index + 2] = (this.requestId >> 16) & 0xff; + _command[_index + 1] = (this.requestId >> 8) & 0xff; + _command[_index] = this.requestId & 0xff; + // Adjust index + _index = _index + 4; + // Write zero + _command[_index++] = 0; + _command[_index++] = 0; + _command[_index++] = 0; + _command[_index++] = 0; + // Write the op_code for the command + _command[_index + 3] = (UpdateCommand.OP_UPDATE >> 24) & 0xff; + _command[_index + 2] = (UpdateCommand.OP_UPDATE >> 16) & 0xff; + _command[_index + 1] = (UpdateCommand.OP_UPDATE >> 8) & 0xff; + _command[_index] = UpdateCommand.OP_UPDATE & 0xff; + // Adjust index + _index = _index + 4; + + // Write zero + _command[_index++] = 0; + _command[_index++] = 0; + _command[_index++] = 0; + _command[_index++] = 0; + + // Write the collection name to the command + _index = _index + _command.write(this.collectionName, _index, 'utf8') + 1; + _command[_index - 1] = 0; + + // Write the update flags + _command[_index + 3] = (this.flags >> 24) & 0xff; + _command[_index + 2] = (this.flags >> 16) & 0xff; + _command[_index + 1] = (this.flags >> 8) & 0xff; + _command[_index] = this.flags & 0xff; + // Adjust index + _index = _index + 4; + + // Serialize the spec document + var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.spec, this.checkKeys, _command, _index) - _index + 1; + // Write the length to the document + _command[_index + 3] = (documentLength >> 24) & 0xff; + _command[_index + 2] = (documentLength >> 16) & 0xff; + _command[_index + 1] = (documentLength >> 8) & 0xff; + _command[_index] = documentLength & 0xff; + // Update index in buffer + _index = _index + documentLength; + // Add terminating 0 for the object + _command[_index - 1] = 0; + + // Serialize the document + var documentLength = this.db.bson_serializer.BSON.serializeWithBufferAndIndex(this.document, this.checkKeys, _command, _index) - _index + 1; + // Write the length to the document + _command[_index + 3] = (documentLength >> 24) & 0xff; + _command[_index + 2] = (documentLength >> 16) & 0xff; + _command[_index + 1] = (documentLength >> 8) & 0xff; + _command[_index] = documentLength & 0xff; + // Update index in buffer + _index = _index + documentLength; + // Add terminating 0 for the object + _command[_index - 1] = 0; + + return _command; + ////////////////////////////////////////////////////////////////////////////////////// + + // Build list of Buffer objects to write out + var buffers = []; + + // Get the command op code + var op_code = this.getOpCode(); + var commandBuffers = []; + + // Get the command data structure + var commandLength = this.getCommandAsBuffers(commandBuffers); + // Total Size of command + var totalSize = 4*4 + commandLength; + // Encode totalSize, requestId, responseId and opcode + buffers.push(BaseCommand.encodeInt(totalSize), BaseCommand.encodeInt(this.requestId), BaseCommand.encodeInt(0), BaseCommand.encodeInt(op_code)); + + // Add the command items + buffers = buffers.concat(commandBuffers); + // Allocate single buffer for write + var finalBuffer = new Buffer(totalSize); + + var index = 0; + + for(var i = 0; i < buffers.length; i++) { + buffers[i].copy(finalBuffer, index); + index = index + buffers[i].length; + } + + for(var i = 0; i < finalBuffer.length; i++) { + debug(i + " :: [" + _command[i] + "] = [" + finalBuffer[i] + "]" + (_command[i] != finalBuffer[i] ? " = FALSE" : "")) + } + + debug("===================================== finalBuffer.length :: " + finalBuffer.length) + debug("===================================== totalLengthOfCommand :: " + totalLengthOfCommand) + + return finalBuffer; +}; + // Constants UpdateCommand.DB_UPSERT = 0; -UpdateCommand.DB_MULTI_UPDATE = 1; \ No newline at end of file +UpdateCommand.DB_MULTI_UPDATE = 1; + +UpdateCommand.encodeInt = function(value) { + var buffer = new Buffer(4); + buffer[3] = (value >> 24) & 0xff; + buffer[2] = (value >> 16) & 0xff; + buffer[1] = (value >> 8) & 0xff; + buffer[0] = value & 0xff; + return buffer; +} + +UpdateCommand.encodeIntInPlace = function(value, buffer, index) { + buffer[index + 3] = (value >> 24) & 0xff; + buffer[index + 2] = (value >> 16) & 0xff; + buffer[index + 1] = (value >> 8) & 0xff; + buffer[index] = value & 0xff; +} + +UpdateCommand.encodeCString = function(string) { + var buf = new Buffer(string, 'utf8'); + return [buf, new Buffer([0])]; +} + +// var id = 1; +// BaseCommand.prototype.getRequestId = function() { +// if (!this.requestId) this.requestId = id++; +// return this.requestId; +// }; \ No newline at end of file diff --git a/test/find_test.js b/test/find_test.js index 60215053d3..afb6f2c1dd 100644 --- a/test/find_test.js +++ b/test/find_test.js @@ -558,7 +558,7 @@ var tests = testCase({ test.equal(3, updated_doc.b); }) }); - + // Test return old document on change collection.insert({'a':2, 'b':2}, {safe:true}, function(err, doc) { // Let's modify the document in place @@ -567,7 +567,7 @@ var tests = testCase({ test.equal(2, updated_doc.b); }) }); - + // Test remove object on change collection.insert({'a':3, 'b':2}, {safe:true}, function(err, doc) { // Let's modify the document in place @@ -576,13 +576,13 @@ var tests = testCase({ test.equal(2, updated_doc.b); }) }); - + // Let's upsert! collection.findAndModify({'a':4}, [], {'$set':{'b':3}}, {'new': true, upsert: true}, function(err, updated_doc) { test.equal(4, updated_doc.a); test.equal(3, updated_doc.b); }); - + // Test selecting a subset of fields collection.insert({a: 100, b: 101}, {safe:true}, function (err, ids) { collection.findAndModify({'a': 100}, [], {'$set': {'b': 5}}, {'new': true, fields: {b: 1}}, function (err, updated_doc) {