Permalink
Browse files

Cleaning up codE

  • Loading branch information...
1 parent 7fb81d6 commit 30ba06fd2298d339c3f3f0b5d6d71def7f15e516 @christkv committed Apr 1, 2012
Showing with 56 additions and 126 deletions.
  1. +24 −100 lib/mongodb/connection/repl_set.js
  2. +1 −1 lib/mongodb/db.js
  3. +31 −25 test/manual_tests/replicaset_manual_kill_test.js
@@ -21,12 +21,22 @@ const STATE_DOWN = 8;
const STATE_ROLLBACK = 9;
/**
-* ReplSet constructor provides master-slave functionality
-*
-* @param serverArr{Array of type Server}
-* @return constructor of ServerCluster
-*
-*/
+ * ReplSet constructor provides replicaset functionality
+ *
+ * Options
+ * - **ha** {Boolean, default:false}, turn on high availability.
+ * - **haInterval** {Number, default:2000}, time between each replicaset status check.
+ * - **reconnectWait** {Number, default:1000}, time to wait in miliseconds before attempting reconnect.
+ * - **retries** {Number, default:30}, number of times to attempt a replicaset reconnect.
+ * - **rs_name** {String}, the name of the replicaset to connect to.
+ * - **readPreference** {String}, the prefered read preference (Server.READ_PRIMARY, Server.READ_SECONDARY, Server.READ_SECONDARY_ONLY).
+ * - **read_secondary** {Boolean, deprecated}, allow reads from secondary.
+ * - **strategy** {String}, selection strategy for reads choose between (ping and statistical)
+ *
+ * @class Represents a Replicaset Configuration
+ * @param {Array} list of server objects participating in the replicaset.
+ * @param {Object} [options] additional options for the collection.
+ */
var ReplSet = exports.ReplSet = function(servers, options) {
// Set up basic
if(!(this instanceof ReplSet))
@@ -226,9 +236,9 @@ var ReplSet = exports.ReplSet = function(servers, options) {
});
// Enabled ha
- this.haEnabled = true;
+ this.haEnabled = this.options['ha'] == null ? false : this.options['ha'];
// How often are we checking for new servers in the replicaset
- this.replicasetStatusCheckInterval = 2000;
+ this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 2000 : this.options['haInterval'];
this._replicasetTimeoutId = null;
};
@@ -367,8 +377,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
if(options == null) options = {};
if(!('function' === typeof callback)) callback = null;
- // console.dir(callback)
-
// Keep reference to parent
this.db = parent;
// Set server state to connecting
@@ -391,23 +399,10 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Remove a server from the list of intialized servers we need to perform
self._numberOfServersLeftToInitialize = self._numberOfServersLeftToInitialize - 1;
- // console.log("=============================== connection established :: " + self._numberOfServersLeftToInitialize)
- // console.log(err)
- // console.log(result)
-
- // If we have encountered the error before return, otherwise add to the list of
- // errors
- // if(err != null && self._state.errors[instanceServer.name] != null) {
- // // console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 0")
- // return;
- // } else {
-
if(err != null) {
self._state.errors[instanceServer.name] = instanceServer;
}
- // Don't attempt to connect if we are done
- // if(replSetSelf._serverState === 'disconnected') return;
// Add enable query information
instanceServer.enableRecordQueryStats(replSetSelf.recordQueryStats);
@@ -427,12 +422,10 @@ ReplSet.prototype.connect = function(parent, options, callback) {
var primary = document.primary;
// Ensure we are keying on the same name for lookups as mongodb might return
- // dns name and the driver is using ip's
-
+ // dns name and the driver is using ip's
// Rename the connection so we are keying on the name used by mongod
var userProvidedServerString = instanceServer.host + ":" + instanceServer.port;
var me = document.me;
- // console.dir(document)
// If we have user provided entries already, switch them to avoid additional
// open connections
@@ -464,7 +457,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Only add server to our internal list if it's a master, secondary or arbiter
if(isMaster == true || secondary == true || arbiterOnly == true) {
// Handle a closed connection
- // replSetSelf.closeHandler = null != replSetSelf.closeHandler ? replSetSelf.closeHandler : function(err, server) {
replSetSelf.closeHandler = function(err, server) {
var closeServers = function() {
// Set the state to disconnected
@@ -479,7 +471,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Single callback only
var internalCallback = callback;
callback = null;
- // console.log("--------------------------------------- 0")
// Return the error
internalCallback(err, null);
} else {
@@ -502,8 +493,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
if(primaryAddress == errorServerAddress) {
closeServers();
} else {
- // console.log("=================================== REMOVE SERVER :: " + errorServerAddress)
-
// Remove from the list of servers
delete replSetSelf._state.addresses[errorServerAddress];
// Locate one of the lists and remove
@@ -515,9 +504,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
delete replSetSelf._state.passives[errorServerAddress];
}
- // console.log("================================================= CLOSE::ADDRESSES")
- // console.dir(Object.keys(replSetSelf._state.addresses));
-
// Check if we are reading from Secondary only
if(replSetSelf._readPreference == Server.READ_SECONDARY_ONLY && Object.keys(replSetSelf._state.secondaries).length == 0) {
closeServers();
@@ -529,7 +515,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
}
// Handle a connection timeout
- // replSetSelf.timeoutHandler = null != replSetSelf.timeoutHandler ? replSetSelf.timeoutHandler : function(err, server) {
replSetSelf.timeoutHandler = function(err, server) {
var closeServers = function() {
// Set the state to disconnected
@@ -544,7 +529,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Single callback only
var internalCallback = callback;
callback = null;
- // console.log("--------------------------------------- 1")
// Return the error
internalCallback(new Error("connection timed out"), null);
} else {
@@ -561,7 +545,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
if(replSetSelf._state.master != null) {
var primaryAddress = replSetSelf._state.master.host + ":" + replSetSelf._state.master.port;
var errorServerAddress = server.name;
- // var errorServerAddress = server.host + ":" + server.port;
// Only shut down the set if we have a primary server error
if(primaryAddress == errorServerAddress) {
@@ -589,7 +572,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
}
// Handle an error
- // replSetSelf.errorHandler = null != replSetSelf.errorHandler ? replSetSelf.errorHandler : function(err, server) {
replSetSelf.errorHandler = function(err, server) {
var closeServers = function() {
// Set the state to disconnected
@@ -604,7 +586,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Single callback only
var internalCallback = callback;
callback = null;
- // console.log("--------------------------------------- 2")
// Return the error
internalCallback(err, null);
} else {
@@ -706,13 +687,9 @@ ReplSet.prototype.connect = function(parent, options, callback) {
var internalCallback = callback;
callback = null;
// Return error message ignoring rest of calls
- // console.log("--------------------------------------- 4")
return internalCallback(replSetSelf._state.errorMessages[0], parent);
}
- // console.dir(me)
- // console.dir(Object.keys(replSetSelf._state.addresses))
-
// Let's add the server to our list of server types
if(secondary == true && (passive == false || passive == null)) {
replSetSelf._state.secondaries[me] = instanceServer;
@@ -736,8 +713,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Add the server if it's not defined and not already errored out
if(null == replSetSelf._state.addresses[candidateServerString]
&& null == replSetSelf._state.errors[candidateServerString]) {
- // console.log("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ adding new server :: " + candidateServerString)
-
// Split the server string
var parts = candidateServerString.split(/:/);
if(parts.length == 1) {
@@ -803,26 +778,20 @@ ReplSet.prototype.connect = function(parent, options, callback) {
callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
- // console.log("============================ START :: 1")
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
}
// Perform callback
- // console.log("--------------------------------------- 5")
internalCallback(null, parent);
})
} else {
- // console.log("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
- // console.dir(callback)
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
- // console.log("============================ START :: 2")
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
}
// Perform callback
- // console.log("--------------------------------------- 6")
internalCallback(null, parent);
}
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length > 0) {
@@ -837,11 +806,9 @@ ReplSet.prototype.connect = function(parent, options, callback) {
callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
- // console.log("============================ START :: 3")
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
}
// Perform callback
- // console.log("--------------------------------------- 7")
internalCallback(null, parent);
})
} else {
@@ -850,11 +817,9 @@ ReplSet.prototype.connect = function(parent, options, callback) {
callback = null;
// Start up ha
if(replSetSelf.haEnabled && null == replSetSelf._replicasetTimeoutId) {
- // console.log("============================ START :: 4")
replSetSelf._replicasetTimeoutId = setTimeout(replSetSelf.replicasetCheckFunction, replSetSelf.replicasetStatusCheckInterval);
}
// Perform callback
- // console.log("--------------------------------------- 8")
internalCallback(null, parent);
}
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) {
@@ -865,7 +830,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Force close all server instances
replSetSelf.close();
// Perform callback
- // console.log("--------------------------------------- 9")
internalCallback(new Error("no secondary server found"), null);
} else if(typeof callback === 'function'){
replSetSelf._serverState = 'disconnected';
@@ -875,8 +839,7 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Force close all server instances
replSetSelf.close();
// Perform callback
- // console.log("--------------------------------------- 10")
- internalCallback(new Error("no primary server found"), null);
+ internalCallback(new Error("no primary server found"), null);
}
} else if((self._numberOfServersLeftToInitialize == 0) && replSetSelf._state.errorMessages.length > 0 && replSetSelf._serverState != 'disconnected') {
// Set done
@@ -887,7 +850,6 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Force close all server instances
replSetSelf.close();
// Callback to signal we are done
- // console.log("--------------------------------------- 11")
internalCallback(replSetSelf._state.errorMessages[0], null);
}
}
@@ -924,16 +886,7 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Connect to server
serverConnections[i].connect(parent, {returnIsMasterResults: true, eventReceiver:serverConnections[i]}, self.connectionHandler(serverConnections[i]));
}
-
- // // Check if we have an error in the inital set of servers and callback with error
- // if(replSetSelf._state.errorMessages.length > 0 && typeof callback === 'function') {
- // // ensure no callbacks get called twice
- // var internalCallback = callback;
- // callback = null;
- // // Perform callback
- // internalCallback(replSetSelf._state.errorMessages[0], null);
- // }
-
+
// The checking function
this.replicasetCheckFunction = function() {
try {
@@ -942,16 +895,9 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// If we have a connection and we have a db object
if(con != null && Array.isArray(self.dbInstances) && self.dbInstances.length > 0) {
var dbInstance = self.dbInstances[0];
- dbInstance.admin().command({replSetGetStatus:1}, {connection:con}, function(err, result) {
- // console.dir("-----------------------------")
- // console.dir(err)
- // console.dir(result)
-
+ dbInstance.admin().command({replSetGetStatus:1}, {connection:con}, function(err, result) {
// Paranoid android
if(null == err && null != result && null != result["documents"] && result["documents"].length > 0) {
- // console.log("---------------------------------------------------------------------");
- // console.dir(result["documents"][0]);
-
// For each member we need to check if we have a new connection that needs to be established
var members = result['documents'][0]['members'];
@@ -962,20 +908,11 @@ ReplSet.prototype.connect = function(parent, options, callback) {
for(var i = 0, jlen = members.length; i < jlen; i++) {
// Get a member
var member = members[i];
-
- console.log("================================================= HA::ADDRESSES")
- console.dir(Object.keys(self._state.addresses));
- // console.dir(member)
- // console.log("null != self._state :: " + (null != self._state))
- // console.log("0 != member['health'] :: " + (0 != member['health']))
- // console.log("null == self._state['addresses'][member['name']] :: " + (null == self._state['addresses'][member['name']]))
// If the node is healthy and it does not exist in the current replicaset, add it to the
// current setup
if(null != self._state && 0 != member['health'] && null == self._state['addresses'][member['name']]) {
// We need to add a server to the connection, this means going through the notions of establishing
// A completely new connection
- // console.log("=============================== new server detected")
- // console.dir(member)
// Found a new server
newServers = newServers + 1;
@@ -1008,16 +945,11 @@ ReplSet.prototype.connect = function(parent, options, callback) {
newServer.on("timeout", self.timeoutHandler);
newServer.on("error", self.errorHandler);
- // // Add server to list, ensuring we don't get a cascade of request to the same server
- // replSetSelf._state.addresses[candidateServerString] = newServer;
-
- // // Add a new server to the total number of servers that need to initialized before we are done
- // numberOfServersLeftToInitialize = numberOfServersLeftToInitialize + 1;
+ // Add a new server to the total number of servers that need to initialized before we are done
var newServerCallback = self.connectionHandler(newServer);
// Let's set up a new server instance
newServer.connect(self.db, {returnIsMasterResults: true, eventReceiver:newServer}, function(err, result) {
- // console.log("=========================== reconnected")
// Remove from number of newServers
newServers = newServers - 1;
// Call the setup
@@ -1026,13 +958,10 @@ ReplSet.prototype.connect = function(parent, options, callback) {
if(newServers <= 0) {
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
}
-
- // setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
});
}
}
- // console.log("================================== newServers :: " + newServers)
// If we have no new servers check status again
if(newServers == 0) {
setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
@@ -1042,9 +971,7 @@ ReplSet.prototype.connect = function(parent, options, callback) {
});
}
} catch(err) {
- // console.log("============================================== threw error")
- // console.dir(err)
- // setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
+ setTimeout(self.replicasetCheckFunction, self.replicasetStatusCheckInterval);
}
};
}
@@ -1101,9 +1028,6 @@ ReplSet.prototype.checkoutReader = function() {
} else {
connection = this.checkoutWriter();
}
- // console.log("============================== checkout reader = " + (connection.socketOptions.host + ":" + connection.socketOptions.port));
- // console.dir(Object.keys(this._state.addresses))
- // console.dir(Object.keys(this._state.secondaries))
// Return the connection
return connection;
Oops, something went wrong.

0 comments on commit 30ba06f

Please sign in to comment.