Skip to content

Commit

Permalink
Merge branch 'master' into ha
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Mar 29, 2012
2 parents 2c4e411 + fc6eeba commit 7145c47
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 39 deletions.
2 changes: 2 additions & 0 deletions HISTORY
Expand Up @@ -3,6 +3,8 @@
- Fix for reading a GridStore from arbitrary, non-chunk aligned offsets, added test (Issue #563, https://github.com/subroutine)
- Modified limitRequest to allow negative limits to pass through to Mongo, added test (Issue #561)
- Corrupt GridFS files when chunkSize < fileSize, fixed concurrency issue (Issue #555)
- Handle dead tailable cursors (Issue #568, https://github.com/aheckmann)
- Connection pools handles closing themselves down and clearing the state

0.9.9.7 2012-03-16
------------------
Expand Down
4 changes: 2 additions & 2 deletions dev/benchmark/hammer.js
Expand Up @@ -12,7 +12,7 @@ var BSON = require('../../lib/mongodb').BSONPure.BSON,
ObjectID = require('../../lib/mongodb').BSONPure.ObjectID;

// Open the db connection
new Db('hammer_db', new Server("127.0.0.1", 27018, {auto_reconnect: true, poolSize: 20}), {native_parser: false}).open(function(err, db) {
new Db('hammer_db', new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize: 20}), {native_parser: false}).open(function(err, db) {
db.dropCollection('hammer_collection', function(err, result) {
db.admin().authenticate('admin', 'admin', function(err, result) {
var i = 0;
Expand Down Expand Up @@ -69,7 +69,7 @@ new Db('hammer_db', new Server("127.0.0.1", 27018, {auto_reconnect: true, poolSi
})
})
}
}, 0);
}, 1000);
})
});
});
Expand Down
112 changes: 112 additions & 0 deletions dev/tools/connection_proxy_emulator.js
@@ -0,0 +1,112 @@
/**
* Parameters for the proxy
**/
var inputHost = 'localhost';
var inputPort = 27017;
var outputHost = 'localhost';
var outputPort = 27018;
var webServerPort = 8080;

/**
* Proxy handling
**/
var net = require('net'),
http = require('http'),
format = require('util').format;
var connectionNumber = 0,
incomingConnections = {},
outgoingConnections = {};

// Server instance
var server = net.createServer(function(connection) {
console.log("=============================================== server connected");
// console.dir(connection)
// Set the id
connection.id = connectionNumber++;
// Outgoing connection
var outgoingConnection = net.createConnection(outputPort, outputHost);
outgoingConnection.id = connection.id;
// Create a connection
outgoingConnections[connection.id] = outgoingConnection;
incomingConnections[connection.id] = connection;
// Listen to incoming data
connection.on("data", function(data) {
outgoingConnections[this.id].write(data);
});

connection.on("close", function() {
console.log("===================================== closing incoming connection :: " + this.id)
if(outgoingConnections[this.id]) outgoingConnections[this.id].destroy();
delete outgoingConnections[this.id];
})

outgoingConnections[connection.id].on("data", function(data) {
incomingConnections[this.id].write(data);
});

outgoingConnections[connection.id].on("close", function(data) {
console.log("===================================== closing outgoing connection :: " + this.id)
if(incomingConnections[this.id]) incomingConnections[this.id].destroy();
delete incomingConnections[this.id];
});
});

// Boot up server letting you control the connection
var webserver = http.createServer(function(request, response) {
console.log("----------------------------------------------------------- 8080")
// console.dir(request.url.)
if(request.url == '/sockets') {
renderSocketList(incomingConnections, response);
} else if(request.url.indexOf('/sockets/close') != -1) {
// Get the id and kill it
var id = request.url.split("/").pop();
id = id != null ? parseInt(id) : null;
if(id != null && incomingConnections[id] != null) {
}
// Render the socket list
renderSocketList(incomingConnections, response);
} else if(request.url.indexOf('/rest/kill_random_socket')) {
// Grab all the connection ids
var keys = Object.keys(incomingConnections);
// Grab a random one in the space
var id = keys[Math.floor(Math.random(keys.length))];
// Terminate the connection

} else {
// Write 401 error out
response.writeHead(401, { 'Content-Type': 'text/plain'});
response.write("No such page found");
response.end();
}
});
// Listen
webserver.listen(webServerPort);

var renderSocketList = function(_incomingConnections, _response) {
// Write out the list of available sockets we can kill if we wish
_response.writeHead(200, { 'Content-Type': 'text/html'});
// Map the array
var socketids = Object.keys(_incomingConnections).map(function(item) {
return format("<li>Socket %s <a href='/sockets/close/%s'>[Close]</a></li>", item, item);
});
// Write out the data
_response.write(format("<head></head><body><ul>%s</ul></body>", socketids.join("")))
_response.end();
}

var terminateConnection = function(id) {
// Get the connections
var incomingConnection = incomingConnections[id];
var outgoingConnection = outgoingConnections[id];
// Remove from the list
delete incomingConnections[id];
delete outgoingConnections[id];
// Kill them
incomingConnection.destroy();
outgoingConnection.destroy();
}

// Listen to port
server.listen(inputPort, inputHost, function() {
console.log("server bound")
});
6 changes: 5 additions & 1 deletion lib/mongodb/connection/connection.js
Expand Up @@ -37,7 +37,7 @@ var Connection = exports.Connection = function(id, socketOptions) {
}

// Set max bson size
Connection.DEFAULT_MAX_BSON_SIZE = 4 * 1024 * 1024 * 4 * 3;
Connection.DEFAULT_MAX_BSON_SIZE = 16777216;

// Inherit event emitter so we can emit stuff wohoo
inherits(Connection, EventEmitter);
Expand Down Expand Up @@ -122,6 +122,9 @@ Connection.prototype.isConnected = function() {

// Write the data out to the socket
Connection.prototype.write = function(command, callback) {
// console.log("=====================================================")
// console.log("this.maxBsonSize = " + this.maxBsonSize)

try {
// If we have a list off commands to be executed on the same socket
if(Array.isArray(command)) {
Expand Down Expand Up @@ -388,6 +391,7 @@ var errorHandler = function(self) {

var closeHandler = function(self) {
return function(hadError) {
// console.log("++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
// If we have an error during the connection phase
if(hadError && !self.connected) {
// Set disconnected
Expand Down
55 changes: 37 additions & 18 deletions lib/mongodb/connection/connection_pool.js
Expand Up @@ -18,6 +18,7 @@ var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bso
this.bson = bson;
// PoolSize is always + 1 for special reserved "measurment" socket (like ping, stats etc)
this.poolSize = poolSize;
this.minPoolSize = Math.floor(this.poolSize / 2) + 1;

// Set default settings for the socket options
utils.setIntegerParameter(this.socketOptions, 'timeout', 0);
Expand All @@ -32,7 +33,7 @@ var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bso

// Internal structures
this.openConnections = [];
this.connections = [];
// this.connections = [];

// Assign connection id's
this.connectionId = 0;
Expand Down Expand Up @@ -69,15 +70,13 @@ var _connect = function(_self) {
connection.on("connect", function(err, connection) {
// Add connection to list of open connections
_self.openConnections.push(connection);
_self.connections.push(connection)

// If the number of open connections is equal to the poolSize signal ready pool
if(_self.connections.length === _self.poolSize && _self._poolState !== 'disconnected') {
if(_self.openConnections.length === _self.poolSize && _self._poolState !== 'disconnected') {
// Set connected
_self._poolState = 'connected';
// Emit pool ready
_self.emit("poolReady");
} else if(_self.connections.length < _self.poolSize) {
} else if(_self.openConnections.length < _self.poolSize) {
// We need to open another connection, make sure it's in the next
// tick so we don't get a cascade of errors
process.nextTick(function() {
Expand All @@ -90,6 +89,9 @@ var _connect = function(_self) {

// Error handler
connection.on("error", function(err, connection) {
// console.log("-------------------------------------------------------- error :: " + _self.openConnections.indexOf(this))
// console.dir(this)

numberOfErrors++;
// If we are already disconnected ignore the event
if(connectionStatus != 'disconnected' && _self.listeners("error").length > 0) {
Expand All @@ -100,13 +102,21 @@ var _connect = function(_self) {
connectionStatus = 'disconnected';
// Set disconnected
_self._poolState = 'disconnected';
// Clean up
_self.openConnections = [];
_self.connections = [];
// Stop
_self.stop();

// // Clean up
// _self.openConnections = [];
// _self.connections = [];
});

// Close handler
connection.on("close", function() {
// console.log("-------------------------------------------------------- close")
// console.dir(this)
// Only close when we have no more connections
// if(_self.minPoolSize)

// If we are already disconnected ignore the event
if(connectionStatus !== 'disconnected' && _self.listeners("close").length > 0) {
_self.emit("close");
Expand All @@ -116,13 +126,17 @@ var _connect = function(_self) {
connectionStatus = 'disconnected';
// Set disconnected
_self._poolState = 'disconnected';
// Clean up
_self.openConnections = [];
_self.connections = [];
// Stop
_self.stop();
// // Clean up
// _self.openConnections = [];
// _self.connections = [];
});

// Timeout handler
connection.on("timeout", function(err, connection) {
// console.log("-------------------------------------------------------- timeout")
// console.dir(this)
// If we are already disconnected ignore the event
if(connectionStatus !== 'disconnected' && _self.listeners("timeout").length > 0) {
_self.emit("timeout", err);
Expand All @@ -132,13 +146,17 @@ var _connect = function(_self) {
connectionStatus = 'disconnected';
// Set disconnected
_self._poolState = 'disconnected';
// Clean up
_self.openConnections = [];
_self.connections = [];
// Stop
_self.stop();
// // Clean up
// _self.openConnections = [];
// _self.connections = [];
});

// Parse error, needs a complete shutdown of the pool
connection.on("parseError", function() {
// console.log("-------------------------------------------------------- parseError")
// console.dir(this)
// If we are already disconnected ignore the event
if(connectionStatus !== 'disconnected' && _self.listeners("parseError").length > 0) {
// if(connectionStatus == 'connected') {
Expand Down Expand Up @@ -198,14 +216,14 @@ ConnectionPool.prototype.stop = function(removeListeners) {
}

// Close all connections
for(var i = 0; i < this.connections.length; i++) {
this.connections[i].close();
for(var i = 0; i < this.openConnections.length; i++) {
this.openConnections[i].close();
}

// Clean up
// this.connectionsWithErrors = [];
this.openConnections = [];
this.connections = [];
// this.connections = [];
}

// Check the status of the connection
Expand All @@ -221,7 +239,8 @@ ConnectionPool.prototype.checkoutConnection = function(id) {
}

ConnectionPool.prototype.getAllConnections = function() {
return this.connections;
// return this.connections;
return this.openConnections;
}

// Remove all non-needed event listeners
Expand Down
18 changes: 9 additions & 9 deletions lib/mongodb/connection/server.js
Expand Up @@ -148,7 +148,7 @@ Server.prototype.close = function(callback) {
// Remove all the listeners on the pool so it does not fire messages all over the place
this.connectionPool.removeAllEventListeners();
// Close the connection if it's open
this.connectionPool.stop();
this.connectionPool.stop(true);
}

// Set server status as disconnected
Expand Down Expand Up @@ -367,8 +367,8 @@ Server.prototype.connect = function(dbInstance, options, callback) {
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
server._serverState = 'disconnected';
// Close the pool
connectionPool.stop();
// // Close the pool
// connectionPool.stop();
// If we have a callback return the error
if(typeof callback === 'function') {
// ensure no callbacks get called twice
Expand Down Expand Up @@ -397,8 +397,8 @@ Server.prototype.connect = function(dbInstance, options, callback) {
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
server._serverState = 'disconnected';
// Close the pool
connectionPool.stop();
// // Close the pool
// connectionPool.stop();
// If we have a callback return the error
if(typeof callback === 'function') {// && !server.isSetMember()) {
// ensure no callbacks get called twice
Expand Down Expand Up @@ -427,8 +427,8 @@ Server.prototype.connect = function(dbInstance, options, callback) {
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
server._serverState = 'disconnected';
// Close the pool
connectionPool.stop(true);
// // Close the pool
// connectionPool.stop(true);
// If we have a callback return the error
if(typeof callback == 'function') {
// ensure no callbacks get called twice
Expand Down Expand Up @@ -458,8 +458,8 @@ Server.prototype.connect = function(dbInstance, options, callback) {
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
server._serverState = 'disconnected';
// Close the pool
connectionPool.stop();
// // Close the pool
// connectionPool.stop();
// If we have a callback return the error
if(typeof callback === 'function') {
// ensure no callbacks get called twice
Expand Down

0 comments on commit 7145c47

Please sign in to comment.