Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Working some more on HA ability using setTimeout

  • Loading branch information...
commit a39d2610679fa4d7a7be5b1a300c3aed85b1f84c 1 parent e043042
Christian Amor Kvalheim authored
236 lib/mongodb/connection/repl_set_servers.js
View
@@ -44,10 +44,11 @@ var ReplSetServers = exports.ReplSetServers = function(servers, options) {
this.readSecondary = this.options["read_secondary"];
this.slaveOk = true;
this.closedConnectionCount = 0;
- this._used = false;
+ this._used = false;
// Default poolSize for new server instances
this.poolSize = this.options.poolSize == null ? 1 : this.options.poolSize;
+ this._currentServerChoice = 0;
// Set up ssl connections
this.ssl = this.options.ssl == null ? false : this.options.ssl;
@@ -77,7 +78,8 @@ var ReplSetServers = exports.ReplSetServers = function(servers, options) {
}
// Strategy for picking a secondary
- this.strategy = this.options['strategy'] == null ? 'statistical' : this.options['strategy'];
+ // this.strategy = this.options['strategy'] == null ? 'statistical' : this.options['strategy'];
+ this.strategy = this.options['strategy'];
// Make sure strategy is one of the two allowed
if(this.strategy != null && (this.strategy != 'ping' && this.strategy != 'statistical')) throw new Error("Only ping or statistical strategies allowed");
// Let's set up our strategy object for picking secodaries
@@ -218,6 +220,12 @@ var ReplSetServers = exports.ReplSetServers = function(servers, options) {
return this._state != null ? this._state.master : null;
}
});
+
+ // Enabled ha
+ this.haEnabled = true;
+ // How often are we checking for new servers in the replicaset
+ this.replicasetStatusCheckInterval = 2000;
+ this._replicasetTimeoutId = null;
};
inherits(ReplSetServers, EventEmitter);
@@ -371,8 +379,9 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'byTags':{}, 'setName':null, 'errorMessages':[], 'members':[]};
// Create a connection handler
- var connectionHandler = function(instanceServer) {
+ self.connectionHandler = null != self.connectionHandler ? sellf.connectionHandler : function(instanceServer) {
return function(err, result) {
+ // console.log("=============================== connection established")
// Don't attempt to connect if we are done
// if(replSetSelf._serverState === 'disconnected') return;
// Remove a server from the list of intialized servers we need to perform
@@ -394,12 +403,45 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
var passives = Array.isArray(document.passives) ? document.passives : [];
var tags = document.tags ? document.tags : {};
var primary = document.primary;
+
+ // Ensure we are keying on the same name for lookups as mongodb might return
+ // dns name and the driver is using ip's
+
+ // Rename the connection so we are keying on the name used by mongod
+ var userProvidedServerString = instanceServer.host + ":" + instanceServer.port;
var me = document.me;
-
+
+ // If we have user provided entries already, switch them to avoid additional
+ // open connections
+ if(replSetSelf._state['addresses'][userProvidedServerString]) {
+ // Fetch server
+ var server = replSetSelf._state['addresses'][userProvidedServerString];
+ // Remove entry
+ delete replSetSelf._state['addresses'][userProvidedServerString];
+ // Remove other entries
+ if(replSetSelf._state['secondaries'][userProvidedServerString]) {
+ delete replSetSelf._state['secondaries'][userProvidedServerString];
+ replSetSelf._state['secondaries'][me] = server;
+ } else if(replSetSelf._state['passives'][userProvidedServerString]) {
+ delete replSetSelf._state['passives'][userProvidedServerString];
+ replSetSelf._state['passives'][me] = server;
+ } else if(replSetSelf._state['arbiters'][userProvidedServerString]) {
+ delete replSetSelf._state['arbiters'][userProvidedServerString];
+ replSetSelf._state['arbiters'][me] = server;
+ }
+
+ // Set name of the server
+ server.name = me;
+ // Add the existing one to the replicaset list of addresses
+ replSetSelf._state['addresses'][me] = server;
+ } else {
+ instanceServer.name = me;
+ }
+
// Only add server to our internal list if it's a master, secondary or arbiter
if(isMaster == true || secondary == true || arbiterOnly == true) {
// Handle a closed connection
- var closeHandler = function(err, server) {
+ replSetSelf.closeHandler = null != replSetSelf.closeHandler ? replSetSelf.closeHandler : function(err, server) {
var closeServers = function() {
// Set the state to disconnected
parent._state = 'disconnected';
@@ -428,12 +470,15 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Check if this is the primary server, then disconnect otherwise keep going
if(replSetSelf._state.master != null) {
var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
- var errorServerAddress = server.host + ":" + server.port;
+ // var errorServerAddress = server.host + ":" + server.port;
+ var errorServerAddress = server.name;
// Only shut down the set if we have a primary server error
if(primaryAddress == errorServerAddress) {
closeServers();
} else {
+ // console.log("=================================== REMOVE SERVER :: " + errorServerAddress)
+
// Remove from the list of servers
delete replSetSelf._state.addresses[errorServerAddress];
// Locate one of the lists and remove
@@ -445,6 +490,9 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
delete replSetSelf._state.passives[errorServerAddress];
}
+ // console.log("================================================= CLOSE::ADDRESSES")
+ // console.dir(Object.keys(replSetSelf._state.addresses));
+
// Check if we are reading from Secondary only
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
closeServers();
@@ -456,7 +504,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
}
// Handle a connection timeout
- var timeoutHandler = function(err, server) {
+ replSetSelf.timeoutHandler = null != replSetSelf.timeoutHandler ? replSetSelf.timeoutHandler : function(err, server) {
var closeServers = function() {
// Set the state to disconnected
parent._state = 'disconnected';
@@ -485,7 +533,8 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Check if this is the primary server, then disconnect otherwise keep going
if(replSetSelf._state.master != null) {
var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
- var errorServerAddress = server.host + ":" + server.port;
+ var errorServerAddress = server.name;
+ // var errorServerAddress = server.host + ":" + server.port;
// Only shut down the set if we have a primary server error
if(primaryAddress == errorServerAddress) {
@@ -513,7 +562,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
}
// Handle an error
- var errorHandler = function(err, server) {
+ replSetSelf.errorHandler = null != replSetSelf.errorHandler ? replSetSelf.errorHandler : function(err, server) {
var closeServers = function() {
// Set the state to disconnected
parent._state = 'disconnected';
@@ -542,7 +591,8 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Check if this is the primary server, then disconnect otherwise keep going
if(replSetSelf._state.master != null) {
var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
- var errorServerAddress = server.host + ":" + server.port;
+ var errorServerAddress = server.name;
+ // var errorServerAddress = server.host + ":" + server.port;
// Only shut down the set if we have a primary server error
if(primaryAddress == errorServerAddress) {
@@ -575,11 +625,11 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
instanceServer.removeAllListeners("timeout");
// Add error handler to the instance of the server
- instanceServer.on("close", closeHandler);
+ instanceServer.on("close", replSetSelf.closeHandler);
// Add error handler to the instance of the server
- instanceServer.on("error", errorHandler);
+ instanceServer.on("error", replSetSelf.errorHandler);
// instanceServer.on("timeout", errorHandler);
- instanceServer.on("timeout", timeoutHandler);
+ instanceServer.on("timeout", replSetSelf.timeoutHandler);
// Add tag info
instanceServer.tags = tags;
@@ -629,6 +679,9 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Return error message ignoring rest of calls
return internalCallback(replSetSelf._state.errorMessages[0], parent);
}
+
+ // console.dir(me)
+ // console.dir(Object.keys(replSetSelf._state.addresses))
// Let's add the server to our list of server types
if(secondary == true && (passive == false || passive == null)) {
@@ -677,10 +730,10 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
newServer.replicasetInstance = replSetSelf;
// Add handlers
- newServer.on("close", closeHandler);
- newServer.on("timeout", timeoutHandler);
- newServer.on("error", errorHandler);
-
+ newServer.on("close", replSetSelf.closeHandler);
+ newServer.on("timeout", replSetSelf.timeoutHandler);
+ newServer.on("error", replSetSelf.errorHandler);
+
// Add server to list, ensuring we don't get a cascade of request to the same server
replSetSelf._state.addresses[candidateServerString] = newServer;
@@ -688,13 +741,15 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
numberOfServersLeftToInitialize = numberOfServersLeftToInitialize + 1;
// Let's set up a new server instance
- newServer.connect(parent, {returnIsMasterResults: true, eventReceiver:newServer}, connectionHandler(newServer));
+ newServer.connect(parent, {returnIsMasterResults: true, eventReceiver:newServer}, self.connectionHandler(newServer));
}
}
} else {
// Remove the instance from out list of servers
delete replSetSelf._state.addresses[me];
}
+ } else {
+ delete replSetSelf._state.addresses[instanceServer.host + ":" + instanceServer.port];
}
// If done finish up
@@ -713,6 +768,11 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
+ // Start up ha
+ if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
+ // console.log("============================ START :: 1")
+ replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
+ }
// Perform callback
internalCallback(null, parent);
})
@@ -720,6 +780,11 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
+ // Start up ha
+ if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
+ // console.log("============================ START :: 2")
+ replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
+ }
// Perform callback
internalCallback(null, parent);
}
@@ -733,6 +798,11 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
+ // Start up ha
+ if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
+ // console.log("============================ START :: 3")
+ replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
+ }
// Perform callback
internalCallback(null, parent);
})
@@ -740,6 +810,11 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
+ // Start up ha
+ if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
+ // console.log("============================ START :: 4")
+ replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
+ }
// Perform callback
internalCallback(null, parent);
}
@@ -777,7 +852,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
}
// Ensure we have all registered servers in our set
- for(var i = 0; i < serverConnections.length; i++) {
+ for(var i = 0; i < serverConnections.length; i++) {
replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port] = serverConnections[i];
}
@@ -805,7 +880,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Set the replicaset instance
serverConnections[i].replicasetInstance = replSetSelf;
// Connect to server
- serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, connectionHandler(serverConnections[i]));
+ serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, self.connectionHandler(serverConnections[i]));
}
// Check if we have an error in the inital set of servers and callback with error
@@ -816,6 +891,120 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Perform callback
internalCallback(replSetSelf._state.errorMessages[0], null);
}
+
+ // The checking function
+ this.replicasetCheckFunction = function() {
+ try {
+ // Retrieve a reader connection
+ var con = self.checkoutReader();
+ // If we have a connection and we have a db object
+ if(con != null && Array.isArray(self.dbInstances) && self.dbInstances.length > 0) {
+ var dbInstance = self.dbInstances[0];
+ dbInstance.admin().command({replSetGetStatus:1}, {connection:con}, function(err, result) {
+ // console.dir("-----------------------------")
+ // console.dir(err)
+ // console.dir(result)
+
+ // Paranoid android
+ if(null == err && null != result && null != result["documents"] && result["documents"].length > 0) {
+ // console.log("---------------------------------------------------------------------");
+ // console.dir(result["documents"][0]);
+
+ // For each member we need to check if we have a new connection that needs to be established
+ var members = result['documents'][0]['members'];
+
+ if(null != members) {
+ // The total members we check
+ var newServers = 0;
+ // Iterate over all existing members
+ for(var i = 0, jlen = members.length; i < jlen; i++) {
+ // Get a member
+ var member = members[i];
+
+ // console.log("================================================= HA::ADDRESSES")
+ // console.dir(Object.keys(self._state.addresses));
+ // console.dir(member)
+ // console.log("null != self._state :: " + (null != self._state))
+ // console.log("0 != member['health'] :: " + (0 != member['health']))
+ // console.log("null == self._state['addresses'][member['name']] :: " + (null == self._state['addresses'][member['name']]))
+ // If the node is healthy and it does not exist in the current replicaset, add it to the
+ // current setup
+ if(null != self._state && 0 != member['health'] && null == self._state['addresses'][member['name']]) {
+ // We need to add a server to the connection, this means going through the notions of establishing
+ // A completely new connection
+ // console.log("=============================== new server detected")
+ // console.dir(member)
+ // Found a new server
+ newServers = newServers + 1;
+
+ // Split the server string
+ var parts = member.name.split(/:/);
+ if(parts.length == 1) {
+ parts = [parts[0], Connection.DEFAULT_PORT];
+ }
+
+ // Default empty socket options object
+ var socketOptions = {};
+ // If a socket option object exists clone it
+ if(self.socketOptions != null) {
+ var keys = Object.keys(self.socketOptions);
+ for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = self.socketOptions[keys[i]];
+ }
+
+ // Add host information to socket options
+ socketOptions['host'] = parts[0];
+ socketOptions['port'] = parseInt(parts[1]);
+
+ // Create a new server instance
+ var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:false, 'socketOptions':socketOptions
+ , logger:self.logger, ssl:self.ssl, poolSize:self.poolSize});
+ // Set the replicaset instance
+ newServer.replicasetInstance = self;
+
+ // Add handlers
+ newServer.on("close", self.closeHandler);
+ newServer.on("timeout", self.timeoutHandler);
+ newServer.on("error", self.errorHandler);
+
+ // // Add server to list, ensuring we don't get a cascade of request to the same server
+ // replSetSelf._state.addresses[candidateServerString] = newServer;
+
+ // // Add a new server to the total number of servers that need to initialized before we are done
+ // numberOfServersLeftToInitialize = numberOfServersLeftToInitialize + 1;
+ var newServerCallback = self.connectionHandler(newServer);
+
+ // Let's set up a new server instance
+ newServer.connect(self.db, {returnIsMasterResults: true, eventReceiver:newServer}, function(err, result) {
+ // console.log("=========================== reconnected")
+ // Remove from number of newServers
+ newServers = newServers - 1;
+ // Call the setup
+ newServerCallback(err, result);
+ // If we have 0 new servers let's go back to rechecking
+ if(newServers <= 0) {
+ setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
+ }
+
+ // setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
+ });
+ }
+ }
+
+ // console.log("================================== newServers :: " + newServers)
+ // If we have no new servers check status again
+ if(newServers == 0) {
+ setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
+ }
+ }
+ }
+ });
+ }
+ } catch(err) {
+ // console.log("============================================== threw error")
+ // console.dir(err)
+ // setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
+ }
+ };
}
ReplSetServers.prototype.checkoutWriter = function() {
@@ -836,7 +1025,8 @@ ReplSetServers.prototype.checkoutReader = function() {
} else {
// Pick a random key
var keys = Object.keys(this._state.secondaries);
- var key = keys[Math.floor(Math.random() * keys.length)];
+ this._currentServerChoice = this._currentServerChoice % keys.length;
+ var key = keys[this._currentServerChoice++];
connection = this._state.secondaries[key].checkoutReader();
}
} else if(this._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(this._state.secondaries).length == 0) {
@@ -869,6 +1059,10 @@ ReplSetServers.prototype.checkoutReader = function() {
} else {
connection = this.checkoutWriter();
}
+ // console.log("============================== checkout reader = " + (connection.socketOptions.host + ":" + connection.socketOptions.port));
+ // console.dir(Object.keys(this._state.addresses))
+ // console.dir(Object.keys(this._state.secondaries))
+
// Return the connection
return connection;
}
2  lib/mongodb/db.js
View
@@ -910,7 +910,7 @@ Db.prototype.executeDbCommand = function(command_hash, options, callback) {
*/
Db.prototype.executeDbAdminCommand = function(command_hash, options, callback) {
if(callback == null) { callback = options; options = {}; }
- this._executeQueryCommand(DbCommand.createAdminDbCommand(this, command_hash, options), callback);
+ this._executeQueryCommand(DbCommand.createAdminDbCommand(this, command_hash), options, callback);
};
/**
1  test/index_test.js
View
@@ -768,6 +768,7 @@ exports.shouldCorrectlyUseMinMaxForSettingRangeInEnsureIndex = function(test) {
});
}
+
/**
* Retrieve the server information for the current
* instance of the db client
Please sign in to comment.
Something went wrong with that request. Please try again.