Skip to content

Commit

Permalink
All replicaset tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Nov 5, 2011
1 parent a28132b commit 6eaf7b2
Show file tree
Hide file tree
Showing 13 changed files with 887 additions and 765 deletions.
23 changes: 17 additions & 6 deletions lib/mongodb/collection.js
Expand Up @@ -572,19 +572,30 @@ Collection.prototype.findAndModify = function findAndModify (query, sort, doc, o

// Checkout a writer and fetch the raw connection
var connection = this.db.serverConfig.checkoutWriter();

// Ensure we execute against the same raw connection so we can get the correct error
// result after the execution of the findAndModify finishes
this.db.executeDbCommand(queryObject, {writer:connection, allReturn:true, safe:errorOptions}, function (err, result) {
result = result && result.documents;
this.db.executeDbCommand(queryObject, {writer:connection}, function (err, firstResult) {
firstResult = firstResult && firstResult.documents;
if(!callback) return;

if(err) {
callback(err);
} else if(!result[0].ok) {
callback(self.db.wrap(result[0]));
} else if(!firstResult[0].ok) {
callback(self.db.wrap(firstResult[0]));
} else {
return callback(null, result[0].value);
// If we have a request for a last error command
if(errorOptions != null && errorOptions != false) {
self.db.lastError(errorOptions, {writer:connection}, function(err, secondResult) {
if(secondResult[0].err != null) {
callback(self.db.wrap(secondResult[0]));
} else {
return callback(null, firstResult[0].value);
}
})
} else {
return callback(null, firstResult[0].value);
}
}
});
}
Expand Down
16 changes: 13 additions & 3 deletions lib/mongodb/connection/connection.js
Expand Up @@ -59,6 +59,11 @@ Connection.prototype.start = function() {
this.connection.on("close", closeHandler(this));
}

// Check if the sockets are live
Connection.prototype.isConnected = function() {
return this.connected;
}

// Write the data out to the socket
Connection.prototype.write = function(command, callback) {
// console.log(" +++++++++++ Connection.prototype.write :: 0 :: " + this.connection.readable + " :: " + this.connection.writable + " :: " + this.connection.destroyed);
Expand Down Expand Up @@ -334,6 +339,9 @@ var createDataHandler = exports.Connection.createDataHandler = function(self) {

var endHandler = function(self) {
return function() {
// Set connected to false
self.connected = false;
// Emit end event
self.emit("end", {err: 'connection received Fin packet from [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
}
}
Expand All @@ -352,15 +360,17 @@ var drainHandler = function(self) {

var errorHandler = function(self) {
return function(err) {
// console.log("------------------------------------------------ error thrown")
// console.log(err.stack)
//
// Set connected to false
self.connected = false;
// Emit error
self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
}
}

var closeHandler = function(self) {
return function(hadError) {
// Set connected to false
self.connected = false;
// If we have an error during the connection phase
if(hadError && !self.connected) {
self.emit("error", {err: 'failed to connect to [' + self.socketOptions.host + ':' + self.socketOptions.port + ']'}, self);
Expand Down
14 changes: 7 additions & 7 deletions lib/mongodb/connection/connection_pool.js
Expand Up @@ -7,7 +7,7 @@ var utils = require('./connection_utils'),
Connection = require("./connection").Connection;

var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bson, socketOptions) {
if(typeof host !== 'string' || typeof port !== 'number') throw "host and port must be specified";
if(typeof host !== 'string' || typeof port !== 'number') throw "host and port must be specified [" + host + ":" + port + "]";
// Set up event emitter
EventEmitter.call(this);
// Keep all options for the socket in a specific collection allowing the user to specify the
Expand Down Expand Up @@ -65,6 +65,9 @@ ConnectionPool.prototype.setMaxBsonSize = function(maxBsonSize) {
// Creates handlers
var connectHandler = function(self) {
return function(err, connection) {
// console.log("((((((((((((((((((((((((((((((((((((((((((((((((())))))))))))))))))))))))))))))))))))))))))))))))) : 0")
// console.dir(err)

// Ensure we don't fire same error message multiple times
var fireError = true;
var performedOperation = false;
Expand Down Expand Up @@ -123,16 +126,12 @@ ConnectionPool.prototype.start = function(listener) {
if(this.listeners("poolReady").length == 0) {
throw "pool must have at least one listener ready that responds to the [poolReady] event";
}

// if(this.eventHandlers["poolReady"].length == 0) {
// throw "pool must have at least one listener ready that responds to the [poolReady] event";
// }

// Set pool state to connecting
this._poolState = 'connecting';

// Let's boot up all the instances
for(var i = 0; i < this.socketOptions.poolSize; i++) {
for(var i = 0; i < this.socketOptions.poolSize; i++) {
// Create a new connection instance
var connection = new Connection(this.connectionId++, this.socketOptions);
// Add connection to list of waiting connections
Expand Down Expand Up @@ -253,7 +252,8 @@ ConnectionPool.prototype.isConnected = function() {

return Object.keys(this.waitingToOpen).length == 0
&& Object.keys(this.connectionsWithErrors).length == 0
&& Object.keys(this.openConnections).length > 0 && this._poolState === 'connected';
&& Object.keys(this.openConnections).length > 0 && this._poolState === 'connected'
&& this.openConnections[Object.keys(this.openConnections)[0]].isConnected();
}

// Checkout a connection from the pool for usage, or grab a specific pool instance
Expand Down
86 changes: 79 additions & 7 deletions lib/mongodb/connections/repl_set_servers.js
Expand Up @@ -129,7 +129,7 @@ var ReplSetServers = exports.ReplSetServers = function(servers, options) {
// Master connection property
Object.defineProperty(this, "primary", { enumerable: true
, get: function () {
return this._state.master;
return this._state != null ? this._state.master : null;
}
});
};
Expand All @@ -139,7 +139,7 @@ ReplSetServers.prototype.setTarget = function(target) {
};

ReplSetServers.prototype.isConnected = function() {
return this.primary != null && this._state.master.isConnected();
return this.primary != null && this._state.master != null && this._state.master.isConnected();
}

ReplSetServers.prototype.isPrimary = function(config) {
Expand All @@ -148,6 +148,20 @@ ReplSetServers.prototype.isPrimary = function(config) {

ReplSetServers.prototype.isReadPrimary = ReplSetServers.prototype.isPrimary;

// Clean up dead connections
var cleanupConnections = function(connections, addresses) {
// Ensure we don't have entries in our set with dead connections
var keys = Object.keys(connections);
for(var i = 0; i < keys.length; i++) {
var server = connections[keys[i]];
// If it's not connected remove it from the list
if(!server.isConnected()) {
delete connections[keys[i]];
delete addresses[keys[i]];
}
}
}

ReplSetServers.prototype.connect = function(parent, options, callback) {
// console.log("#########################################################################################################")
// console.log(callback.toString())
Expand All @@ -169,7 +183,23 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
var done = false;

// If it's the first call let's reset our state
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[]};
if(firstCall || replSetSelf._state == null) {
replSetSelf._state = {'master':null, 'secondaries':{}, 'arbiters':{}, 'passives':{}, 'errors':{}, 'addresses':{}, 'setName':null, 'errorMessages':[]};
} else {
// console.log("----------------------------------------------------------------------------------- 0")
// Clean out dead connections
cleanupConnections(replSetSelf._state.arbiters, replSetSelf._state.addresses);
cleanupConnections(replSetSelf._state.passives, replSetSelf._state.addresses);
cleanupConnections(replSetSelf._state.secondaries, replSetSelf._state.addresses);
// Get master
var master = replSetSelf._state.master;
if(master != null) {
// Remove master from list
replSetSelf._state.addresses[master.host + ":" + master.port];
// Clean up master
replSetSelf._state.master = null;
}
}

// Add a close event handler to ourselves to notify the parent
this.on("close", function() {
Expand Down Expand Up @@ -200,6 +230,9 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Error handler for the servers, this handles unexpected errors coming from
// a wrong callback or something else
var errorHandler = function(err) {
// console.log("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ received error")
// console.dir(err)

if(err.stack != null) console.log(err.stack)
// Shut down the server and emit the error to the dbInstance
replSetSelf.close();
Expand Down Expand Up @@ -269,27 +302,51 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
newServer.connect(parent, {firstCall:true, returnIsMasterResults: true, eventReceiver:newServer}, connectionHandler(newServer));
}
}
// } else {
// console.log("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ GOT ERROR")
// console.dir(err)
}

// Set done, only done once
if(numberOfServersLeftToInitialize == 0) {
done = true;
}

// console.log("----------------------------------------------------------------------- CONNECTING :: " + numberOfServersLeftToInitialize)
// console.dir(replSetSelf._state)
// console.log("----------------------------------------------------------------------- CONNECTING 1")
// console.log("done = " + done)
// console.log("done = " + replSetSelf._serverState)


// If done finish up
if(done && replSetSelf._serverState === 'connecting' && replSetSelf._state.errorMessages.length == 0) {
// Set db as connected
replSetSelf._serverState = 'connected';

// console.log("----------------------------------------------------------------------- CONNECTING")
// console.dir(replSetSelf.servers)
// console.dir(replSetSelf.readSecondary)
// console.dir(replSetSelf._state)

// If we don't expect a master let's call back, otherwise we need a master before
// the connection is successful
if(replSetSelf.masterNotNeeded || replSetSelf._state.master != null) {
// console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 0 :: " + firstCall + " " + done)
// console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 0 :: ")
callback(null, parent);
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length > 0) {
// console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 1 :: ")
callback(null, parent);
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) {
// console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 2 :: ")
callback(new Error("no secondary server found"), null);
} else {
// console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 1 :: " + firstCall + " " + done)
// console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 3 :: ")
callback(new Error("no primary server found"), null);
}
}
} else if(done && replSetSelf._state.errorMessages.length > 0) {
callback(replSetSelf._state.errorMessages[0], null);
}
}
}

Expand All @@ -300,7 +357,22 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {

// Initialize all the connections
for(var i = 0; i < serverConnections.length; i++) {
serverConnections[i].connect(parent, {'firstCall':true, returnIsMasterResults: true, eventReceiver:serverConnections[i]}, connectionHandler(serverConnections[i]));
try {
serverConnections[i].connect(parent, {'firstCall':true, returnIsMasterResults: true, eventReceiver:serverConnections[i]}, connectionHandler(serverConnections[i]));
} catch (err) {
// console.log("}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}")
// console.dir(err)
numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1;
// Remove from list off addresses, close down and fire error
replSetSelf._state.addresses[serverConnections[i].host + ':' + serverConnections[i].port]
// Close connections
replSetSelf.close();
// Add error message
replSetSelf._state.errorMessages.push(err);

// Emit error
// replSetSelf.emit("error", err);
}
}
}

Expand Down
20 changes: 16 additions & 4 deletions lib/mongodb/connections/server.js
Expand Up @@ -62,14 +62,28 @@ Server.prototype.connect = function(dbInstance, options, callback) {
if('function' === typeof options) callback = options, options = {};
if(options == null) options = {};
if(!('function' === typeof callback)) callback = null;

// Let's connect
var server = this;
// Let's us override the main receiver of events
var eventReceiver = options.eventReceiver != null ? options.eventReceiver : dbInstance;

// // If we don't have a valid host and port
// if(this.host == null || isNaN(parseInt(this.port))) {
// console.log("*********************************************************************************************")
// console.log("host = " + this.host)
// console.log("port = " + this.port)
//
// return server.emit("error", new Error("Illegal server address [" + this.host + ":" + this.port + "]"));
// }

// Set server state to connecting
this._serverState = 'connecting';
// Ensure dbInstance can do a slave query if it's set
dbInstance.slaveOk = this.slaveOk ? this.slaveOk : dbInstance.slaveOk;
// Let's connect
var server = this;
// Create connection Pool instance with the current BSON serializer
var connectionPool = new ConnectionPool(this.host, this.port, this.poolSize, dbInstance.bson_deserializer);

// Set up a new pool using default settings
server.connectionPool = connectionPool;

Expand Down Expand Up @@ -97,8 +111,6 @@ Server.prototype.connect = function(dbInstance, options, callback) {

// Let's us override the main connect callback
var connectHandler = options.connectHandler == null ? connectCallback : options.connectHandler;
// Let's us override the main receiver of events
var eventReceiver = options.eventReceiver != null ? options.eventReceiver : dbInstance;

// Set up on connect method
connectionPool.on("poolReady", function() {
Expand Down

0 comments on commit 6eaf7b2

Please sign in to comment.