Navigation Menu

Skip to content

Commit

Permalink
Merge remote branch 'origin/2.2' into 2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
antirez committed Jun 9, 2011
2 parents 09d9879 + dfc7405 commit be87e76
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 21 deletions.
25 changes: 25 additions & 0 deletions src/networking.c
Expand Up @@ -872,3 +872,28 @@ void getClientsMaxBuffers(unsigned long *longest_output_list,
*biggest_input_buffer = bib;
}

void rewriteClientCommandVector(redisClient *c, int argc, ...) {
va_list ap;
int j;
robj **argv; /* The new argument vector */

argv = zmalloc(sizeof(robj*)*argc);
va_start(ap,argc);
for (j = 0; j < argc; j++) {
robj *a;

a = va_arg(ap, robj*);
argv[j] = a;
incrRefCount(a);
}
/* We free the objects in the original vector at the end, so we are
* sure that if the same objects are reused in the new vector the
* refcount gets incremented before it gets decremented. */
for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
zfree(c->argv);
/* Replace argv and argc with our new versions. */
c->argv = argv;
c->argc = argc;
va_end(ap);
}

1 change: 1 addition & 0 deletions src/redis.h
Expand Up @@ -661,6 +661,7 @@ void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
void rewriteClientCommandVector(redisClient *c, int argc, ...);

#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
Expand Down
47 changes: 37 additions & 10 deletions src/t_list.c
Expand Up @@ -635,17 +635,35 @@ void lremCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/

void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
robj *aux;

if (!handleClientsWaitingListPush(c,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
dstobj = createZiplistObject();
dbAdd(c->db,dstkey,dstobj);
} else {
touchWatchedKey(c->db,dstkey);
server.dirty++;
}
listTypePush(dstobj,value,REDIS_HEAD);
/* If we are pushing as a result of LPUSH against a key
* watched by BLPOPLPUSH, we need to rewrite the command vector.
* But if this is called directly by RPOPLPUSH (either directly
* or via a BRPOPLPUSH where the popped list exists)
* we should replicate the BRPOPLPUSH command itself. */
if (c != origclient) {
aux = createStringObject("LPUSH",5);
rewriteClientCommandVector(origclient,3,aux,dstkey,value);
decrRefCount(aux);
} else {
/* Make sure to always use RPOPLPUSH in the replication / AOF,
* even if the original command was BRPOPLPUSH. */
aux = createStringObject("RPOPLPUSH",9);
rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
decrRefCount(aux);
}
server.dirty++;
}

/* Always send the pushed value to the client. */
Expand All @@ -661,16 +679,22 @@ void rpoplpushCommand(redisClient *c) {
addReply(c,shared.nullbulk);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
robj *touchedkey = c->argv[1];

if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
rpoplpushHandlePush(c,c->argv[2],dobj,value);
/* We saved touched key, and protect it, since rpoplpushHandlePush
* may change the client command argument vector. */
incrRefCount(touchedkey);
rpoplpushHandlePush(c,c,c->argv[2],dobj,value);

/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);

/* Delete the source list when it is empty */
if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
touchWatchedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
}
}
Expand Down Expand Up @@ -772,6 +796,7 @@ void unblockClientWaitingData(redisClient *c) {
/* Cleanup the client structure */
zfree(c->bpop.keys);
c->bpop.keys = NULL;
if (c->bpop.target) decrRefCount(c->bpop.target);
c->bpop.target = NULL;
c->flags &= ~REDIS_BLOCKED;
c->flags |= REDIS_UNBLOCKED;
Expand Down Expand Up @@ -815,6 +840,9 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
receiver = ln->value;
dstkey = receiver->bpop.target;

/* Protect receiver->bpop.target, that will be freed by
* the next unblockClientWaitingData() call. */
if (dstkey) incrRefCount(dstkey);
/* This should remove the first element of the "clients" list. */
unblockClientWaitingData(receiver);

Expand All @@ -823,17 +851,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
return 1;
return 1; /* Serve just the first client as in B[RL]POP semantics */
} else {
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
dstobj = lookupKeyWrite(receiver->db,dstkey);
if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
decrRefCount(dstkey);
} else {
rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
decrRefCount(dstkey);
return 1;
}
decrRefCount(dstkey);
}
}

Expand Down
17 changes: 6 additions & 11 deletions src/t_set.c
Expand Up @@ -325,7 +325,7 @@ void scardCommand(redisClient *c) {
}

void spopCommand(redisClient *c) {
robj *set, *ele;
robj *set, *ele, *aux;
int64_t llele;
int encoding;

Expand All @@ -341,16 +341,11 @@ void spopCommand(redisClient *c) {
setTypeRemove(set,ele);
}

/* Change argv to replicate as SREM */
c->argc = 3;
c->argv = zrealloc(c->argv,sizeof(robj*)*(c->argc));

/* Overwrite SREM with SPOP (same length) */
redisAssert(sdslen(c->argv[0]->ptr) == 4);
memcpy(c->argv[0]->ptr, "SREM", 4);

/* Popped element already has incremented refcount */
c->argv[2] = ele;
/* Replicate/AOF this command as an SREM operation */
aux = createStringObject("SREM",4);
rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
decrRefCount(ele);
decrRefCount(aux);

addReplyBulk(c,ele);
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
Expand Down
18 changes: 18 additions & 0 deletions tests/integration/replication.tcl
Expand Up @@ -6,6 +6,24 @@ start_server {tags {"repl"}} {
s -1 role
} {slave}

test {BRPOPLPUSH replication, when blocking against empty list} {
set rd [redis_deferring_client]
$rd brpoplpush a b 5
r lpush a foo
after 1000
assert_equal [r debug digest] [r -1 debug digest]
}

test {BRPOPLPUSH replication, list exists} {
set rd [redis_deferring_client]
r lpush c 1
r lpush c 2
r lpush c 3
$rd brpoplpush c d 5
after 1000
assert_equal [r debug digest] [r -1 debug digest]
}

test {MASTER and SLAVE dataset should be identical after complex ops} {
createComplexDataset r 10000
after 500
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/type/list.tcl
Expand Up @@ -262,6 +262,14 @@ start_server {
r exec
} {foo bar {} {} {bar foo}}

test {BRPOPLPUSH timeout} {
set rd [redis_deferring_client]

$rd brpoplpush foo_list bar_list 1
after 2000
$rd read
} {}

foreach {pop} {BLPOP BRPOP} {
test "$pop: with single empty list argument" {
set rd [redis_deferring_client]
Expand Down

0 comments on commit be87e76

Please sign in to comment.