Permalink
Browse files

* introduce ConnectionError for distinguishing pooling errors that sh…

…ould get reported back to the user and errors that the driver should try to address.

* Connection uses a timeout to handle instances where a query never finishes (this has been observed with some transport errors).
* ConnectionInPool extends Connection and keeps track of connection state and health.
* PooledConnection2 is API compatible with PooledConnection (will replace it in next commit).
  • Loading branch information...
gdusbabek committed Jan 16, 2012
1 parent 77ea752 commit fa928c4ff9a5bd25715c1f6ca64ca7ffc95fd4d6
Showing with 252 additions and 5 deletions.
  1. +232 −1 lib/driver.js
  2. +20 −4 test/test_driver.js
@@ -22,6 +22,7 @@ var logCql = require('logmagic').local('node-cassandra-client.driver.cql');
var logTiming = require('logmagic').local('node-cassandra-client.driver.timing');
var sys = require('sys');
var util = require('util');
var constants = require('constants');
var Buffer = require('buffer').Buffer;
var EventEmitter = require('events').EventEmitter;
@@ -370,6 +371,7 @@ PooledConnection.prototype.shutdown = function(callback) {
});
};
/**
* @param options: valid parts are:
* user, pass, host, port, keyspace, use_bigints, timeout, log_time
@@ -378,6 +380,7 @@ var Connection = module.exports.Connection = function(options) {
options = options || {};
log.info('connecting ' + options.host + ':' + options.port);
this.validators = {};
// todo: build con during connect().
this.con = thrift.createConnection(options.host, options.port);
this.client = null;
this.connectionInfo = options;
@@ -547,7 +550,19 @@ Connection.prototype.execute = function(query, args, callback) {
start = new Date().getTime();
logCql.trace('CQL QUERY', {'query': query, 'parameterized_query': cqlString, 'args': args});
this.client.execute_cql_query(cql, ttypes.Compression.NONE, function(err, res) {
// if a connection dies at the right place, execute_cql_query never returns. make sure the callback gets called.
var timeoutId = setTimeout(function() {
callback(new Error('Connection timed out'));
timeoutId = null;
}, this.timeout); // todo: should we disambiguate connection timeout vs query timeout?
self.client.execute_cql_query(cql, ttypes.Compression.NONE, function(err, res) {
if (!timeoutId) {
log.warn('query returned after timeout: ' + cql);
return;
} else {
clearTimeout(timeoutId);
}
end = new Date().getTime();
diff = (end - start);
if (self.connectionInfo.log_time) {
@@ -579,8 +594,224 @@ Connection.prototype.execute = function(query, args, callback) {
callback(null, res.num);
} else if (res.type === ttypes.CqlResultType.VOID) {
callback(null);
} else {
callback(new Error('Execution unexpectedly got here. Result type is ' + res.type));
}
}
});
}
};
/**
* pooled connection behave a bit different but offer the same service interface as regular connections.
* This constructor behaves differently from the normal Connection since Connection() does some socket work.
* that work is delayed to connect() here.
*/
var ConnectionInPool = module.exports.ConnectionInPool = function(options) {
options.staleThreshold = options.staleThreshold || 10000;
// cache options so that thrift setup can happen later.
this._options = options;
this.taken = false; // true when being used in a query.
this.connected = false; // true when connected.
this.unhealthyAt = 0; // timestamp this connection went bad.
}
util.inherits(ConnectionInPool, Connection);
/**
* connects to the remote endpoint.
* @param callback
*/
ConnectionInPool.prototype.connect = function(callback) {
var self = this;
Connection.call(this, this._options);
Connection.prototype.connect.call(this, function(err) {
self.connected = !err;
self.unhealthyAt = err ? new Date().getTime() : 0;
callback(err);
});
};
ConnectionInPool.prototype.isHealthy = function() {
return this.unhealthyAt === 0;
}
/**
* a 'stale unhealthy' node is a node that has been bad for some period of time. After that
* period, it is safe to retry the connection.
*/
ConnectionInPool.prototype.isStaleUnhealthy = function() {
return !this.isHealthy() && new Date().getTime() - this.unhealthyAt > this._options.staleThreshold;
}
// this will replace pooled connection and generic-collections at some point.
var PooledConnection2 = module.exports.PooledConnection2 = function(config) {
var self = this;
config = config || {};
this.connections = [];
this.current_node = 0;
this.use_bigints = config.use_bigints ? true : false;
this.timeout = config.timeout || DEFAULT_CONNECTION_TIMEOUT;
this.log_time = config.log_time || false;
// Construct a list of nodes from hosts in <host>:<port> form
for (var i = 0; i < config.hosts.length; i++) {
var hostSpec = config.hosts[i];
if (!hostSpec) { continue; }
var host = hostSpec.split(':');
if (host.length > 2) {
log.warn('malformed host entry "' + hostSpec + '" (skipping)');
continue;
}
log.debug("adding " + hostSpec + " to working node list");
this.connections.push(new ConnectionInPool({
host: host[0],
port: (isNaN(host[1])) ? 9160 : host[1],
keyspace: config.keyspace,
user: config.user,
pass: config.pass,
use_bigints: self.use_bigints,
timeout: self.timeout,
log_time: self.log_time
}));
}
};
/**
* increment the current node pointer, skipping over any bad nodes. has a side-effect of resetting
* unhealthy nodes that are stale (but not reconnecting them).
* @return boolean indicating if all nodes are unhealthy.
*/
PooledConnection2.prototype._incr = function() {
var incrCount = 0;
while (incrCount < this.connections.length) {
incrCount += 1;
this.current_node = (this.current_node + 1) % this.connections.length;
if (this.connections[this.current_node]) {
if (this.connections[this.current_node].isHealthy()) {
break;
} else if (this.connections[this.current_node].isStaleUnhealthy()) {
// unhealthy and stale, so let reset the node (appears as if unconnected).
this.connections[this.current_node].taken = false;
this.connections[this.current_node].connected = false;
this.connections[this.current_node].unhealthyAt = 0;
break;
} else {
//`console.log('not healthy ' + this.current_node + ',' + incrCount);
}
}
}
// all nodes are unhealthy if we looped around and no healthy nodes were found.
return incrCount >= this.connections.length && !this.connections[this.current_node].isHealthy();
};
/**
* execute a query. retries on other nodes when connection failures are detected.
*/
PooledConnection2.prototype.execute = function(query, args, callback) {
var self = this;
self._getNextCon(function(err, con) {
if (err) {
callback(err, null);
} else {
try {
con.taken = true;
con.execute(query, args, function(err, result) {
con.taken = false;
var recoverableError = null;
if (err) {
if (err.hasOwnProperty('name') && contains(appExceptions, err.name)) {
callback(err, null);
return;
} else {
recoverableError = err;
}
if (recoverableError) {
con.unhealthyAt = new Date().getTime();
con.taken = false;
log.warn('setting unhealthy from execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port);
// try again.
self.execute(query, args, callback);
}
} else {
callback(null, result);
}
});
} catch (err) {
// individual connection has failed.
con.unhealthyAt = new Date().getTime();
con.taken = false;
log.warn('setting unhealthy from catch outside execute ' + con.connectionInfo.host + ':' + con.connectionInfo.port);
// try again.
self.execute(query, args, callback);
}
}
});
};
/** gets the next untaken connection. errors when all connections are bad, or loop times out. */
PooledConnection2.prototype._getNextCon = function(callback) {
var self = this;
var tryStart = new Date().getTime();
var con = null;
var allBad = false;
var takens = [];
async.whilst(function truthTest() {
// should the timeout of getting a single connection be the sum of all connections? Think of a scenario where the
// timeout is N, but the first X nodes are unresponsive. You still want to allow access to the subsequent good
// nodes.
return !allBad && con === null && (new Date().getTime() - tryStart) < (self.timeout * self.connections.length);
}, function tryConnect(callback) {
var c = self.connections[self.current_node];
allBad = self._incr();
if (c.taken) {
takens[self.current_node] = takens[self.current_node] === undefined ? 1 : takens[self.current_node] + 1;
if (takens[self.current_node] > 0) {
// we've tried this node > 1 times and it still isn't available, this means that all other nodes are occupied
// or down (we've looped around all nodes). Continually checking will blow the stack, so lets wait
// 10 ms. before checking again.
setTimeout(callback, 10);
} else {
callback();
}
} else if (c.unhealthyAt > 0) {
callback();
} else if (!c.connected) {
c.connect(function(err) {
if (c.connected) {
con = c;
}
// some errors we pass back. some we swallow and iterate over.
if (err instanceof ttypes.NotFoundException) {
callback(err, null);
} else if (err && err.errno && err.errno === constants.ETIMEDOUT) {
callback();
} else {
callback();
}
});
} else {
con = c;
callback();
}
}, function whenDone(err) {
if (allBad && !err) {
err = new Error('All connections are unhealthy.');
} else if (!con && !err) {
err = new Error('connection was not set');
}
callback(err, con);
});
};
PooledConnection2.prototype.shutdown = function(callback) {
this.connections.forEach(function(con) {
if (con.connected) {
con.close();
}
});
if (callback) {
callback();
}
};
@@ -27,7 +27,8 @@ var async = require('async');
var BigInteger = require('../lib/bigint').BigInteger;
var Connection = require('../lib/driver').Connection;
var PooledConnection = require('../lib/driver').PooledConnection;
var PooledConnection = require('../lib/driver').PooledConnection2;
var ConnectionInPool = require('../lib/driver').ConnectionInPool;
var ttypes = require('../lib/gen-nodejs/cassandra_types');
var Keyspace = require('../node-cassandra-client').Keyspace;
var System = require('../lib/system').System;
@@ -278,8 +279,8 @@ exports.testPooledConnectionKeyspaceDoesNotExistConnect = function(test, assert)
use_bigints: false});
con.execute('SELECT * FROM foo', [], function(err) {
assert.ok(err);
assert.equal(err.name, 'NotFoundException')
assert.equal(err.message, 'ColumnFamily or Keyspace does not exist');
assert.equal(err.name, 'NotFoundException')
test.finish();
});
};
@@ -847,7 +848,7 @@ exports.testCustomValidators = function(test, assert) {
exports.testPooledConnectionFailover = function(test, assert) {
var hosts = ['google.com:8000', '127.0.0.1:6567', '127.0.0.2', '127.0.0.1:19170'];
var hosts = ['google.com:8000', '127.0.0.1:6567', '127.0.0.1:19170', '127.0.0.2'];
var conn = new PooledConnection({'hosts': hosts, 'keyspace': 'Keyspace1', use_bigints: true, 'timeout': 5000});
async.series([
@@ -989,4 +990,19 @@ exports.testTimeLogging = function(test, assert) {
};
exports.testConnectionInPool = function(test, assert) {
var con = new ConnectionInPool({
host: '127.0.0.1',
port: 19170,
keyspace: 'Keyspace1',
use_bigints: true
});
con.connect(function(err) {
if (err) {
assert.ifError(err);
} else {
con.close();
test.finish();
}
});
};

0 comments on commit fa928c4

Please sign in to comment.