Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixed hard-to-reproduce PUBSUB bug.

Also, changed internals to not queue callback functions but the entire
command.  This makes debugging a lot easier, and does not change the
external API/interface at all.
  • Loading branch information...
commit 1f28998a44eb8e1953365dca1059d278f6f55c9b 1 parent fb44daa
@fictorial authored
Showing with 78 additions and 79 deletions.
  1. +74 −75 lib/redis-client.js
  2. +4 −4 test/test.js
View
149 lib/redis-client.js
@@ -254,7 +254,7 @@ ReplyParser.prototype.feed = function (inbound) {
};
function Client(stream) {
- this.callbacks = [];
+ this.originalCommands = [];
this.channelCallbacks = {};
this.replyParser = new ReplyParser(this.onReply_, this);
@@ -316,73 +316,83 @@ Client.prototype.close = function () {
};
Client.prototype.onReply_ = function (reply) {
- if (reply.type == ERROR) {
- var errorMessage = reply.value.utf8Slice(0, reply.value.length);
- if (exports.debugMode) sys.debug("error: " + errorMessage);
- this.callbacks.shift()(errorMessage, null); // err, reply
+ if (this.handlePublishedMessage_(reply))
return;
- }
-
- // PUBSUB message?
-
- if (reply.type === MULTIBULK &&
- reply.value instanceof Array &&
- reply.value.length === 3 && // ['message', channel, payload]
- reply.value[0].value.length === 7 && // 'message'
- reply.value[0].value.asciiSlice(0, 7) === 'message' &&
- Object.getOwnPropertyNames(this.channelCallbacks).length > 0) {
-
- var channelNameOrPattern = reply.value[1].value;
- var channelCallback = this.channelCallbacks[channelNameOrPattern];
-
- if (typeof(channelCallback) == 'undefined') {
- // No 1:1 channel name match.
- //
- // Perhaps the subscription was for a pattern (PSUBSCRIBE)?
- // Redis does not send the pattern that matched from an
- // original PSUBSCRIBE request. It sends the (fn)matching
- // channel name instead. Thus, let's try to fnmatch the
- // channel the message was published to/on to a subscribed
- // pattern, and callback the associated function.
- //
- // A -> Redis PSUBSCRIBE foo.*
- // B -> Redis PUBLISH foo.bar hello
- // Redis -> A MESSAGE foo.bar hello (no pattern specified)
-
- var channelNamesOrPatterns =
- Object.getOwnPropertyNames(this.channelCallbacks);
-
- for (var i=0; i < channelNamesOrPatterns.length; ++i) {
- var thisNameOrPattern = channelNamesOrPatterns[i];
- if (fnmatch(thisNameOrPattern, channelNameOrPattern)) {
- channelCallback = this.channelCallbacks[thisNameOrPattern];
- break;
- }
- }
- }
- if (typeof(channelCallback) === 'function') {
- // Good, we found a function to callback.
+ var originalCommand = this.originalCommands.shift();
+ var callback = originalCommand[originalCommand.length - 1];
- var payload = reply.value[2].value;
- channelCallback(channelNameOrPattern, payload);
- }
+ // Callbacks expect (err, reply) as args.
- return;
+ if (typeof callback == "function") {
+ if (reply.type == ERROR) {
+ callback(reply.value.utf8Slice(0, reply.value.length), null);
+ } else {
+ callback(null, maybeConvertReplyValue(originalCommand[0], reply));
+ }
}
+};
- // Non-PUBSUB reply (e.g. GET command reply).
+Client.prototype.handlePublishedMessage_ = function (reply) {
+ // We're looking for a multibulk like:
+ // ["message", "channelName", messageBuffer]
+
+ if (reply.type != MULTIBULK ||
+ !(reply.value instanceof Array) ||
+ reply.value.length != 3 ||
+ reply.value[0].value.length != 7 ||
+ reply.value[0].value.asciiSlice(0, 7) != 'message')
+ return false;
+
+ // This is tricky. We are returning true even though there
+ // might not be any callback called! This may happen when a
+ // caller subscribes then unsubscribes while a published
+ // message is in transit to us. When the message arrives, no
+ // one is there to consume it. In essence, as long as the
+ // reply type is a published message (see above), then we've
+ // "handled" the reply.
+
+ if (Object.getOwnPropertyNames(this.channelCallbacks).length == 0)
+ return true;
- var callback = this.callbacks.shift();
- reply.value = maybeConvertReplyValue(callback.commandName, reply);
+ var channelNameOrPattern = reply.value[1].value;
+ var channelCallback = this.channelCallbacks[channelNameOrPattern];
+ if (typeof channelCallback == 'undefined') {
+ // No 1:1 channel name match.
+ //
+ // Perhaps the subscription was for a pattern (PSUBSCRIBE)?
+ // Redis does not send the pattern that matched from an
+ // original PSUBSCRIBE request. It sends the (fn)matching
+ // channel name instead. Thus, let's try to fnmatch the
+ // channel the message was published to/on to a subscribed
+ // pattern, and callback the associated function.
+ //
+ // A -> Redis PSUBSCRIBE foo.*
+ // B -> Redis PUBLISH foo.bar hello
+ // Redis -> A MESSAGE foo.bar hello (no pattern specified)
+
+ var channelNamesOrPatterns =
+ Object.getOwnPropertyNames(this.channelCallbacks);
+
+ for (var i=0; i < channelNamesOrPatterns.length; ++i) {
+ var thisNameOrPattern = channelNamesOrPatterns[i];
+ if (fnmatch(thisNameOrPattern, channelNameOrPattern)) {
+ channelCallback = this.channelCallbacks[thisNameOrPattern];
+ break;
+ }
+ }
+ }
+
+ if (typeof(channelCallback) === 'function') {
+ // Good, we found a function to callback.
- if (exports.debugMode) {
- sys.debug("reply: " + JSON.stringify(reply));
- sys.debug("from command: " + callback.commandName);
+ var payload = reply.value[2].value;
+ channelCallback(channelNameOrPattern, payload);
+ return true;
}
- callback(null, reply.value); // null => no Redis error.
-};
+ return false;
+}
function maybeAsNumber(str) {
var value = parseInt(str, 10);
@@ -574,22 +584,15 @@ var commands = [
Client.prototype.sendCommand = function () {
var commandName = arguments[0].toLowerCase();
+ var originalCommand = Array.prototype.slice.call(arguments);
// Invariant: number of queued callbacks == number of commands sent to
// Redis whose replies have not yet been received and processed. Thus,
// if no callback was given, we create a dummy callback.
var argCount = arguments.length;
- var callback = null;
-
- if (typeof(arguments[argCount - 1]) == 'function') {
- callback = arguments[argCount - 1];
+ if (typeof arguments[argCount - 1] == 'function')
--argCount;
- } else {
- callback = function () {};
- }
-
- callback.commandName = commandName;
// All requests are formatted as multi-bulk.
// The first line of a multi-bulk request is "*<number of parts to follow>\r\n".
@@ -654,15 +657,11 @@ Client.prototype.sendCommand = function () {
}
}
- // Store the callback and write the buffer to Redis.
-
- if (exports.debugMode) callback.commandName = commandName;
- this.callbacks.push(callback);
+ this.originalCommands.push(originalCommand);
this.stream.write(requestBuffer);
if (exports.debugMode)
- sys.debug("[SEND (" + offset + " bytes)] " +
- debugFilter(requestBuffer, requestBuffer.length));
+ sys.debug("[SEND] " + debugFilter(requestBuffer, requestBuffer.length));
};
commands.forEach(function (commandName) {
@@ -693,7 +692,7 @@ commands.forEach(function (commandName) {
// issue other commands, use a second client instance.
Client.prototype.subscribeTo = function (nameOrPattern, callback) {
- if (typeof(this.channelCallbacks[nameOrPattern]) === 'function')
+ if (typeof this.channelCallbacks[nameOrPattern] === 'function')
return;
if (typeof(callback) !== 'function')
@@ -709,7 +708,7 @@ Client.prototype.subscribeTo = function (nameOrPattern, callback) {
};
Client.prototype.unsubscribeFrom = function (nameOrPattern) {
- if (typeof(this.channelCallbacks[nameOrPattern]) === 'undefined')
+ if (typeof this.channelCallbacks[nameOrPattern] === 'undefined')
return;
delete this.channelCallbacks[nameOrPattern];
View
8 test/test.js
@@ -223,7 +223,7 @@ function testParseMultiBulkReply() {
checkEqual(reply.value[1].value.utf8Slice(0, reply.value[1].value.length), '#redis', "testParseMultiBulkReply d-10");
checkEqual(reply.value[2].value, 1, "testParseMultiBulkReply d-11");
});
- d.feed(bufferFromString("*3\r\n$9\r\nsubscribe\r\n$6\r\n#redis\r\n:1\r\n"));
+ d.feed(bufferFromString("*3\r\n$9\r\nsubscribe\r\n$6\r\n#redis\r\n:1\r\n*3\r\n$7\r\nmessage\r\n"));
@fictorial Owner

Just adding a partial command to the end. I thought the hard to reproduce bug had to do with partial commands in the input.

Alas, it did not. But, this is still a good case to handle.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
var e = new redisclient.ReplyParser(function (reply) {
checkEqual(reply.type, redisclient.MULTIBULK, "testParseMultiBulkReply e-0");
@@ -1681,7 +1681,7 @@ var allTestFunctions = [
];
function checkIfDone() {
- if (client.callbacks.length == 0) {
+ if (client.originalCommands.length == 0) {
testSUBSCRIBEandPUBLISH();
var checks = 0;
@@ -1696,7 +1696,7 @@ function checkIfDone() {
}, 100);
} else {
if (verbose)
- log('info', client.callbacks.length + " callbacks still pending...");
+ log('info', client.originalCommands.length + " replies still pending...");
else if (!quiet)
sys.print("+");
}
@@ -1716,7 +1716,7 @@ function runAllTests() {
testFunction();
});
- setInterval(checkIfDone, 100);
+ setInterval(checkIfDone, 1500);
}
var connectionFailed = false;
@fictorial

Just adding a partial command to the end. I thought the hard to reproduce bug had to do with partial commands in the input.

Alas, it did not. But, this is still a good case to handle.

Please sign in to comment.
Something went wrong with that request. Please try again.