Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "redis-clustr",
"version": "1.6.0",
"version": "1.7.0",
"description": "Redis cluster client",
"main": "src/RedisClustr.js",
"keywords": [
Expand Down
54 changes: 42 additions & 12 deletions src/RedisClustr.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ RedisClustr.prototype.getClient = function(port, host, master) {
if (
err.code === 'CONNECTION_BROKEN' ||
err.code === 'UNCERTAIN_STATE' ||
err.code === 'NR_CLOSED' ||
/Redis connection to .* failed.*/.test(err.message)
) {
// broken connection so force a new client to be created (node_redis will reconnect other errors)
if (err.code === 'CONNECTION_BROKEN') self.connections[name] = null;
// broken/closed connection so force a new client to be created (node_redis should reconnect other errors)
if (err.code === 'CONNECTION_BROKEN' || err.code === 'NR_CLOSED') self.connections[name] = null;
self.emit('connectionError', err, cli);
self.getSlots();
return;
Expand All @@ -104,6 +105,9 @@ RedisClustr.prototype.getClient = function(port, host, master) {
});
if (!self.connected && wasConnected) self.emit('disconnect');

// set connection to null so we create a new client if we want to reconnect
if (cli.closing) self.connections[name] = null;

// setImmediate as node_redis sets emitted_end after emitting end
setImmediate(function() {
var wasEnded = self.ended;
Expand All @@ -122,14 +126,21 @@ RedisClustr.prototype.getClient = function(port, host, master) {
/**
* Get a random Redis connection
* @date 2015-02-18
* @param {array} exclude List of addresses to exclude (falsy to ignore none)
* @return {Redis} A random, ready, Redis connection.
* @param {array} exclude List of addresses to exclude (falsy to ignore none)
* @param {boolean} forceSlaves Include slaves, regardless of configuration
* @return {Redis} A random, ready, Redis connection.
*/
RedisClustr.prototype.getRandomConnection = function(exclude) {
RedisClustr.prototype.getRandomConnection = function(exclude, forceSlaves) {
var self = this;

var masterOnly = !forceSlaves && self.config.slaves === 'never';

var available = Object.keys(self.connections).filter(function(f) {
return self.connections[f] && self.connections[f].ready && (!exclude || exclude.indexOf(f) === -1);
var con = self.connections[f];
return con &&
con.ready &&
(!exclude || exclude.indexOf(f) === -1) &&
(!masterOnly || con.master);
});

var randomIndex = Math.floor(Math.random() * available.length);
Expand Down Expand Up @@ -178,7 +189,7 @@ RedisClustr.prototype.getSlots = function(cb) {
if (typeof readyTimeout !== 'undefined') clearTimeout(readyTimeout);
if (self.quitting) return runCbs(new Error('cluster is quitting'));

var client = self.getRandomConnection(exclude);
var client = self.getRandomConnection(exclude, true);
if (!client) {
var err = new Error('couldn\'t get slot allocation');
err.errors = tryErrors;
Expand Down Expand Up @@ -269,7 +280,6 @@ RedisClustr.prototype.selectClient = function(key, conf) {
var self = this;

// this command doesnt have keys, return any connection
// NOTE: this means slaves may be used for no key commands regardless of slave config
if (conf.keyless) return self.getRandomConnection();

if (Array.isArray(key)) key = key[0];
Expand All @@ -296,6 +306,13 @@ RedisClustr.prototype.selectClient = function(key, conf) {
}

var cli = clients[index];

if (!cli.ready) {
self.getSlots();
// this could be improved to select another slave
return self.getRandomConnection();
}

if (index === 0 && cli.readOnly) {
cli.send_command('readwrite', []);
cli.readOnly = false;
Expand Down Expand Up @@ -435,7 +452,7 @@ RedisClustr.prototype.commandCallback = function(cli, cmd, args, cb) {
return;
}

if (msg.substr(0, 8) === 'TRYAGAIN' || err.code === 'CLUSTERDOWN') {
if (err.code === 'CLUSTERDOWN' || msg.substr(0, 8) === 'TRYAGAIN') {
// TRYAGAIN response or cluster down, retry with backoff up to 1280ms
setTimeout(function() {
cli[cmd].apply(cli, args);
Expand Down Expand Up @@ -563,10 +580,10 @@ RedisClustr.prototype.subscribeAll = function(exclude) {
var cli = self.subscribeClient = self.createClient(con.connection_options.port, con.connection_options.host);

cli.on('error', function(err) {
console.log(err);
if (
err.code === 'CONNECTION_BROKEN' ||
err.code === 'UNCERTAIN_STATE' ||
err.code === 'NR_CLOSED' ||
/Redis connection to .* failed.*/.test(err.message)
) {
self.emit('connectionError', err, cli);
Expand Down Expand Up @@ -614,9 +631,22 @@ setupCommands(RedisClustr);
* @date 2014-11-19
* @return {RedisBatch} A RedisBatch which has a very similar interface to redis/
*/
RedisClustr.prototype.batch = RedisClustr.prototype.multi = function() {
RedisClustr.prototype.batch = RedisClustr.prototype.multi = function(commands) {
var self = this;
return new RedisBatch(self);
var batch = new RedisBatch(self);

if (Array.isArray(commands) && commands.length > 0) {
commands.forEach(function (command) {
var args = [];
if (command.length > 1) {
args = command.slice(1);
}

batch[command[0]].apply(batch, args);
});
}

return batch;
};

/**
Expand Down