Skip to content

Commit

Permalink
Getting more of the connect tests for the replicaset to pass correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Oct 29, 2011
1 parent 09578c3 commit 908f774
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 270 deletions.
5 changes: 5 additions & 0 deletions HISTORY
@@ -1,3 +1,8 @@
* Added priority setting to replicaset manager
* Added correct handling of passive servers in replicaset
* Reworked socket code for simpler clearer handling
* Added master_not_needed option to replicaset server configuration when connecting to a read only set

0.9.6-22 2011-10-15
* Fixed bug in js bson parser that could cause wrong object size on serialization, Issue #370
* Fixed bug in findAndModify that did not throw error on replicaset timeout, Issue #373
Expand Down
18 changes: 16 additions & 2 deletions lib/mongodb/connection/connection_pool.js
Expand Up @@ -199,13 +199,27 @@ ConnectionPool.prototype.stop = function() {
// console.log("==================================== connection pool stop :: 1")
// Set not connected
this._poolState = 'not connected';

// Get all open connections
var keys = Object.keys(this.openConnections);
// Force close any sockets that are not already closed
// Force close all open sockets
for(var i = 0; i < keys.length; i++) {
this.openConnections[keys[i]].close();
}
// console.log("==================================== connection pool stop :: 2")

// Get all error connections
var keys = Object.keys(this.connectionsWithErrors);
// Force close all error sockets
for(var i = 0; i < keys.length; i++) {
this.connectionsWithErrors[keys[i]].close();
}

// Get all waiting to open connections
var keys = Object.keys(this.waitingToOpen);
// Force close all waiting sockets
for(var i = 0; i < keys.length; i++) {
this.waitingToOpen[keys[i]].close();
}

// Clear out all the connection variables
this.waitingToOpen = {};
Expand Down
174 changes: 92 additions & 82 deletions lib/mongodb/connections/repl_set_servers.js
Expand Up @@ -28,15 +28,13 @@ var ReplSetServers = exports.ReplSetServers = function(servers, options) {

// Are we allowing reads from secondaries ?
this.readSecondary = this.options["read_secondary"];
this.masterNotNeeded = this.options["master_not_needed"];
this.slaveOk = this.readSecondary;
this.closedConnectionCount = 0;

// Just keeps list of events we allow
this.eventHandlers = {error:[], parseError:[], poolReady:[], message:[], close:[]};

// State for the replicaset
// this._state = {master:null, secondaries:{}, arbiters:{}, errors:{}, addresses:{}}

if(!Array.isArray(servers) || servers.length == 0) {
throw Error("The parameter must be an array of servers and contain at least one server");
} else if(Array.isArray(servers) || servers.length > 0) {
Expand Down Expand Up @@ -108,7 +106,7 @@ ReplSetServers.prototype.setTarget = function(target) {
};

ReplSetServers.prototype.isConnected = function() {
return this.primary != null && this.primary.isConnected();
return this.primary != null && this._state.master.isConnected();
}

ReplSetServers.prototype.isPrimary = function(config) {
Expand All @@ -132,18 +130,23 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
var numberOfServersLeftToInitialize = serverConnections.length;

// If it's the first call let's reset our state
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[]};
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[]};

// console.log("###############################################################################################")
// console.log("###############################################################################################")
// console.log("###############################################################################################")

// Initialize server
var initServer = function(server) {

console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 2")
// console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 2")

// Handles the connections off the individual servers
var connectHandler = function(err, result) {
console.log("====================================================================== replicaset connect")
console.dir(err)
console.dir(result)
// console.log("====================================================================== replicaset connect")
// console.dir(err)
// console.dir(result)
// console.dir(server.connectionPool.getAllConnections())

// Remove a server from the list of intialized servers we need to perform
numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1;
Expand All @@ -156,22 +159,25 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
var setName = document.setName;
var isMaster = document.ismaster;
var secondary = document.secondary;
var passive = document.passive;
var arbiterOnly = document.arbiterOnly;
var hosts = document.hosts;
var arbiters = document.arbiters;
var hosts = Array.isArray(document.hosts) ? document.hosts : [];
var arbiters = Array.isArray(document.arbiters) ? document.arbiters : [];
var passives = Array.isArray(document.passives) ? document.passives : [];
var primary = document.primary;
var me = document.me;

// Print info
console.log("--------------------------------------------------------- replicaset server :: " + me)
console.log(" setName = " + setName)
console.log(" isMaster = " + isMaster)
console.log(" primary = " + primary)
console.log(" secondary = " + secondary)
console.log(" hosts------------------------")
console.dir(hosts)
console.log(" arbiters------------------------")
console.dir(arbiters)
// console.log("--------------------------------------------------------- replicaset server :: " + me)
// console.log(" setName = " + setName)
// console.log(" isMaster = " + isMaster)
// console.log(" primary = " + primary)
// console.log(" secondary = " + secondary)
// console.log(" passive = " + passive)
// console.log(" hosts------------------------")
// console.dir(hosts)
// console.log(" arbiters------------------------")
// console.dir(arbiters)

// Check if the server does not exist in our connected list
// Add server to list of connected servers
Expand All @@ -188,38 +194,42 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Assign the set name
if(replSetSelf.replicaSet == null) {
replSetSelf._state.setName = setName;
} else if(replSetSelf.replSetSelf != setName) {
} else if(replSetSelf.replicaSet != setName) {
replSetSelf._state.errorMessages.push(new Error("configured mongodb replicaset does not match provided replicaset [" + setName + "] != [" + replSetSelf.replicaSet + "]"))
}

// Let's add the server to our list of server types
if(secondary == true) {
if(secondary == true && (passive == false || passive == null)) {
replSetSelf._state.secondaries[server.host + ":" + server.port] = server;
} else if(arbiterOnly == true) {
replSetSelf._state.arbiters[server.host + ":" + server.port] = server;
} else if(secondary == true && passive == true) {
replSetSelf._state.passives[server.host + ":" + server.port] = server;
} else if(isMaster == true) {
replSetSelf._state.master = server;
} else if(isMaster == false && (server.host + ":" + server.port) === primary) {
replSetSelf._state.master = server;
}

// Let's go throught all the "possible" servers in the replicaset
var candidateServers = hosts.concat(arbiters);
console.log("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
console.dir(candidateServers)
var candidateServers = hosts.concat(arbiters).concat(passives);
// console.log("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
// console.dir(candidateServers)

// If we have new servers let's add them
for(var i = 0; i < candidateServers.length; i++) {
// Fetch the server string
var candidateServerString = candidateServers[i];

console.log("---------------------------- candidateServer :: " + candidateServerString)
console.dir(replSetSelf._state.addresses)
// console.log("---------------------------- candidateServer :: " + candidateServerString)
// console.dir(replSetSelf._state.addresses)

// Skip this server if it's alreay defined
if(replSetSelf._state.addresses[candidateServerString] == 1) continue;
// Add server to list, ensuring we don't get a cascade of request to the same server
replSetSelf._state.addresses[candidateServerString] = 1;

console.log("--------------------------------------- ++++++++++++++++++++ adding new server")
// console.log("--------------------------------------- ++++++++++++++++++++ adding new server")

// Split the server string
var parts = candidateServerString.split(/:/);
Expand All @@ -232,79 +242,51 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {

// Let's set up a new server instance
process.nextTick(function() {
console.log("----------------------------------------------------- adding new server")
// console.log("----------------------------------------------------- adding new server")
var newServer = new Server(parts[0], parseInt(parts[1]), {auto_reconnect:true});
newServer.connect(parent, {firstCall:true, returnIsMasterResults: true}, connectHandler);
});
}
}
} else {
// Force a close, make sure we don't leave the connection hanging on a dead socket
server.close();
}

console.log("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ numberOfServersLeftToInitialize :: " + numberOfServersLeftToInitialize)
// console.log("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ numberOfServersLeftToInitialize :: " + numberOfServersLeftToInitialize)

if(numberOfServersLeftToInitialize == 0) {
console.log("---------------------------------------------------------- callback")
// console.dir(replSetSelf._state)
// console.dir(Object.keys(replSetSelf._state.secondaries).length)
// console.dir(Object.keys(replSetSelf._state.arbiters).length)
// console.dir(Object.keys(replSetSelf._state.errors).length)
// console.dir(Object.keys(replSetSelf._state.errorMessages).length)
callback(null, parent);
// console.log("---------------------------------------------------------- callback")
// console.dir(replSetSelf._state.errors)
// console.dir(replSetSelf._state.errorMessages)

// Check if we have errors
if(replSetSelf._state.errorMessages.length > 0) {
callback(replSetSelf._state.errorMessages[0], parent);
} else {
// If we don't expect a master let's call back, otherwise we need a master before
// the connection is successful
if(replSetSelf.masterNotNeeded || replSetSelf._state.master != null) {
callback(null, parent)
} else {
callback(new Error("no primary server found"), null);
}
}
}


//
// // else {
// // // Add a new server to the total number of servers that need to initialized before we are done
// // numberOfServersLeftToInitialize = numberOfServersLeftToInitialize + 1;
// // // Schedule the new server to be saved to our set of replicasets, use next tick to flatten
// // // out call stack
// // process.nextTick(function() {
// // var newServer = new Server()
// // });
// // }
// //
// // } else {
// //
// // }
// //
// //
// //
// // This references the calling object
// // var serverInstance = this;
//
// // Let's create an isMaster command so we can get the data back
// // var db_command = DbCommand.createIsMasterCommand(parent);
// // parent.executeDbCommand(db_command, {}, function(err, result) {
// // console.log("------------------------------------------------ executeDbCommand")
// // console.dir(err)
// // console.dir(result)
// //
// // });
// //
// //
// //
// //
// //
// // console.log("------------------------------------------------ connect message")
// // console.dir(err)
// // console.dir(serverInstance)
// // console.dir(this)
// // console.dir(server)
};

// Start the server connection, having the callback return the ismaster results for
// each server we are connecting to
server.connect(parent, {'firstCall':true, returnIsMasterResults: true}, connectHandler);
};

console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 0")
// console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 0")

// Ensure we have all registered servers in our set
for(var i = 0; i < serverConnections.length; i++) {
replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port] = 1;
}

console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 1")
// console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 1")

// Initialize all the connections
for(var i = 0; i < serverConnections.length; i++) {
Expand Down Expand Up @@ -344,9 +326,20 @@ ReplSetServers.prototype.disconnect = function(callback) {
}

ReplSetServers.prototype.close = function(callback) {
// console.log("-------------------------------------------------------------- ReplSetServers :: close")

var self = this;

// console.log("---------------------------------------------------------------- arbiters")
// console.dir(self._state.arbiters)
// console.log("---------------------------------------------------------------- secondaries")
// console.dir(self._state.secondaries)
// console.log("---------------------------------------------------------------- passives")
// console.dir(self._state.passives)

// Close all the servers (concatenate entire list of servers first for ease)
var allServers = [self._state.master];
var allServers = self._state.master != null ? [self._state.master] : [];
// console.log("-------------------------------------------------------------- ReplSetServers :: close :: 0")

// Secondary keys
var keys = Object.keys(self._state.secondaries);
Expand All @@ -355,31 +348,48 @@ ReplSetServers.prototype.close = function(callback) {
allServers.push(self._state.secondaries[keys[i]]);
}

// console.log("-------------------------------------------------------------- ReplSetServers :: close :: 1")

// Arbiter keys
var keys = Object.keys(self._state.arbiters);
// Add all arbiters
for(var i = 0; i < keys.length; i++) {
allServers.push(self._state.arbiters[keys[i]]);
}

// Passive keys
var keys = Object.keys(self._state.passives);
// Add all arbiters
for(var i = 0; i < keys.length; i++) {
allServers.push(self._state.passives[keys[i]]);
}

// console.log("-------------------------------------------------------------- ReplSetServers :: close :: 2")
// console.dir(allServers)

// Let's process all the closing
var numberOfServersToClose = allServers.length;

// Close the servers
for(var i = 0; i < allServers.length; i++) {
// console.log("-------------------------------------------------------------- ReplSetServers :: close :: 3")
var server = allServers[i];
// console.dir(server)
// Close each server
server.close(function() {
// console.log("-------------------------------------------------------------- ReplSetServers :: close :: 4")
numberOfServersToClose = numberOfServersToClose - 1;
// Clear out state if we are done
if(numberOfServersToClose == 0) {
self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[]};
self._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[]};
}

// If we are finished perform the call back
if(numberOfServersToClose == 0 && typeof callback === 'function') {
// console.log("-------------------------------------------------------------- ReplSetServers :: close :: 5")
callback(null);
} else if(numberOfServersToClose == 0) {
// console.log("-------------------------------------------------------------- ReplSetServers :: close :: 6")
self.emit("close");
}
})
Expand Down
4 changes: 2 additions & 2 deletions lib/mongodb/connections/server.js
Expand Up @@ -113,7 +113,7 @@ Server.prototype.connect = function(parent, options, callback) {

// Set up item connection
connectionPool.on("message", function(message) {
console.log("=========================================== message")
// console.log("=========================================== message")
// console.dir(message)

// Locate the callback, do the cleanup and move on
Expand All @@ -130,7 +130,7 @@ Server.prototype.connect = function(parent, options, callback) {

// Handle errors
connectionPool.on("error", function(message) {
console.log("=========================================== message")
// console.log("=========================================== message")
// console.log(server._serverState)
// console.dir(message)
// console.log(message != null ? message.stack : '')
Expand Down
6 changes: 6 additions & 0 deletions lib/mongodb/db.js
Expand Up @@ -117,13 +117,19 @@ Db.prototype.open = function(callback) {
};

Db.prototype.close = function(callback) {
// console.log("------------------------------------------------------------ DB.close")

// Remove all listeners and close the connection
this.serverConfig.close(callback);
// console.log("------------------------------------------------------------ DB.close :: 0")
// Clear out state of the connection
this.state = "notConnected";
// console.log("------------------------------------------------------------ DB.close :: 2")
// Emit to all the close listeners attached to the instance
this.emit("close")
// console.log("------------------------------------------------------------ DB.close :: 3")
// Remove all listeners
// console.log("------------------------------------------------------------ DB.close :: 4")
this.removeAllListeners();
};

Expand Down

0 comments on commit 908f774

Please sign in to comment.