Skip to content

Commit

Permalink
Save and clear client state. Restore it after the connection is ready.
Browse files Browse the repository at this point in the history
This change stores the connection state regarding subscriptions,
selected db and monitoring. When the connection to Redis drops, the state
is reestablished after a succesful reconnect. Fixes #241. Fixes #210.

Signed-off-by: DTrejo <david.trejo@voxer.com>
  • Loading branch information
ignacio authored and DTrejo committed Jul 13, 2012
1 parent a5ea716 commit 50914ba
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 1 deletion.
33 changes: 32 additions & 1 deletion index.js
Expand Up @@ -66,6 +66,8 @@ function RedisClient(stream, options) {
this.parser_module = null;
this.selected_db = null; // save the selected db here, used when reconnecting

this.old_state = null;

var self = this;

this.stream.on("connect", function () {
Expand Down Expand Up @@ -272,18 +274,35 @@ RedisClient.prototype.on_ready = function () {

this.ready = true;

if (this.old_state !== null) {
this.monitoring = this.old_state.monitoring;
this.pub_sub_mode = this.old_state.pub_sub_mode;
this.selected_db = this.old_state.selected_db;
this.old_state = null;
}

// magically restore any modal commands from a previous connection
if (this.selected_db !== null) {
this.send_command('select', [this.selected_db]);
}
if (this.pub_sub_mode === true) {
// only emit "ready" when all subscriptions were made again
var callback_count = 0;
var callback = function() {
callback_count--;
if (callback_count == 0) {
self.emit("ready");
}
}
Object.keys(this.subscription_set).forEach(function (key) {
var parts = key.split(" ");
if (exports.debug_mode) {
console.warn("sending pub/sub on_ready " + parts[0] + ", " + parts[1]);
}
self.send_command(parts[0], [parts[1]]);
callback_count++;
self.send_command(parts[0] + "scribe", [parts[1]], callback);
});
return;
} else if (this.monitoring) {
this.send_command("monitor");
} else {
Expand Down Expand Up @@ -382,6 +401,18 @@ RedisClient.prototype.connection_gone = function (why) {
this.connected = false;
this.ready = false;

if (this.old_state === null) {
var state = {
monitoring: this.monitoring,
pub_sub_mode: this.pub_sub_mode,
selected_db: this.selected_db
};
this.old_state = state;
this.monitoring = false;
this.pub_sub_mode = false;
this.selected_db = null;
}

// since we are collapsing end and close, users don't expect to be called twice
if (! this.emitted_end) {
this.emit("end");
Expand Down
60 changes: 60 additions & 0 deletions test.js
Expand Up @@ -619,6 +619,66 @@ tests.SUBSCRIBE_QUIT = function () {
client3.subscribe("chan3");
};

tests.SUBSCRIBE_CLOSE_RESUBSCRIBE = function () {
var name = "SUBSCRIBE_CLOSE_RESUBSCRIBE";
var c1 = redis.createClient();
var c2 = redis.createClient();
var count = 0;

/* Create two clients. c1 subscribes to two channels, c2 will publish to them.
c2 publishes the first message.
c1 gets the message and drops its connection. It must resubscribe itself.
When it resubscribes, c2 publishes the second message, on the same channel
c1 gets the message and drops its connection. It must resubscribe itself, again.
When it resubscribes, c2 publishes the third message, on the second channel
c1 gets the message and drops its connection. When it reconnects, the test ends.
*/

c1.on("message", function(channel, message) {
if (channel === "chan1") {
assert.strictEqual(message, "hi on channel 1");
c1.stream.end();

} else if (channel === "chan2") {
assert.strictEqual(message, "hi on channel 2");
c1.stream.end();

} else {
c1.quit();
c2.quit();
assert.fail("test failed");
}
})

c1.subscribe("chan1", "chan2");

c2.once("ready", function() {
console.log("c2 is ready");
c1.on("ready", function(err, results) {
console.log("c1 is ready", count);

count++;
if (count == 1) {
c2.publish("chan1", "hi on channel 1");
return;

} else if (count == 2) {
c2.publish("chan2", "hi on channel 2");

} else {
c1.quit(function() {
c2.quit(function() {
next(name);
});
});
}
});

c2.publish("chan1", "hi on channel 1");

});
};

tests.EXISTS = function () {
var name = "EXISTS";
client.del("foo", "foo2", require_number_any(name));
Expand Down

0 comments on commit 50914ba

Please sign in to comment.