diff --git a/lib/mongodb/admin.js b/lib/mongodb/admin.js index c5775598b9..d696f8bf51 100644 --- a/lib/mongodb/admin.js +++ b/lib/mongodb/admin.js @@ -132,11 +132,14 @@ Admin.prototype.profilingInfo = function(callback) { self.db.databaseName = databaseName; }; -Admin.prototype.command = function(command, callback) { +Admin.prototype.command = function(command, options, callback) { var self = this; + var args = Array.prototype.slice.call(arguments, 1); + callback = args.pop(); + options = args.length ? args.shift() : {}; // Execute a command - this.db.executeDbAdminCommand(command, function(err, result) { + this.db.executeDbAdminCommand(command, options, function(err, result) { // Ensure change before event loop executes return callback != null ? callback(err, result) : null; }); diff --git a/lib/mongodb/connection/repl_set_servers.js b/lib/mongodb/connection/repl_set_servers.js index 567d7c706f..38f64cb972 100644 --- a/lib/mongodb/connection/repl_set_servers.js +++ b/lib/mongodb/connection/repl_set_servers.js @@ -9,6 +9,17 @@ var Connection = require('./connection').Connection, PingStrategy = require('./strategies/ping_strategy').PingStrategy, StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy; +const STATE_STARTING_PHASE_1 = 0; +const STATE_PRIMARY = 1; +const STATE_SECONDARY = 2; +const STATE_RECOVERING = 3; +const STATE_FATAL_ERROR = 4; +const STATE_STARTING_PHASE_2 = 5; +const STATE_UNKNOWN = 6; +const STATE_ARBITER = 7; +const STATE_DOWN = 8; +const STATE_ROLLBACK = 9; + /** * ReplSetServers constructor provides master-slave functionality * @@ -17,6 +28,7 @@ var Connection = require('./connection').Connection, * */ var ReplSetServers = exports.ReplSetServers = function(servers, options) { + var self = this; // Contains the master server entry this.options = options == null ? {} : options; this.reconnectWait = this.options["reconnectWait"] != null ? this.options["reconnectWait"] : 1000; @@ -27,6 +39,10 @@ var ReplSetServers = exports.ReplSetServers = function(servers, options) { this.readSecondary = this.options["read_secondary"]; this.slaveOk = true; this.closedConnectionCount = 0; + + // Tells the driver if we need to restart Replicaset connections due to changes + this.replicaSetCheckInterval = this.options["replicaSetCheckInterval"] != null ? this.options["replicaSetCheckInterval"] : 1000; + this.replicaSetChanged = false; // Set up ssl connections this.ssl = this.options.ssl == null ? false : this.options.ssl; @@ -219,7 +235,7 @@ ReplSetServers.prototype.setTarget = function(target) { ReplSetServers.prototype.isConnected = function() { // Return the state of the replicaset server - return this.primary != null && this._state.master != null && this._state.master.isConnected(); + return this.primary != null && this._state.master != null && this._state.master.isConnected() && !this.replicaSetChanged; } ReplSetServers.prototype.isPrimary = function(config) { @@ -330,6 +346,8 @@ var __executeAllCallbacksWithError = function(dbInstance, error) { } ReplSetServers.prototype.connect = function(parent, options, callback) { + // console.log("====================================================== ReplSetServers.prototype.connect") + var self = this; var dateStamp = new Date().getTime(); // console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp) if('function' === typeof options) callback = options, options = {}; @@ -340,6 +358,78 @@ ReplSetServers.prototype.connect = function(parent, options, callback) { this.db = parent; // Set server state to connecting this._serverState = 'connecting'; + // No change of the replicaset + this.replicaSetChanged = false; + + // Replicaset check state frequency + var replicasetCheckFrequencey = this.replicaSetCheckInterval; + // Just set up timer checking the state of the replicaset + var checkStateOfReplicasetFunction = function() { + if(self._serverState == 'disconnected') return; + // If we have a valid master connection let's attempt to perform a query to get the status of + // The replicaset server + var connection = self.checkoutWriter(); + + // If we have a connection + if(connection != null) { + self.db.admin().command({replSetGetStatus:1}, {connection:connection}, function(err, result) { + // If no error let's look at the members + if(err == null) { + // For all members check if we have the same ones in the current set + var members = result.documents[0].members; + // console.log("-------------------------------------------------- CHECK STATUS") + // console.log("members.length = " + members.length) + // console.log("Object.keys(self._state.addresses).length = " + Object.keys(self._state.addresses).length) + + // Peform a set of check to see if the replicaset has changed + if(self._state.addresses != null && Object.keys(self._state.addresses).length != members.length) { + // We have changed update replicaset object to restart connections + self.replicaSetChanged = true; + + // console.log("========================================================= CHECK REPLICASET STATUS 0"); + // console.dir("self.replicaSetChanged = " + self.replicaSetChanged); + } else { + // We need to dig a bit deeper and check if the roles of different servers have changed + // Located members must have the same setup (being secondary etc) for it to be equivalent + var numberOfFoundMembersWithSameSetup = 0; + + // Check for the member to see if it's anywhere + for(var i = 0; i < members.length; i++) { + // Get member + var member = members[i]; + // If it's a primary check that is a primary in the current set aswell + if(member.state == STATE_PRIMARY && self._state.master != null && (self._state.master.host + ":" + self._state.master.port) == member.name) { + numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1; + } else if(member.state == STATE_SECONDARY && (self._state.secondaries[member.name] != null || self._state.passives[member.name] != null)) { + numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1; + } else if(member.state == STATE_ARBITER && self._state.arbiters[member.name] != null) { + numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1; + } + } + + if(numberOfFoundMembersWithSameSetup != members.length) { + self.replicaSetChanged = true; + } else { + setTimeout(checkStateOfReplicasetFunction, replicasetCheckFrequencey); + } + + // Let's check if we need to reconfigure the replicaset + // console.log("========================================================= CHECK REPLICASET STATUS 1"); + // console.dir("self.replicaSetChanged = " + self.replicaSetChanged); + // console.dir("numberOfFoundMembersWithSameSetup = " + numberOfFoundMembersWithSameSetup); + // console.dir("members.length = " + members.length); + } + } else { + setTimeout(checkStateOfReplicasetFunction, replicasetCheckFrequencey); + } + }) + } else { + setTimeout(checkStateOfReplicasetFunction, replicasetCheckFrequencey); + } + } + + // Start timeout + setTimeout(checkStateOfReplicasetFunction, replicasetCheckFrequencey); // Reference to the instance var replSetSelf = this; @@ -350,27 +440,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) { var numberOfServersLeftToInitialize = serverConnections.length; // Clean up state - replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[]}; - // console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 0") - - // // Add a close event handler to ourselves to notify the parent - // this.on("close", function() { - // // console.log("----------------------------------------------------------------------- close") - // - // // Stop instance - // if(replSetSelf.strategyInstance != null) { - // // Stop the strategy - // replSetSelf.strategyInstance.stop(function() { - // // Emit close - // parent.emit("close"); - // }) - // } else { - // // Emit close - // parent.emit("close"); - // } - // }) - - // console.log("CONNECT CONNECT CONNECT CONNECT CONNECT CONNECT CONNECT CONNECT CONNECT CONNECT") + replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]}; // Create a connection handler var connectionHandler = function(instanceServer) { @@ -398,158 +468,167 @@ ReplSetServers.prototype.connect = function(parent, options, callback) { var tags = document.tags ? document.tags : {}; var primary = document.primary; var me = document.me; - - // console.log("============================================================================") - // console.dir(document) - - // Handle a closed connection - var closeHandler = function(err, server) { - // console.log("====================================================== replset::server::close") - // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply - replSetSelf.close(function() { - __executeAllCallbacksWithError(parent, new Error("connection closed")); - // Emit error - parent.emit("timeout", new Error("connection closed")); - }); - } - - // Handle a connection timeout - var timeoutHandler = function(err, server) { - // console.log("====================================================== replset::server::timeout") - // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply - replSetSelf.close(function() { - __executeAllCallbacksWithError(parent, new Error("connection timed out")); - // Emit error - parent.emit("timeout", new Error("connection timed out")); - }); - } - - // Handle an error - var errorHandler = function(err, server) { - // console.log("====================================================== replset::server::error") - // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply - replSetSelf.close(function() { - __executeAllCallbacksWithError(parent, err); - // Emit error - parent.emit("error", err); - }); - } - // Add error handler to the instance of the server - instanceServer.on("close", closeHandler); - // Add error handler to the instance of the server - instanceServer.on("error", errorHandler); - // instanceServer.on("timeout", errorHandler); - instanceServer.on("timeout", timeoutHandler); - // Add tag info - instanceServer.tags = tags; - - // For each tag in tags let's add the instance Server to the list for that tag - if(tags != null && typeof tags === 'object') { - var tagKeys = Object.keys(tags); - // For each tag file in the server add it to byTags - for(var i = 0; i < tagKeys.length; i++) { - var value = tags[tagKeys[i]]; - // Check if we have a top level tag object - if(replSetSelf._state.byTags[tagKeys[i]] == null) replSetSelf._state.byTags[tagKeys[i]] = {}; - // For the value check if we have an array of server instances - if(!Array.isArray(replSetSelf._state.byTags[tagKeys[i]][value])) replSetSelf._state.byTags[tagKeys[i]][value] = []; - // Check that the instance is not already registered there - var valueArray = replSetSelf._state.byTags[tagKeys[i]][value]; - var found = false; - - // Iterate over all values - for(var j = 0; j < valueArray.length; j++) { - if(valueArray[j].host == instanceServer.host && valueArray[j].port == instanceServer.port) { - found = true; - break; + // Do not add the server if it exists + if(isMaster == true || secondary == true || arbiterOnly == true) { + // console.log("==================================================================== document") + // console.dir(document) + + // Handle a closed connection + var closeHandler = function(err, server) { + // console.log("====================================================== replset::server::close") + // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply + replSetSelf.close(function() { + __executeAllCallbacksWithError(parent, new Error("connection closed")); + // Emit error + parent.emit("timeout", new Error("connection closed")); + }); + } + + // Handle a connection timeout + var timeoutHandler = function(err, server) { + // console.log("====================================================== replset::server::timeout") + // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply + replSetSelf.close(function() { + __executeAllCallbacksWithError(parent, new Error("connection timed out")); + // Emit error + parent.emit("timeout", new Error("connection timed out")); + }); + } + + // Handle an error + var errorHandler = function(err, server) { + // console.log("====================================================== replset::server::error") + // Shut down the replicaset for now and Fire off all the callbacks sitting with no reply + replSetSelf.close(function() { + __executeAllCallbacksWithError(parent, err); + // Emit error + parent.emit("error", err); + }); + } + + // Add error handler to the instance of the server + instanceServer.on("close", closeHandler); + // Add error handler to the instance of the server + instanceServer.on("error", errorHandler); + // instanceServer.on("timeout", errorHandler); + instanceServer.on("timeout", timeoutHandler); + // Add tag info + instanceServer.tags = tags; + + // For each tag in tags let's add the instance Server to the list for that tag + if(tags != null && typeof tags === 'object') { + var tagKeys = Object.keys(tags); + // For each tag file in the server add it to byTags + for(var i = 0; i < tagKeys.length; i++) { + var value = tags[tagKeys[i]]; + // Check if we have a top level tag object + if(replSetSelf._state.byTags[tagKeys[i]] == null) replSetSelf._state.byTags[tagKeys[i]] = {}; + // For the value check if we have an array of server instances + if(!Array.isArray(replSetSelf._state.byTags[tagKeys[i]][value])) replSetSelf._state.byTags[tagKeys[i]][value] = []; + // Check that the instance is not already registered there + var valueArray = replSetSelf._state.byTags[tagKeys[i]][value]; + var found = false; + + // Iterate over all values + for(var j = 0; j < valueArray.length; j++) { + if(valueArray[j].host == instanceServer.host && valueArray[j].port == instanceServer.port) { + found = true; + break; + } } + + // If it was not found push the instance server to the list + if(!found) valueArray.push(instanceServer); } - - // If it was not found push the instance server to the list - if(!found) valueArray.push(instanceServer); } - } - // Remove from error list - delete replSetSelf._state.errors[me]; - - // Add our server to the list of finished servers - replSetSelf._state.addresses[me] = instanceServer; - - // Assign the set name - if(replSetSelf.replicaSet == null) { - replSetSelf._state.setName = setName; - } else if(replSetSelf.replicaSet != setName && replSetSelf._serverState != 'disconnected') { - replSetSelf._state.errorMessages.push(new Error("configured mongodb replicaset does not match provided replicaset [" + setName + "] != [" + replSetSelf.replicaSet + "]")); - // Set done - replSetSelf._serverState = 'disconnected'; - // ensure no callbacks get called twice - var internalCallback = callback; - callback = null; - // Return error message ignoring rest of calls - return internalCallback(replSetSelf._state.errorMessages[0], parent); - } - - // Let's add the server to our list of server types - if(secondary == true && (passive == false || passive == null)) { - replSetSelf._state.secondaries[me] = instanceServer; - } else if(arbiterOnly == true) { - replSetSelf._state.arbiters[me] = instanceServer; - } else if(secondary == true && passive == true) { - replSetSelf._state.passives[me] = instanceServer; - } else if(isMaster == true) { - replSetSelf._state.master = instanceServer; - } else if(isMaster == false && primary != null && replSetSelf._state.addresses[primary]) { - replSetSelf._state.master = replSetSelf._state.addresses[primary]; - } - - // Let's go throught all the "possible" servers in the replicaset - var candidateServers = hosts.concat(arbiters).concat(passives); - - // If we have new servers let's add them - for(var i = 0; i < candidateServers.length; i++) { - // Fetch the server string - var candidateServerString = candidateServers[i]; - // Add the server if it's not defined - if(replSetSelf._state.addresses[candidateServerString] == null) { - // Split the server string - var parts = candidateServerString.split(/:/); - if(parts.length == 1) { - parts = [parts[0], Connection.DEFAULT_PORT]; - } + // Remove from error list + delete replSetSelf._state.errors[me]; - // Default empty socket options object - var socketOptions = {}; - // If a socket option object exists clone it - if(replSetSelf.socketOptions != null) { - var keys = Object.keys(replSetSelf.socketOptions); - for(var i = 0; i < keys.length;i++) socketOptions[keys[i]] = replSetSelf.socketOptions[keys[i]]; - } - - // Add host information to socket options - socketOptions['host'] = parts[0]; - socketOptions['port'] = parseInt(parts[1]); + // Add our server to the list of finished servers + replSetSelf._state.addresses[me] = instanceServer; - // Create a new server instance - var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions - , logger:replSetSelf.logger, ssl:replSetSelf.ssl}); - - // Add handlers - newServer.on("close", closeHandler); - newServer.on("timeout", timeoutHandler); - newServer.on("error", errorHandler); + // Assign the set name + if(replSetSelf.replicaSet == null) { + replSetSelf._state.setName = setName; + } else if(replSetSelf.replicaSet != setName && replSetSelf._serverState != 'disconnected') { + replSetSelf._state.errorMessages.push(new Error("configured mongodb replicaset does not match provided replicaset [" + setName + "] != [" + replSetSelf.replicaSet + "]")); + // Set done + replSetSelf._serverState = 'disconnected'; + // ensure no callbacks get called twice + var internalCallback = callback; + callback = null; + // Return error message ignoring rest of calls + return internalCallback(replSetSelf._state.errorMessages[0], parent); + } + + // Let's add the server to our list of server types + if(secondary == true && (passive == false || passive == null)) { + replSetSelf._state.secondaries[me] = instanceServer; + } else if(arbiterOnly == true) { + replSetSelf._state.arbiters[me] = instanceServer; + } else if(secondary == true && passive == true) { + replSetSelf._state.passives[me] = instanceServer; + } else if(isMaster == true) { + replSetSelf._state.master = instanceServer; + } else if(isMaster == false && primary != null && replSetSelf._state.addresses[primary]) { + replSetSelf._state.master = replSetSelf._state.addresses[primary]; + } + + // Let's go throught all the "possible" servers in the replicaset + var candidateServers = hosts.concat(arbiters).concat(passives); + + // If we have new servers let's add them + for(var i = 0; i < candidateServers.length; i++) { + // Fetch the server string + var candidateServerString = candidateServers[i]; + // Add the server if it's not defined + if(replSetSelf._state.addresses[candidateServerString] == null) { + // Split the server string + var parts = candidateServerString.split(/:/); + if(parts.length == 1) { + parts = [parts[0], Connection.DEFAULT_PORT]; + } + + // Default empty socket options object + var socketOptions = {}; + // If a socket option object exists clone it + if(replSetSelf.socketOptions != null) { + var keys = Object.keys(replSetSelf.socketOptions); + for(var i = 0; i < keys.length;i++) socketOptions[keys[i]] = replSetSelf.socketOptions[keys[i]]; + } - // Add server to list, ensuring we don't get a cascade of request to the same server - replSetSelf._state.addresses[candidateServerString] = newServer; + // Add host information to socket options + socketOptions['host'] = parts[0]; + socketOptions['port'] = parseInt(parts[1]); - // Add a new server to the total number of servers that need to initialized before we are done - numberOfServersLeftToInitialize = numberOfServersLeftToInitialize + 1; + // Create a new server instance + var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions + , logger:replSetSelf.logger, ssl:replSetSelf.ssl}); - // Let's set up a new server instance - newServer.connect(parent, {returnIsMasterResults: true, eventReceiver:newServer}, connectionHandler(newServer)); - } - } + // Add handlers + newServer.on("close", closeHandler); + newServer.on("timeout", timeoutHandler); + newServer.on("error", errorHandler); + + // Add server to list, ensuring we don't get a cascade of request to the same server + replSetSelf._state.addresses[candidateServerString] = newServer; + + // Add a new server to the total number of servers that need to initialized before we are done + numberOfServersLeftToInitialize = numberOfServersLeftToInitialize + 1; + + // Let's set up a new server instance + newServer.connect(parent, {returnIsMasterResults: true, eventReceiver:newServer}, connectionHandler(newServer)); + } + } + } else { + // console.log("========================================================= NOT A FREAKING SERVER") + // console.dir(Object.keys(replSetSelf._state.addresses)) + // Remove the instance from out list of servers + delete replSetSelf._state.addresses[me]; + // console.dir(Object.keys(replSetSelf._state.addresses)) + } } // If done finish up @@ -600,82 +679,59 @@ ReplSetServers.prototype.connect = function(parent, options, callback) { } } else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) { replSetSelf._serverState = 'disconnected'; - // Force close all connections - // replSetSelf.close(function() { - // replSetSelf.close(); - // ensure no callbacks get called twice - var internalCallback = callback; - callback = null; - // Perform callback - internalCallback(new Error("no secondary server found"), null); - // }); + // ensure no callbacks get called twice + var internalCallback = callback; + callback = null; + // Perform callback + internalCallback(new Error("no secondary server found"), null); } else { replSetSelf._serverState = 'disconnected'; - // Force close all connections - // replSetSelf.close(function() { - // ensure no callbacks get called twice - var internalCallback = callback; - callback = null; - // Perform callback - internalCallback(new Error("no primary server found"), null); - // }); + // ensure no callbacks get called twice + var internalCallback = callback; + callback = null; + // Perform callback + internalCallback(new Error("no primary server found"), null); } } else if((numberOfServersLeftToInitialize == 0) && replSetSelf._state.errorMessages.length > 0 && replSetSelf._serverState != 'disconnected') { // Set done replSetSelf._serverState = 'disconnected'; - // Force close all connections - // replSetSelf.close(); - // replSetSelf.close(function() { - // ensure no callbacks get called twice - var internalCallback = callback; - callback = null; - // Callback to signal we are done - internalCallback(replSetSelf._state.errorMessages[0], null); - // }); + // ensure no callbacks get called twice + var internalCallback = callback; + callback = null; + // Callback to signal we are done + internalCallback(replSetSelf._state.errorMessages[0], null); } } } - // console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 1") // Ensure we have all registered servers in our set for(var i = 0; i < serverConnections.length; i++) { replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port] = serverConnections[i]; } - // console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 2") // Initialize all the connections for(var i = 0; i < serverConnections.length; i++) { - // try { - // Set up the logger for the server connection - serverConnections[i].logger = replSetSelf.logger; - // Default empty socket options object - var socketOptions = {}; - // If a socket option object exists clone it - if(this.socketOptions != null && typeof this.socketOptions === 'object') { - var keys = Object.keys(this.socketOptions); - for(var j = 0; j < keys.length;j++) socketOptions[keys[j]] = this.socketOptions[keys[j]]; - } - - // If ssl is specified - if(replSetSelf.ssl) serverConnections[i].ssl = true; + // Set up the logger for the server connection + serverConnections[i].logger = replSetSelf.logger; + // Default empty socket options object + var socketOptions = {}; + // If a socket option object exists clone it + if(this.socketOptions != null && typeof this.socketOptions === 'object') { + var keys = Object.keys(this.socketOptions); + for(var j = 0; j < keys.length;j++) socketOptions[keys[j]] = this.socketOptions[keys[j]]; + } + + // If ssl is specified + if(replSetSelf.ssl) serverConnections[i].ssl = true; - // Add host information to socket options - socketOptions['host'] = serverConnections[i].host; - socketOptions['port'] = serverConnections[i].port; - - // Set the socket options - serverConnections[i].socketOptions = socketOptions; - // Connect to server - serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, connectionHandler(serverConnections[i])); - // } catch (err) { - // numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1; - // // Remove from list off addresses, close down and fire error - // replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port] - // // Close connections - // replSetSelf.close(); - // // Add error message - // replSetSelf._state.errorMessages.push(err); - // } + // Add host information to socket options + socketOptions['host'] = serverConnections[i].host; + socketOptions['port'] = serverConnections[i].port; + + // Set the socket options + serverConnections[i].socketOptions = socketOptions; + // Connect to server + serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, connectionHandler(serverConnections[i])); } // console.log("================================== ReplSetServers.prototype.connect :: " + dateStamp + " :: 3") @@ -826,7 +882,7 @@ ReplSetServers.prototype.close = function(callback) { // Clear out state if we are done if(numberOfServersToClose == 0) { // Clear out state - self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[]}; + self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]}; } // If we are finished perform the call back diff --git a/lib/mongodb/db.js b/lib/mongodb/db.js index fac2e6694b..0f882c6135 100644 --- a/lib/mongodb/db.js +++ b/lib/mongodb/db.js @@ -301,9 +301,15 @@ Db.prototype.collection = function(collectionName, options, callback) { try { var collection = new Collection(self, collectionName, self.pkFactory, options); } catch(err) { - return callback(err, null); + if(callback == null) { + throw err; + } else { + return callback(err, null); + } } - return callback(null, collection); + + // If we have no callback return collection object + return callback == null ? collection : callback(null, collection); } }; @@ -413,22 +419,12 @@ Db.prototype.authenticate = function(username, password, callback) { // Execute all four this._executeQueryCommand(DbCommand.createGetNonceCommand(self), {onAll:true}, function(err, result, connection) { - // console.log("------------------------------------------- nonce") - // console.dir(err) - // console.dir(result) - // Execute on all the connections if(err == null) { // Nonce used to make authentication request with md5 hash var nonce = result.documents[0].nonce; - // console.log("=============================================================") - // console.dir(DbCommand.createAuthenticationCommand(self, username, password, nonce), {connection:connection}) - // Execute command self._executeQueryCommand(DbCommand.createAuthenticationCommand(self, username, password, nonce), {connection:connection}, function(err, result) { - // console.log("------------------------------------------- auth") - // console.dir(err) - // console.dir(result) // Ensure we save any error if(err) { errorObject = err; @@ -609,8 +605,9 @@ Db.prototype.executeDbCommand = function(command_hash, options, callback) { /** Runs a command on the database as admin **/ -Db.prototype.executeDbAdminCommand = function(command_hash, callback) { - this._executeQueryCommand(DbCommand.createAdminDbCommand(this, command_hash), callback); +Db.prototype.executeDbAdminCommand = function(command_hash, options, callback) { + if(callback == null) { callback = options; options = {}; } + this._executeQueryCommand(DbCommand.createAdminDbCommand(this, command_hash, options), callback); }; /** @@ -1112,8 +1109,6 @@ exports.connect = function(url, options, callback) { // Ugh, we have to figure out which options go to which constructor manually. urlOptions.forEach(function(opt) { - // console.log(opt) - if (!opt) return; var splitOpt = opt.split('='), name = splitOpt[0], value = splitOpt[1]; diff --git a/test/replicaset/changing_replicaset_test.js b/test/replicaset/changing_replicaset_test.js new file mode 100644 index 0000000000..9523c0ef7c --- /dev/null +++ b/test/replicaset/changing_replicaset_test.js @@ -0,0 +1,286 @@ +var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false; + +var testCase = require('../../deps/nodeunit').testCase, + debug = require('util').debug, + inspect = require('util').inspect, + gleak = require('../../tools/gleak'), + ReplicaSetManager = require('../tools/replica_set_manager').ReplicaSetManager, + Db = require('../../lib/mongodb').Db, + ReplSetServers = require('../../lib/mongodb').ReplSetServers, + Server = require('../../lib/mongodb').Server, + Step = require("../../deps/step/lib/step"); + +// Keep instance of ReplicaSetManager +var serversUp = false; +var retries = 120; +var RS = RS == null ? null : RS; + +var ensureConnection = function(test, numberOfTries, callback) { + // Replica configuration + var replSet = new ReplSetServers( [ + new Server( RS.host, RS.ports[1], { auto_reconnect: true } ), + new Server( RS.host, RS.ports[0], { auto_reconnect: true } ), + new Server( RS.host, RS.ports[2], { auto_reconnect: true } ) + ], + {rs_name:RS.name} + ); + + if(numberOfTries <= 0) return callback(new Error("could not connect correctly"), null); + + var db = new Db('integration_test_', replSet); + // Print any errors + db.on("error", function(err) { + console.log("============================= ensureConnection caught error") + console.dir(err) + if(err != null && err.stack != null) console.log(err.stack) + db.close(); + }) + + // Open the db + db.open(function(err, p_db) { + db.close(); + if(err != null) { + // Wait for a sec and retry + setTimeout(function() { + numberOfTries = numberOfTries - 1; + ensureConnection(test, numberOfTries, callback); + }, 1000); + } else { + return callback(null, p_db); + } + }) +} + +var identifyServers = function(rs, dbname, callback) { + // Total number of servers to query + var numberOfServersToCheck = Object.keys(rs.mongods).length; + + // Arbiters + var arbiters = []; + var secondaries = []; + var primary = null; + + // Let's establish what all servers so we can pick targets for our queries + var keys = Object.keys(rs.mongods); + for(var i = 0; i < keys.length; i++) { + var host = rs.mongods[keys[i]].host; + var port = rs.mongods[keys[i]].port; + + // Connect to the db and query the state + var server = new Server(host, port,{auto_reconnect: true}); + // Create db instance + var db = new Db(dbname, server, {native_parser: (process.env['TEST_NATIVE'] != null)}); + // Connect to the db + db.open(function(err, db) { + numberOfServersToCheck = numberOfServersToCheck - 1; + if(db.serverConfig.isMasterDoc.ismaster) { + primary = {host:db.serverConfig.host, port:db.serverConfig.port}; + } else if(db.serverConfig.isMasterDoc.secondary) { + secondaries.push({host:db.serverConfig.host, port:db.serverConfig.port}); + } else if(db.serverConfig.isMasterDoc.arbiterOnly) { + arbiters.push({host:db.serverConfig.host, port:db.serverConfig.port}); + } + + // Close the db + db.close(); + // If we are done perform the callback + if(numberOfServersToCheck <= 0) { + callback(null, {primary:primary, secondaries:secondaries, arbiters:arbiters}); + } + }) + } +} + +module.exports = testCase({ + setUp: function(callback) { + // Create instance of replicaset manager but only for the first call + if(!serversUp && !noReplicasetStart) { + serversUp = true; + RS = new ReplicaSetManager({retries:120, secondary_count:2, passive_count:1, arbiter_count:1}); + RS.startSet(true, function(err, result) { + if(err != null) throw err; + // Finish setup + callback(); + }); + } else { + RS.restartKilledNodes(function(err, result) { + if(err != null) throw err; + callback(); + }) + } + }, + + tearDown: function(callback) { + callback(); + }, + + 'Basic replicaset changes removing a secondary server from the set, should be reflected in the driver' : function(test) { + // Fetch all the identity servers + identifyServers(RS, 'integration_test_', function(err, servers) { + // Replica configuration + var replSet = new ReplSetServers( [ + new Server( RS.host, RS.ports[1], { auto_reconnect: true } ), + new Server( RS.host, RS.ports[0], { auto_reconnect: true } ), + new Server( RS.host, RS.ports[2], { auto_reconnect: true } ) + ], + {rs_name:RS.name, readPreference:Server.READ_SECONDARY} + ); + + // Are we done processing + var timeoutInterval = 1000; + var numberOfStepsDone = 0; + var newConfig = null; + + // Replicaset server setup + var replDb = new Db('integration_test_', replSet, {native_parser: (process.env['TEST_NATIVE'] != null)}); + replDb.open(function(err, replDb) { + // Var checking function + var checking = function() { + // First step let's do a reconfig of the replicaset + if(numberOfStepsDone == 0) { + // Update to the next step + numberOfStepsDone = numberOfStepsDone + 1; + // Connect directly to the primary server to change the config setup + var _server = new Server(servers.primary.host, servers.primary.port, {auto_reconnect: true}); + // Create db instance + var _db = new Db('integration_test_', _server, {native_parser: (process.env['TEST_NATIVE'] != null)}); + _db.open(function(err, _db) { + // The number of members to remove + var numberOfMembersToRemove = 1; + + // Let's change the configuration set and update the replicaset + newConfig = JSON.parse(JSON.stringify(RS.config)); + var members = newConfig.members; + // Remove one of the secondaries + for(var i = 0; i < members.length; i++) { + if(members[i].arbiterOnly == null && (servers.primary.host + ":" + servers.primary.port) != members[i].host) { + numberOfMembersToRemove = numberOfMembersToRemove - 1; + members.splice(i, 1); + + // Stop removing members + if(numberOfMembersToRemove == 0) { + break; + } + } + } + + // Reassign the array + newConfig.members = members; + // Adjust version + newConfig.version = newConfig.version + 1; + + // Issue replicaset reconfig command to server + _db.admin().command({replSetReconfig:newConfig}, function(err, result) { + test.equal(null, err); + // Close the db connection + _db.close(); + // Let's do some queries + setTimeout(checking, timeoutInterval); + }); + }); + } else if(numberOfStepsDone < 10) { + replDb.collection('somecollection').find().toArray(function(err, items) { + numberOfStepsDone = numberOfStepsDone + 1; + setTimeout(checking, timeoutInterval); + }); + } else if(numberOfStepsDone == 10) { + // Check that we have the right setup + test.ok(replDb.serverConfig._state.master != null); + test.equal(1, Object.keys(replDb.serverConfig._state.arbiters).length); + test.equal(2, Object.keys(replDb.serverConfig._state.secondaries).length + + Object.keys(replDb.serverConfig._state.passives).length) + + // Restore the original setup + numberOfStepsDone = numberOfStepsDone + 1; + // Connect directly to the primary server to change the config setup + var _server = new Server(servers.primary.host, servers.primary.port, {auto_reconnect: true}); + // Create db instance + var _db = new Db('integration_test_', _server, {native_parser: (process.env['TEST_NATIVE'] != null)}); + _db.open(function(err, _db) { + var version = newConfig.version; + // Let's change the configuration set and update the replicaset + newConfig = JSON.parse(JSON.stringify(RS.config)); + // Adjust version + newConfig.version = version + 1; + + // Issue replicaset reconfig command to server + _db.admin().command({replSetReconfig:newConfig}, function(err, result) { + test.equal(null, err); + // Close the db connection + _db.close(); + // Let's do some queries + setTimeout(checking, timeoutInterval); + }); + }); + } else if(numberOfStepsDone < 20) { + replDb.collection('somecollection').find().toArray(function(err, items) { + numberOfStepsDone = numberOfStepsDone + 1; + setTimeout(checking, timeoutInterval); + }); + } else { + // Check that we have the right setup + test.ok(replDb.serverConfig._state.master != null); + test.equal(1, Object.keys(replDb.serverConfig._state.arbiters).length); + test.equal(3, Object.keys(replDb.serverConfig._state.secondaries).length + + Object.keys(replDb.serverConfig._state.passives).length) + + replDb.close(); + test.done(); + } + } + + // Let's boot up a checking loop + var intervalId = setTimeout(checking, timeoutInterval); + }) + }); + + // // Execute flag + // var executedCorrectly = false; + // + // // Create db instance + // var db = new Db('integration_test_', replSet, {native_parser: (process.env['TEST_NATIVE'] != null)}); + // // Connect to the db + // db.open(function(err, p_db) { + // // Let's get the primary server and wrap the checkout Method to ensure it's the one called for read + // var checkoutWriterMethod = p_db.serverConfig._state.master.checkoutWriter; + // // Set up checkoutWriter to catch correct write request + // p_db.serverConfig._state.master.checkoutWriter = function() { + // executedCorrectly = true; + // return checkoutWriterMethod.apply(this); + // } + // + // // Grab the collection + // db.collection("read_preference_replicaset_test_0", function(err, collection) { + // // Attempt to read (should fail due to the server not being a primary); + // collection.find().toArray(function(err, items) { + // // Does not get called or we don't care + // test.ok(executedCorrectly); + // p_db.close(); + // test.done(); + // }); + // }); + // }); + }, + + noGlobalsLeaked : function(test) { + var leaks = gleak.detectNew(); + test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', ')); + test.done(); + } +}) + + + + + + + + + + + + + + + + diff --git a/test/tools/replica_set_manager.js b/test/tools/replica_set_manager.js index 80896da680..13a45187cf 100644 --- a/test/tools/replica_set_manager.js +++ b/test/tools/replica_set_manager.js @@ -17,7 +17,7 @@ var ReplicaSetManager = exports.ReplicaSetManager = function(options) { this.name = options["name"] != null ? options["name"] : "replica-set-foo"; this.host = options["host"] != null ? options["host"] : "localhost"; this.retries = options["retries"] != null ? options["retries"] : 60; - this.config = {"_id": this.name, "members": []}; + this.config = {"_id": this.name, "version": 1, "members": []}; this.durable = options["durable"] != null ? options["durable"] : false; this.auth = options['auth'] != null ? options['auth'] : false; this.path = path.resolve("data"); @@ -144,11 +144,7 @@ ReplicaSetManager.prototype.initiate = function(callback) { var done = false; // Get master connection self.getConnection(function(err, connection) { - if(err != null) return callback(err, null); - - // debug("=================================================== replicaset config") - // debug(inspect(self.config)) - + if(err != null) return callback(err, null); // Set replica configuration connection.admin().command({replSetInitiate:self.config}, function(err, result) { // Close connection