Skip to content

Commit

Permalink
Added ability to use multiple db's with the same connection set
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Nov 9, 2011
1 parent 95fc37a commit d452154
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 66 deletions.
4 changes: 4 additions & 0 deletions HISTORY
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
0.9.7
* Added priority setting to replicaset manager
* Added correct handling of passive servers in replicaset
* Reworked socket code for simpler clearer handling
Expand All @@ -11,6 +12,9 @@
* Allows raw (no bson parser mode for insert, update, remove, find and findOne)
* control raw mode passing in option raw:true on the commands
* will return buffers with the binary bson objects
* Fixed memory leak in cursor.toArray
* Fixed bug in command creation for mongodb server with wrong scope of call
* Added db(dbName) method to db.js to allow for reuse of connections against other databases

0.9.6-22 2011-10-15
* Fixed bug in js bson parser that could cause wrong object size on serialization, Issue #370
Expand Down
7 changes: 3 additions & 4 deletions lib/mongodb/commands/db_command.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ var QueryCommand = require('./query_command').QueryCommand,
/**
Db Command
**/
var DbCommand = exports.DbCommand = function(db, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector) {
QueryCommand.call(db, this);

var DbCommand = exports.DbCommand = function(dbInstance, collectionName, queryOptions, numberToSkip, numberToReturn, query, returnFieldSelector) {
QueryCommand.call(this);
this.collectionName = collectionName;
this.queryOptions = queryOptions;
this.numberToSkip = numberToSkip;
this.numberToReturn = numberToReturn;
this.query = query;
this.returnFieldSelector = returnFieldSelector;
this.db = db;
this.db = dbInstance;
};

inherits(DbCommand, QueryCommand);
Expand Down
3 changes: 3 additions & 0 deletions lib/mongodb/commands/query_command.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var QueryCommand = exports.QueryCommand = function(db, collectionName, queryOpti
throw error;
}
}

// console.log("--------------------------------------------------- THE SCOPE")
// console.dir(this)

this.collectionName = collectionName;
this.queryOptions = queryOptions;
Expand Down
72 changes: 30 additions & 42 deletions lib/mongodb/connection/repl_set_servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,35 @@ var cleanupConnections = function(connections, addresses) {
}
}

ReplSetServers.prototype.allServerInstances = function() {
// Close all the servers (concatenate entire list of servers first for ease)
var allServers = self._state.master != null ? [self._state.master] : [];

// Secondary keys
var keys = Object.keys(self._state.secondaries);
// Add all secondaries
for(var i = 0; i < keys.length; i++) {
allServers.push(self._state.secondaries[keys[i]]);
}

// Arbiter keys
var keys = Object.keys(self._state.arbiters);
// Add all arbiters
for(var i = 0; i < keys.length; i++) {
allServers.push(self._state.arbiters[keys[i]]);
}

// Passive keys
var keys = Object.keys(self._state.passives);
// Add all arbiters
for(var i = 0; i < keys.length; i++) {
allServers.push(self._state.passives[keys[i]]);
}

// Return complete list of all servers
return allServers;
}

ReplSetServers.prototype.connect = function(parent, options, callback) {
if('function' === typeof options) callback = options, options = {};
if(options == null) options = {};
Expand Down Expand Up @@ -450,45 +479,4 @@ ReplSetServers.prototype.close = function(callback) { //
}
})
}
}

// //
// // My own simple synchronous emit support, We don't need the overhead of the built in flexible node.js
// // event emitter as we are looking for as low latency as possible.
// //
// ReplSetServers.prototype.on = function(event, callback) {
// if(this.eventHandlers[event] == null) throw "Event handler only accepts values of " + Object.keys(this.eventHandlers);
// // Just add callback to our event handler (avoiding the cost of the node.js event handler)
// this.eventHandlers[event].push(callback);
// }
//
// ReplSetServers.prototype.emit = function(event, err, object) {
// if(this.eventHandlers[event] == null) throw "Event handler only accepts values of " + Object.keys(this.eventHandlers);
// // Fire off all the callbacks
// var callbacks = this.eventHandlers[event];
// // Attemp to emit
// try {
// // Perform a callback on all the registered callback handlers
// for(var i = 0; i < callbacks.length; i++) {
// callbacks[i](err, object);
// }
// } catch (err) {
// this.emit("error", err);
// }
// }
//
// ReplSetServers.prototype.removeListeners = function(event) {
// if(this.eventHandlers[event] == null) throw "Event handler only accepts values of " + Object.keys(this.eventHandlers);
// // Throw away all handlers
// this.eventHandlers[event] = [];
// }
//
// ReplSetServers.prototype.removeAllListeners = function() {
// // Fetch all the keys of handlers
// var keys = Object.keys(this.eventHandlers);
// // Remove all handlers
// for(var i = 0; i < keys.length; i++) {
// this.eventHandlers[keys[i]] = [];
// }
// }

}
60 changes: 44 additions & 16 deletions lib/mongodb/connection/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var Server = exports.Server = function(host, port, options) {

// Just keeps list of events we allow
this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[]};

// Internal state of server connection
this._serverState = 'disconnected';
};
Expand Down Expand Up @@ -54,15 +53,21 @@ Server.prototype.isConnected = function() {
return this.connectionPool.isConnected();
}

Server.prototype.allServerInstances = function() {
return [this];
}

Server.prototype.connect = function(dbInstance, options, callback) {
if('function' === typeof options) callback = options, options = {};
if(options == null) options = {};
if(!('function' === typeof callback)) callback = null;

// Let's connect
var server = this;
// Let's us override the main receiver of events
var eventReceiver = options.eventReceiver != null ? options.eventReceiver : dbInstance;
// Save reference to dbInstance
this.dbInstances = [dbInstance];

// Set server state to connecting
this._serverState = 'connecting';
Expand Down Expand Up @@ -103,7 +108,6 @@ Server.prototype.connect = function(dbInstance, options, callback) {
connectionPool.on("poolReady", function() {
// Create db command and Add the callback to the list of callbacks by the request id (mapping outgoing messages to correct callbacks)
var db_command = DbCommand.NcreateIsMasterCommand(dbInstance, dbInstance.databaseName);

// Check out a reader from the pool
var connection = connectionPool.checkoutConnection();

Expand Down Expand Up @@ -132,19 +136,43 @@ Server.prototype.connect = function(dbInstance, options, callback) {
// Emit the error
eventReceiver.emit("error", new Error("bson length is different from message length"));
} else {
// Locate the callback info
var callbackInfo = dbInstance._findHandler(mongoReply.responseTo.toString());
// Parse the body
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw);
// Get the callback instance
var callbackInstance = dbInstance._removeHandler(mongoReply.responseTo);
// Only call if we have an actual callback instance, might have been removed by the reaper
if(callbackInstance != null) {
// Only trigger the callback if we have one that is not removed by the reaper
if(callbackInstance != null && typeof callbackInstance.callback === 'function') {
callbackInstance.callback(null, mongoReply, callbackInstance.info.connection);
}
// Attempt to locate a callback instance
for(var i = 0; i < server.dbInstances.length; i++) {
var dbInstanceObject = server.dbInstances[i];
// Locate the callback info
var callbackInfo = dbInstanceObject._findHandler(mongoReply.responseTo.toString());
// Only execute callback if we have a caller
if(typeof callbackInfo.callback === 'function') {
// Parse the body
mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw);
// Get the callback instance
var callbackInstance = dbInstanceObject._removeHandler(mongoReply.responseTo);
// Only call if we have an actual callback instance, might have been removed by the reaper
if(callbackInstance != null) {
// Only trigger the callback if we have one that is not removed by the reaper
if(callbackInstance != null && typeof callbackInstance.callback === 'function') {
callbackInstance.callback(null, mongoReply, callbackInstance.info.connection);
}
}
}

// console.log("======================================================== dbInstanceObject")
// console.log(dbInstanceObject)
}

// // Locate the callback info
// var callbackInfo = dbInstance._findHandler(mongoReply.responseTo.toString());
// // Parse the body
// mongoReply.parseBody(message, connectionPool.bson, callbackInfo.info.raw);
// // Get the callback instance
// var callbackInstance = dbInstance._removeHandler(mongoReply.responseTo);
// // Only call if we have an actual callback instance, might have been removed by the reaper
// if(callbackInstance != null) {
// // Only trigger the callback if we have one that is not removed by the reaper
// if(callbackInstance != null && typeof callbackInstance.callback === 'function') {
// callbackInstance.callback(null, mongoReply, callbackInstance.info.connection);
// }
// }
}
} catch (err) {
// Force close the pool
Expand All @@ -162,7 +190,7 @@ Server.prototype.connect = function(dbInstance, options, callback) {
// Emit error only if we are not in the process of connecting
if(server._serverState === 'connecting' && firstCall) {
// Only do a callback if we have a valid callback function, on retries this might not be true
if(typeof callback === 'function') callback(new Error(message.err));
if(typeof callback === 'function') callback(new Error(message && message.err ? message.err : message));
} else {
eventReceiver.emit("error", new Error(message.err));
}
Expand Down
1 change: 0 additions & 1 deletion lib/mongodb/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ Cursor.prototype.generateQueryCommand = function() {
if(this.hint != null && this.hint.constructor == Object) specialSelector['$hint'] = this.hint;
if(this.explainValue != null) specialSelector['$explain'] = true;
if(this.snapshot != null) specialSelector['$snapshot'] = true;

return new QueryCommand(this.db, this.collectionName, queryOptions, this.skipValue, numberToReturn, specialSelector, this.fields);
} else {
return new QueryCommand(this.db, this.collectionName, queryOptions, this.skipValue, numberToReturn, this.selector, this.fields);
Expand Down
14 changes: 14 additions & 0 deletions lib/mongodb/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@ Db.prototype.open = function(callback) {
}
};

Db.prototype.db = function(dbName) {
// Create a new db instance
var newDbInstance = new Db(dbName, this.serverConfig, this.options);
// Add the instance to the list of approved db instances
var allServerInstances = this.serverConfig.allServerInstances();
// Add ourselves to all server callback instances
for(var i = 0; i < allServerInstances.length; i++) {
var server = allServerInstances[i];
server.dbInstances.push(newDbInstance);
}
// Return new db object
return newDbInstance;
}

Db.prototype.close = function(callback) {
// Clear reaperId if it's set
if(this.reaperIntervalId != null) {
Expand Down
3 changes: 2 additions & 1 deletion test/auxilliary/authentication_test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();

var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
nodeunit = require('../../deps/nodeunit'),
gleak = require('../../tools/gleak'),
Db = mongodb.Db,
Cursor = mongodb.Cursor,
Collection = mongodb.Collection,
Expand Down
6 changes: 4 additions & 2 deletions test/auxilliary/replicaset_auth_test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();

var testCase = require('../../deps/nodeunit').testCase,
debug = require('util').debug
debug = require('util').debug,
inspect = require('util').inspect,
nodeunit = require('../../deps/nodeunit'),
gleak = require('../../tools/gleak'),
Db = mongodb.Db,
Cursor = mongodb.Cursor,
Collection = mongodb.Collection,
Expand All @@ -15,6 +16,7 @@ var testCase = require('../../deps/nodeunit').testCase,
var MONGODB = 'integration_tests';
// var client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 1}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var serverManager = null;
var RS = RS == null ? null : RS;

// Define the tests, we want them to run as a nested test so we only clean up the
// db connection once
Expand Down Expand Up @@ -206,7 +208,7 @@ var tests = testCase({
}
)
});
}
},

noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
Expand Down
Loading

0 comments on commit d452154

Please sign in to comment.