Skip to content

Commit

Permalink
Merge 043d638 into 63adccc
Browse files Browse the repository at this point in the history
  • Loading branch information
scommisso committed Oct 28, 2014
2 parents 63adccc + 043d638 commit 64ca45d
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 393 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ priam

A simple [Cassandra](http://cassandra.apache.org/) driver for [NodeJS](http://nodejs.org). It wraps the
[helenus](https://github.com/simplereach/helenus) and
[node-cassandra-cql](https://github.com/jorgebay/node-cassandra-cql) drivers with additional error/retry handling, external
[cassandra-driver](https://github.com/datastax/nodejs-driver) modules with additional error/retry handling, external
`.cql` file support, and connection option resolution from an external source, among other improvements.

By default, the driver uses [node-cassandra-cql](https://github.com/jorgebay/node-cassandra-cql) over binary connection.
By default, the driver uses [cassandra-driver](https://github.com/datastax/nodejs-driver) over a binary-protocol connection.
If a Thrift connection is desired, simply specify the [helenus](https://github.com/simplereach/helenus) driver option
in config. [Priam](https://github.com/godaddy/node-priam) uses internal aliases to map
[helenus](https://github.com/simplereach/helenus) and [node-cassandra-cql](https://github.com/jorgebay/node-cassandra-cql)
[helenus](https://github.com/simplereach/helenus) and [cassandra-driver](https://github.com/datastax/nodejs-driver)
options to facilitate easily switching between the two drivers. Specifying the appropriate `cqlVersion` for your database
will ensure that the appropriate driver is selected.

Expand All @@ -42,7 +42,8 @@ var db = require('priam')({
timeout: 4000, /* optional, defaults to 4000 */
poolSize: 2, /* optional, defaults to 1 */
consistencyLevel: 'one', /* optional, defaults to one. Will throw if not a valid Cassandra consistency level*/
driver: 'thrift', /* optional, defaults to 'node-cassandra-cql' */,
driver: 'helenus', /* optional, defaults to 'datastax' */,
protocol: 'thrift', /* optional, defaults to 'binary' */,
numRetries: 3, /* optional, defaults to 0. Retries occur on connection failures. */
retryDelay: 100, /* optional, defaults to 100ms. Used on error retry or consistency fallback retry */
enableConsistencyFailover: true, /* optional, defaults to true */
Expand Down Expand Up @@ -537,7 +538,8 @@ var db = require('priam')({
Release Notes
-------------
- `0.8.17`: Batch.execute no longer yields an error when the batch is empty
- `0.9.0`: Removed `node-cassandra-cql` in favor of `cassandra-driver`.
- `0.8.17`: Batch.execute no longer yields an error when the batch is empty.
- `0.8.16`: Simplified result set transformation for `node-cassandra-cql` drivers.
- `0.8.15`: Add isBatch and isQuery methods to base driver.
- `0.8.14`: Fix `resultTransformer` bug when query generates an error.
Expand Down
3 changes: 2 additions & 1 deletion example/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ var http = require('http')
, metrics = new MetricsClient({ logger: logger })
, db = require('../index' /*"priam"*/)({
config: {
driver: 'node-cassandra-cql', //"helenus"
protocol: 'binary',
cqlVersion: '3.0',
queryDirectory: path.join(__dirname, 'cql'),

// If using config-based connection, use these options
Expand Down
13 changes: 4 additions & 9 deletions lib/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,21 @@ function Driver(context) {
context.config = context.config || {};
context.config.version = context.config.cqlVersion = context.config.cqlVersion || context.config.version || '3.1.0';
var version = context.config.parsedCqlVersion = parseVersion(context.config.cqlVersion);
var driver = context.config.driver || 'node-cassandra-cql';
var driver = context.config.driver || 'datastax';

var protocol = 'binaryV2';
var protocol = 'binary';
if (driver === 'helenus' || driver === 'thrift' || version.major < 3) {
// thrift is only supported by Helenus driver, and binary is not supported until Cassandra 1.2 (CQL3)
// thrift is only supported by Helenus driver
protocol = 'thrift';
}
else if (version.major < 3 || (version.major === 3 && version.minor < 1)) {
// binaryV2 protocol supported in latest node-cassandra-cql did not exist until Cassandra 2.0 (CQL3.1)
protocol = 'binaryV1';
}
context.config.protocol = protocol;

if (protocol === 'thrift') {
context.config.driver = 'helenus';
return new (require('./drivers/helenus'))(context);
}
else {
context.config.driver = (protocol === 'binaryV2' ? 'node-cassandra-cql' : 'priam-cassandra-cql');
return new (require('./drivers/node-cassandra-cql'))(context);
return new (require('./drivers/datastax'))(context);
}
}

Expand Down
7 changes: 5 additions & 2 deletions lib/drivers/base-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function BaseDriver() {
objectAscii: -1,
objectText: -2
};
this.hostConfigKey = 'hosts';
}

BaseDriver.prototype.initProviderOptions = function initProviderOptions(config) {
Expand Down Expand Up @@ -125,7 +126,9 @@ BaseDriver.prototype.getConnectionPool = function getConnectionPool(keyspace, wa
cb(null, pool);
}

var poolConfig = _.extend({hosts: []}, self.poolConfig);
var emptyConfig = {hosts: []};
self.remapConnectionOptions(emptyConfig);
var poolConfig = _.extend(emptyConfig, self.poolConfig);
if (self.connectionResolver) {
var resolutionRequestId = uuid.v4();
self.emit('connectionResolving', resolutionRequestId);
Expand All @@ -144,7 +147,7 @@ BaseDriver.prototype.getConnectionPool = function getConnectionPool(keyspace, wa
self.remapConnectionOptions(connectionData);
var portMap = poolConfig.connectionResolverPortMap;
if (portMap && portMap.from && portMap.to) {
connectionData.hosts = changePorts(connectionData.hosts, portMap.from, portMap.to);
connectionData[self.hostConfigKey] = changePorts(connectionData[self.hostConfigKey], portMap.from, portMap.to);
}

poolConfig = _.extend(poolConfig, connectionData);
Expand Down
250 changes: 250 additions & 0 deletions lib/drivers/datastax.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
'use strict';

var _ = require('lodash')
, util = require('util')
, uuid = require('uuid')
, BaseDriver = require('./base-driver')
, cqlDriver = require('cassandra-driver');


function DatastaxDriver() {
BaseDriver.call(this);
var consistencies = cqlDriver.types.consistencies;
this.hostConfigKey = 'contactPoints';
this.dataType = _.extend(this.dataType, cqlDriver.types.dataTypes);
this.consistencyLevel = {
ONE: consistencies.one,
one: consistencies.one,
TWO: consistencies.two,
two: consistencies.two,
THREE: consistencies.three,
three: consistencies.three,
QUORUM: consistencies.quorum,
quorum: consistencies.quorum,
LOCAL_QUORUM: consistencies.localQuorum,
localQuorum: consistencies.localQuorum,
LOCAL_ONE: consistencies.localOne,
localOne: consistencies.localOne,
EACH_QUORUM: consistencies.eachQuorum,
eachQuorum: consistencies.eachQuorum,
ALL: consistencies.all,
all: consistencies.all,
ANY: consistencies.any,
any: consistencies.any
};
}
util.inherits(DatastaxDriver, BaseDriver);

module.exports = function (context) {
var driver = new DatastaxDriver();
driver.init(context);
return driver;
};
module.exports.DatastaxDriver = DatastaxDriver;

DatastaxDriver.prototype.initProviderOptions = function init(config) {
setHelenusOptions(config);
config.supportsPreparedStatements = true;
};

DatastaxDriver.prototype.createConnectionPool = function createConnectionPool(poolConfig, waitForConnect, callback) {
var self = this
, openRequestId = uuid.v4()
, pool;

self.logger.debug('priam.Driver: Creating new pool', {
poolConfig: {
keyspace: poolConfig.keyspace,
contactPoints: poolConfig.contactPoints
}
});

var dsPoolConfig = _.cloneDeep(poolConfig);
if (dsPoolConfig.username && dsPoolConfig.password) {
dsPoolConfig.authProvider = new cqlDriver.auth.PlainTextAuthProvider(dsPoolConfig.username, dsPoolConfig.password);
}

dsPoolConfig.queryOptions = dsPoolConfig.queryOptions || {};
dsPoolConfig.queryOptions.fetchSize = dsPoolConfig.limit;
dsPoolConfig.queryOptions.prepare = false;
if (dsPoolConfig.consistencyLevel) {
dsPoolConfig.queryOptions.consistency = dsPoolConfig.consistencyLevel;
}
var port = null;
if (Array.isArray(dsPoolConfig.contactPoints)) {
for (var i = 0; i < dsPoolConfig.contactPoints.length; i++) {
var split = dsPoolConfig.contactPoints[i].split(':');
dsPoolConfig.contactPoints[i] = split[0].trim();
if (split.length > 1) {
port = parseInt(split[1].trim(), 10);
}
}
if (port !== null) {
dsPoolConfig.protocolOptions = dsPoolConfig.protocolOptions || {};
dsPoolConfig.protocolOptions.port = port;
}
}

if (poolConfig.getAConnectionTimeout) {
dsPoolConfig.socketOptions = dsPoolConfig.socketOptions || {};
dsPoolConfig.socketOptions.connectTimeout = poolConfig.getAConnectionTimeout;
}

if (poolConfig.poolSize) {
dsPoolConfig.pooling = dsPoolConfig.pooling || {};
dsPoolConfig.pooling.coreConnectionsPerHost = dsPoolConfig.pooling.coreConnectionsPerHost || {};
dsPoolConfig.pooling.coreConnectionsPerHost[cqlDriver.types.distance.local.toString()] = poolConfig.poolSize;
dsPoolConfig.pooling.coreConnectionsPerHost[cqlDriver.types.distance.remote.toString()] = Math.ceil(poolConfig.poolSize / 2);
}

pool = new cqlDriver.Client(dsPoolConfig);
pool.storeConfig = poolConfig;
pool.waiters = [];
pool.isReady = false;
pool.on('log', function (level, message, data) {
self.emit('connectionLogged', level, message, data);
// unrecoverable errors will yield error on execution, so treat these as warnings since they'll be retried
// treat everything else as debug information.
var logMethod = (level === 'error' || level === 'warning') ? 'warn' : 'debug';

var metaData = {
datastaxLogLevel: level,
data: data
};
self.logger[logMethod]('priam.Driver: ' + message, metaData);
});

this.emit('connectionOpening', openRequestId);
pool.connect(function (err) {
if (err) {
self.emit('connectionFailed', openRequestId, err);
self.logger.error('priam.Driver: Pool Connect Error',
{ name: err.name, error: err.message, inner: err.innerErrors });
if (waitForConnect) {
callback(err, pool);
}
self.callWaiters(err, pool);
return void self.closePool(pool);
}
pool.isReady = true;
self.emit('connectionOpened', openRequestId);
if (waitForConnect) {
callback(null, pool);
}
self.callWaiters(null, pool);
});
if (!waitForConnect) {
callback(null, pool);
}
};

DatastaxDriver.prototype.remapConnectionOptions = function remapConnectionOptions(connectionData) {
remapOption(connectionData, 'user', 'username');
remapOption(connectionData, 'hosts', 'contactPoints');
};

DatastaxDriver.prototype.closePool = function closePool(pool, callback) {
if (pool.isReady && !pool.isClosed) {
pool.isClosed = true;
pool.isReady = false;
pool.shutdown(callback);
}
else if (_.isFunction(callback)) {
pool.isClosed = true;
pool.isReady = false;
process.nextTick(callback);
}
this.emit('connectionClosed');
};

DatastaxDriver.prototype.executeCqlOnDriver = function executeCqlOnDriver(pool, cqlStatement, params, consistency, options, callback) {
var self = this;
var execOptions = _.assign({
prepare: !!options.executeAsPrepared,
consistency: consistency
}, options);
var hints = [];
_.forEach(params, function (param, index) {
if (param && param.hasOwnProperty('value') && param.hasOwnProperty('hint')) {
params[index] = param.value;
if (param.hint) {
if (_.isString(param.hint)) {
param.hint = self.dataType.getByName(param.hint).type;
}
hints[index] = param.hint;
}
}
});
if (hints.length) {
execOptions.hints = hints;
}

pool.execute(cqlStatement, params, execOptions, function (err, data) {
if (err) {
return void callback(err);
}
var result = (data && data.rows) ? data.rows : [];
return void callback(null, result);
});
};

DatastaxDriver.prototype.getNormalizedResults = function getNormalizedResults(original, options) {
var self = this;
var results = _.map(original, function (row) {
var result = {};
_.forOwn(row, function (value, name) {
if (name === 'columns' && _.isObject(value)) { return; } // skip metadata
if (typeof value === 'string') {
value = self.checkObjectResult(value, name, options);
}
result[name] = value;
});
return result;
});
return results;
};

var numberRegex = /^[0-9]+$/;
DatastaxDriver.prototype.dataToCql = function dataToCql(val) {
if (val && val.hasOwnProperty('value') && val.hasOwnProperty('hint')) {

// Transform timestamp values into Date objects if number or string
if (val.hint === this.dataType.timestamp) {
if (typeof val.value === 'number') {
val.value = new Date(val.value);
}
else if (typeof val.value === 'string') {
if (numberRegex.test(val.value)) {
val.value = new Date(parseInt(val.value, 10)); // string of numbers
}
else {
val.value = new Date(val.value); // assume ISO string
}
}
}

return val; // {value,hint} style parameter - hint will be extracted out on the execute step
}

if (!Buffer.isBuffer(val) && (util.isArray(val) || typeof val === 'object')) {
// arrays and objects should be JSON'ized
return JSON.stringify(val);
}

return val; // use as-is
};

function remapOption(config, from, to) {
if (config.hasOwnProperty(from)) {
config[to] = config[from];
delete config[from];
}
}

function setHelenusOptions(config) {
remapOption(config, 'timeout', 'getAConnectionTimeout');
remapOption(config, 'hostPoolSize', 'poolSize');
remapOption(config, 'cqlVersion', 'version');
remapOption(config, 'user', 'username');
remapOption(config, 'hosts', 'contactPoints');
}

0 comments on commit 64ca45d

Please sign in to comment.