Permalink
Browse files

Making progress

  • Loading branch information...
1 parent c7f3f80 commit c69c388331b1788657b84a241f55c2ef10a7160e Carlos Rodriguez committed Jun 15, 2012
Showing with 132 additions and 14 deletions.
  1. +100 −9 index.js
  2. +23 −0 lib/uuid.js
  3. +2 −1 package.json
  4. +7 −4 test/basic.js
View
109 index.js
@@ -5,7 +5,10 @@ var redis = require('redis')
, default_port = 6379
, default_host = '127.0.0.1'
, default_retry_timeout = 1000
+ , default_failover_wait = 10000
, commands = require('./node_modules/redis/lib/commands')
+ , async = require('async')
+ , uuid = require('./lib/uuid')
;
function createClient(nodes, options) {
@@ -22,12 +25,17 @@ function RedisHAClient(nodeList, options) {
this.ready = false;
this.on('ready', function() {
this.ready = true;
+ this.failoverInProgress = false;
this.drainQueue();
});
this.parseNodeList(nodeList, options);
}
util.inherits(RedisHAClient, EventEmitter);
+commands.forEach(function(k) {
+ commands.push(k.toUpperCase());
+});
+
commands.forEach(function(k) {
RedisHAClient.prototype[k] = function() {
var args = Array.prototype.slice.call(arguments);
@@ -83,7 +91,7 @@ commands.forEach(function(k) {
break;
}
if (preferSlave) {
- node = this.randomSlave();
+ //node = this.randomSlave();
}
if (!node) {
node = this.master;
@@ -136,7 +144,8 @@ RedisHAClient.prototype.parseNodeList = function(nodeList, clientOptions) {
}
var node = new Node(options, clientOptions);
node.on('error', function(err) {
- console.warn(err, 'error on ' + this.host + ':' + this.port);
+ console.warn(err);
+ // Label node as down, possibly failover
this.setStatus('down');
if (this.role == 'master') {
self.failover();
@@ -147,21 +156,18 @@ RedisHAClient.prototype.parseNodeList = function(nodeList, clientOptions) {
if (self.master && self.master.status != 'down') {
// Already have a good master, so there is a duplicate. Make it a slave
// of our master.
- self.makeSlave(this);
+ return self.makeSlave(this);
}
else {
- console.log(this.toString() + ' is master');
self.master = this;
self.emit('ready');
}
}
+ console.log(this.toString() + ' is ' + this.role);
});
node.on('status', function() {
console.log(this.toString() + ' is ' + this.status);
});
- node.on('ping', function(latency) {
- console.log(this.toString() + ' pong with ' + latency + 'ms latency');
- });
self.nodes.push(node);
});
};
@@ -172,16 +178,101 @@ RedisHAClient.prototype.makeSlave = function(node) {
if (err) {
return console.error(err, 'error setting slaveof');
}
+ console.log(node.toString() + ' is slave');
});
};
RedisHAClient.prototype.failover = function() {
- console.log('failing over!');
+ if (this.failoverInProgress) {
+ return;
+ }
+ if (this.slaves.length == 0) {
+ return console.log('no slaves to fail over to!');
+ }
+ this.failoverInProgress = true;
+ console.log('attempting failover!');
this.ready = false;
+ var tasks = [];
+ var id = uuid();
+ var self = this;
+ this.slaves.forEach(function(node) {
+ tasks.push(function(cb) {
+ node.client.MULTI
+ .SETNX('haredis:failover', id)
+ .GET('haredis:failover', function(err, reply) {
+ if (reply != id) {
+ return cb(new Error('failover already in progress'));
+ }
+ // set a shortish ttl on the lock
+ node.client.EXPIRE('haredis:failover', 5000, function(err) {
+ // Ignore errors
+ });
+ })
+ .EXEC(function(err, replies) {
+ if (err) {
+ return cb(err);
+ }
+ node.client.INFO(function(err, info) {
+ if (err) {
+ return cb(err);
+ }
+ node.info = info;
+ cb(null, node);
+ });
+ });
+ });
+ });
+ async.parallel(tasks, function(err, results) {
+ if (err) {
+ console.warn(err, 'error failing over');
+ if (results) {
+ console.log('rolling back locked nodes...');
+ results.forEach(function(node) {
+ if (node) {
+ node.client.DEL('haredis:failover', function(err) {
+ // Ignore errors
+ });
+ }
+ });
+ }
+ return self.retryFailover();
+ }
+ else {
+ // We've succeeded in locking all the slaves. Now elect our new master...
+ var freshest;
+ results.forEach(function(node) {
+ if (!freshest || node.info.master_last_io_seconds_ago < freshest.info.master_last_io_seconds_ago) {
+ freshest = node;
+ }
+ });
+ freshest.client.SLAVEOF('NO', 'ONE', function(err) {
+ if (err) {
+ console.error(err, 'error electing master');
+ return self.retryFailover();
+ }
+ self.master = freshest;
+ self.master.setStatus('up');
+ self.slaves.forEach(function(node) {
+ self.makeSlave(node);
+ });
+ });
+ }
+ });
+};
+
+RedisHAClient.prototype.retryFailover = function() {
+ var self = this;
+ // Try failing over again if we still don't have a resolution after waiting.
+ setTimeout(function() {
+ if (!self.master || self.master != 'up') {
+ self.failoverInProgress = false;
+ self.failover();
+ }
+ }, default_failover_wait);
};
RedisHAClient.prototype.__defineGetter__('slaves', function() {
- return this.nodes.every(function(node) {
+ return this.nodes.filter(function(node) {
return node.role == 'slave' && node.status == 'up';
});
});
View
@@ -0,0 +1,23 @@
+/**
+ * uuid generator
+ * --------------
+ *
+ * @exports {Function} uuid generator function
+ */
+
+ /**
+ * @param [len] {Number} Length of the ID to generate.
+ * @return {String} A unique alphanumeric string.
+ */
+ module.exports = function(len) {
+ len = (len || 8);
+ var ret = ''
+ , choices = 'ABCDEFGHIJKLMNOPQRSTUVWYXZabcdefghijklmnopqrstuvwyxz0123456789'
+ , range = choices.length - 1
+ , len_left = len
+ ;
+ while (len_left--) {
+ ret += choices[Math.round(Math.random() * range)];
+ }
+ return ret;
+};
View
@@ -12,7 +12,8 @@
"test": "make test"
},
"dependencies": {
- "redis": "~0.7.2"
+ "redis": "~0.7.2",
+ "async": "~0.1.22"
},
"devDependencies": {},
"optionalDependencies": {},
View
@@ -1,7 +1,10 @@
-var redis = require('../');
+var redis = require('../')
+ , uuid = require('../lib/uuid')
+ ;
var client = redis.createClient([6379, 6380, 6381]);
-client.get('carlos', function(err, reply) {
- console.log(reply);
-});
+setInterval(function() {
+ var id = uuid();
+ client.SET('test', id);
+ }, 20);

0 comments on commit c69c388

Please sign in to comment.