Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix for issue 516, rewriting the command vector to correctly repliate…

… BRPOPLPUSH. Still to test everything, especially edge cases
  • Loading branch information...
commit 20867e8009876e50ae1536203a097f4338598943 1 parent 10e987c
Salvatore Sanfilippo authored
Showing with 56 additions and 7 deletions.
  1. +56 −7 src/t_list.c
63 src/t_list.c
View
@@ -619,6 +619,31 @@ void lremCommand(redisClient *c) {
if (removed) touchWatchedKey(c->db,c->argv[1]);
}
+void rewriteClientCommandVector(redisClient *c, int argc, ...) {
+ va_list ap;
+ int j;
+ robj **argv; /* The new argument vector */
+
+ argv = zmalloc(sizeof(robj*)*argc);
+ va_start(ap,argc);
+ for (j = 0; j < argc; j++) {
+ robj *a;
+
+ a = va_arg(ap, robj*);
+ argv[j] = a;
+ incrRefCount(a);
+ }
+ /* We free the objects in the original vector at the end, so we are
+ * sure that if the same objects are reused in the new vector the
+ * refcount gets incremented before it gets decremented. */
+ for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
+ zfree(c->argv);
+ /* Replace argv and argc with our new versions. */
+ c->argv = argv;
+ c->argc = argc;
+ va_end(ap);
+}
+
/* This is the semantic of this command:
* RPOPLPUSH srclist dstlist:
* IF LLEN(srclist) > 0
@@ -635,7 +660,9 @@ void lremCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
-void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+ robj *aux;
+
if (!handleClientsWaitingListPush(c,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
@@ -643,9 +670,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
dbAdd(c->db,dstkey,dstobj);
} else {
touchWatchedKey(c->db,dstkey);
- server.dirty++;
}
listTypePush(dstobj,value,REDIS_HEAD);
+ /* If we are pushing as a result of LPUSH against a key
+ * watched by BLPOPLPUSH, we need to rewrite the command vector.
+ * But if this is called directly by RPOPLPUSH (either directly
+ * or via a BRPOPLPUSH where the popped list exists)
+ * we should replicate the BRPOPLPUSH command itself. */
+ if (c != origclient) {
+ aux = createStringObject("LPUSH",5);
+ rewriteClientCommandVector(origclient,3,aux,dstkey,value);
+ decrRefCount(aux);
+ } else {
+ /* Make sure to always use RPOPLPUSH in the replication / AOF,
+ * even if the original command was BRPOPLPUSH. */
+ aux = createStringObject("RPOPLPUSH",9);
+ rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
+ decrRefCount(aux);
+ }
+ server.dirty++;
}
/* Always send the pushed value to the client. */
@@ -661,16 +704,22 @@ void rpoplpushCommand(redisClient *c) {
addReply(c,shared.nullbulk);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+ robj *touchedkey = c->argv[1];
+
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
- rpoplpushHandlePush(c,c->argv[2],dobj,value);
+ /* We saved touched key, and protect it, since rpoplpushHandlePush
+ * may change the client command argument vector. */
+ incrRefCount(touchedkey);
+ rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);
/* Delete the source list when it is empty */
- if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
- touchWatchedKey(c->db,c->argv[1]);
+ if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
+ touchWatchedKey(c->db,touchedkey);
+ decrRefCount(touchedkey);
server.dirty++;
}
}
@@ -823,14 +872,14 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
- return 1;
+ return 1; /* Serve just the first client as in B[RL]POP semantics */
} else {
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
dstobj = lookupKeyWrite(receiver->db,dstkey);
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
decrRefCount(dstkey);
} else {
- rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
+ rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
decrRefCount(dstkey);
return 1;
}
Please sign in to comment.
Something went wrong with that request. Please try again.