Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Generic locking mechanism. #41

Closed
wants to merge 5 commits into from

3 participants

@urkle

Based on the "BPOP" commands this adds a generic locking command (similar to advisory locks in mysql).

Basic usage is

C1: GRAB sesskey.AE123 0

C1: OK

C2: GRAB sesskey.AE123 0

C1: RELEASE sesskey.AE123

C1: OK

C2: OK

Also if a client disconnects all of it's locks are released.

A client MAY lock multiple keys.

Some things I don't like about how it's implemented right now.
I've kludged in some "reserving" of the Key in (doing a dbAdd of a 0 value) and checking to make sure it's a REDIS_STRING (so as to not conflict with BPOP). However this has a few issues.

the lock does nothing to prevent working with the actual key (and really shouldn't). So another client can delete and readd the key as a REDIS_LIST and screw things up.

Ideally I need to create a separate blocking system from the BPOP blocks, or adjust how they are stored so that BPOP will ignore GRAB blocks and GRAB will ignore BPOP locks.

Any thoughts on this are very welcome.

@antirez
Owner

Hello,

I think WATCH and SETNX are already two primitives where it's trivially possible to do this without need for further commands. And we are minimalist ;)

Cheers,
Salvatore

@urkle

How exactly do I use those two command to accomplish the same result? As it does not seem to do the same thing at all?

@urkle

I've redone the implementation so it doesn't conflict at all with BPOP, so a GRAB/RELESAE and BPOP can operate simultaneously on the same key..

urkle/redis@b579675

@mattsta

Composing locks out of primitives have been researched multiple times since this issue first came up.

Current best practices are:

@mattsta mattsta closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
3  src/Makefile
@@ -25,7 +25,7 @@ PREFIX= /usr/local
INSTALL_BIN= $(PREFIX)/bin
INSTALL= cp -p
-OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o vm.o pubsub.o multi.o debug.o sort.o intset.o syncio.o
+OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o vm.o pubsub.o multi.o debug.o sort.o intset.o syncio.o locking.o
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@ -54,6 +54,7 @@ aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
chprgname.o: chprgname.c
config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
+locking.o: locking.c redis.h
db.o: db.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
debug.o: debug.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
View
3  src/dict.h
@@ -144,6 +144,9 @@ void dictDisableResize(void);
int dictRehash(dict *d, int n);
int dictRehashMilliseconds(dict *d, int ms);
+/* Hash Functions */
+unsigned int dictIdentityHashFunction(unsigned int key);
+
/* Hash table types */
extern dictType dictTypeHeapStringCopyKey;
extern dictType dictTypeHeapStrings;
View
15 src/help.h
@@ -13,7 +13,8 @@ static char *commandGroups[] = {
"pubsub",
"transactions",
"connection",
- "server"
+ "server",
+ "lock"
};
struct commandHelp {
@@ -632,7 +633,17 @@ struct commandHelp {
"destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]",
"Add multiple sorted sets and store the resulting sorted set in a new key",
4,
- "1.3.10" }
+ "1.3.10" },
+ { "GRAB",
+ "key timeout",
+ "Grabs a lock on key and blocks until all other clients have released the lock or disconnected.",
+ 10,
+ "2.2.X" },
+ { "RELEASE",
+ "key",
+ "Releases a lock on key allowing other clients to continue",
+ 10,
+ "2.2.X" },
};
#endif
View
203 src/locking.c
@@ -0,0 +1,203 @@
+#include "redis.h"
+
+
+#include <assert.h>
+
+dictType lockDictType = {
+ dictIdentityHashFunction, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ NULL, /* key compare */
+ NULL, /* key destructor */
+ dictListDestructor /* val destructor */
+};
+
+/*-----------------------------------------------------------------------------
+ * Generic locking (grab and release) commands
+ *----------------------------------------------------------------------------*/
+
+int grabLockForKey(redisClient *c, robj *key)
+{
+ dictEntry *de;
+
+ de = dictFind(c->db->locked_keys,key);
+ if (de == NULL) {
+ int retval;
+ robj *kt;
+ list *l;
+
+ kt = dupStringObject(key);
+ /* Add the client structure to the locked_keys dict */
+ retval = dictAdd(c->db->locked_keys,key,c);
+ incrRefCount(key);
+ redisAssert(retval == DICT_OK);
+
+ if (c->lock.keys == NULL) {
+ c->lock.keys = dictCreate(&lockDictType,NULL);
+ }
+ de = dictFind(c->lock.keys,(void *)(long)c->db->id);
+ if (de == NULL) {
+ l = listCreate();
+ listSetMatchMethod(l,listMatchObjects);
+ retval = dictAdd(c->lock.keys,(void *)(long)c->db->id,l);
+ redisAssert(retval == DICT_OK);
+ } else {
+ l = dictGetEntryVal(de);
+ }
+ listAddNodeTail(l,kt);
+ return 1;
+ } else {
+ redisClient *locker = dictGetEntryVal(de);
+ if (c == locker) {
+ /* Silently allow locking an already locked key */
+ return 1;
+ }
+ return 0;
+ }
+}
+
+int releaseLockForKey(redisClient* c, robj* key) {
+ dictEntry *de;
+ listNode *ln;
+ list *l;
+
+ if (c->lock.keys == NULL) {
+ /* If no keys are locked, return */
+ return 0;
+ } else {
+ de = dictFind(c->db->locked_keys,key);
+ if (de == NULL)
+ return 0;
+
+ redisClient *locker = dictGetEntryVal(de);
+ if (locker == c) {
+ de = dictFind(c->lock.keys,(void *)(long)c->db->id);
+ if (de == NULL) {
+ /* we don't have a lock in this DB */
+ return 0;
+ }
+ l = dictGetEntryVal(de);
+ redisAssert(l != NULL);
+ /* We are the locker, so release the lock */
+ dictDelete(c->db->locked_keys,key);
+ ln = listSearchKey(l,key);
+ redisAssert(ln != NULL);
+ decrRefCount(listNodeValue(ln));
+ listDelNode(l,ln);
+ if (listLength(l) == 0)
+ dictDelete(c->lock.keys,(void *)(long)c->db->id);
+ if (dictSize(c->lock.keys) == 0) {
+ dictRelease(c->lock.keys);
+ c->lock.keys = NULL;
+ }
+ return 1;
+ } else {
+ /* This is not our lock */
+ return 0;
+ }
+ }
+}
+
+void handOffLock(redisClient *c, robj *key) {
+ struct dictEntry *de;
+ int retval;
+ redisClient *receiver;
+ list *clients;
+ listIter *iter;
+ listNode *ln;
+
+ de = dictFind(c->db->blocking_keys,key);
+ if (de != NULL) {
+
+ clients = dictGetEntryVal(de);
+
+ iter = listGetIterator(clients, AL_START_HEAD);
+ while ((ln = listNext(iter)) != NULL) {
+ receiver = listNodeValue(ln);
+ if (receiver->block.type != REDIS_BLOCK_LOCK)
+ continue;
+
+ /* Hand off the lock to the next client */
+ retval = grabLockForKey(receiver,key);
+ redisAssert(retval == 1);
+
+ unblockClientWaitingData(receiver, ln);
+
+ /* Tell the waiting client it is unblocked */
+ addReply(receiver, shared.ok);
+ break;
+ }
+ listReleaseIterator(iter);
+ }
+}
+void releaseClientLocks(redisClient *c) {
+ if (c->lock.keys) {
+ dictEntry *de;
+ dictIterator *iter;
+ int origid = c->db->id;
+
+ /* Iterate all DBs that have locks */
+ iter = dictGetIterator(c->lock.keys);
+ while ((de = dictNext(iter)) != NULL) {
+ listNode *ln;
+ list *l = dictGetEntryVal(de);
+ int dbnum = (long)dictGetEntryKey(de);
+
+ /* Switch the current DB */
+ selectDb(c,dbnum);
+ /* Iterate through all locks in THIS db and release them */
+ listIter *liter = listGetIterator(l, AL_START_HEAD);
+ while ((ln = listNext(liter)) != NULL) {
+ robj *key = listNodeValue(ln);
+ incrRefCount(key);
+ releaseLockForKey(c, key);
+ handOffLock(c, key);
+ decrRefCount(key);
+ }
+ listReleaseIterator(liter);
+ }
+ dictReleaseIterator(iter);
+ selectDb(c,origid);
+ /* The last lock release should clean this up */
+ redisAssert(c->lock.keys == NULL);
+ }
+}
+
+void grabCommand(redisClient *c) {
+ time_t timeout;
+ robj *key;
+
+ if (c->flags & REDIS_MULTI) {
+ addReplyError(c,"GRAB inside MULTI is not allowed");
+ return;
+ }
+
+ if (getTimeoutFromObjectOrReply(c, c->argv[2],&timeout) != REDIS_OK)
+ return;
+
+ key = c->argv[1];
+
+ if (grabLockForKey(c, key)==1) {
+ addReply(c,shared.ok);
+ } else {
+ blockForKeys(c, c->argv + 1, 1, timeout, NULL, REDIS_BLOCK_LOCK);
+ }
+}
+
+void releaseCommand(redisClient* c) {
+ robj *key;
+
+ if (c->flags & REDIS_MULTI) {
+ addReplyError(c,"RELEASE inside MULTI is not allowed");
+ return;
+ }
+
+ key = c->argv[1];
+
+ if (releaseLockForKey(c,key)==1) {
+ handOffLock(c, key);
+ addReply(c,shared.ok);
+ } else {
+ addReplyError(c,"RELEASE failed! Key not Locked by us");
+ }
+}
View
16 src/networking.c
@@ -41,10 +41,10 @@ redisClient *createClient(int fd) {
c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
- c->bpop.keys = NULL;
- c->bpop.count = 0;
- c->bpop.timeout = 0;
- c->bpop.target = NULL;
+ c->block.keys = NULL;
+ c->block.count = 0;
+ c->block.timeout = 0;
+ c->block.target = NULL;
c->io_keys = listCreate();
c->watched_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
@@ -437,8 +437,10 @@ void freeClient(redisClient *c) {
sdsfree(c->querybuf);
c->querybuf = NULL;
if (c->flags & REDIS_BLOCKED)
- unblockClientWaitingData(c);
+ unblockClientWaitingData(c, NULL);
+ /* Release Client Locks */
+ releaseClientLocks(c);
/* UNWATCH all the keys */
unwatchAllKeys(c);
listRelease(c->watched_keys);
@@ -620,9 +622,9 @@ void closeTimedoutClients(void) {
redisLog(REDIS_VERBOSE,"Closing idle client");
freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
- if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
+ if (c->block.timeout != 0 && c->block.timeout < now) {
addReply(c,shared.nullmultibulk);
- unblockClientWaitingData(c);
+ unblockClientWaitingData(c, NULL);
}
}
}
View
22 src/redis.c
@@ -187,7 +187,9 @@ struct redisCommand readonlyCommandTable[] = {
{"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0},
{"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0},
{"watch",watchCommand,-2,0,NULL,0,0,0},
- {"unwatch",unwatchCommand,1,0,NULL,0,0,0}
+ {"unwatch",unwatchCommand,1,0,NULL,0,0,0},
+ {"grab",grabCommand,3,0,NULL,1,1,1},
+ {"release",releaseCommand,2,0,NULL,1,1,1},
};
/*============================ Utility functions ============================ */
@@ -417,6 +419,17 @@ dictType keylistDictType = {
dictListDestructor /* val destructor */
};
+/* key has table type has unencoded redis objects as keys and
+ * generic pointers as values. It's used for locking keys on a client */
+dictType keyDictType = {
+ dictObjHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictObjKeyCompare, /* key compare */
+ dictRedisObjectDestructor, /* key destructor */
+ NULL /* val destructor */
+};
+
int htNeedsResize(dict *dict) {
long long size, used;
@@ -580,7 +593,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
/* Close connections of timedout clients */
- if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
+ if ((server.maxidletime && !(loops % 100)) || server.blocked_clients)
closeTimedoutClients();
/* Check if a background saving or AOF rewrite in progress terminated */
@@ -784,7 +797,7 @@ void initServerConfig() {
server.rdbcompression = 1;
server.activerehashing = 1;
server.maxclients = 0;
- server.bpop_blocked_clients = 0;
+ server.blocked_clients = 0;
server.maxmemory = 0;
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
server.maxmemory_samples = 3;
@@ -876,6 +889,7 @@ void initServer() {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].locked_keys = dictCreate(&keyDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
if (server.vm_enabled)
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
@@ -1212,7 +1226,7 @@ sds genRedisInfoString(void) {
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
lol, bib,
- server.bpop_blocked_clients,
+ server.blocked_clients,
zmalloc_used_memory(),
hmem,
zmalloc_get_rss(),
View
28 src/redis.h
@@ -144,6 +144,10 @@
#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in
server.unblocked_clients */
+/* Client Blocking state type */
+#define REDIS_BLOCK_BPOP 0 /* a BPOP Block */
+#define REDIS_BLOCK_LOCK 1 /* a LOCK Block */
+
/* Client request types */
#define REDIS_REQ_INLINE 1
#define REDIS_REQ_MULTIBULK 2
@@ -275,6 +279,7 @@ typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
+ dict *locked_keys; /* Keys that clients have a lock on (GRAB/RELEASE) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
@@ -296,12 +301,17 @@ typedef struct blockingState {
robj **keys; /* The key we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
int count; /* Number of blocking keys */
+ int type; /* Type of block */
time_t timeout; /* Blocking operation timeout. If UNIX current time
* is >= timeout then the operation timed out. */
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
} blockingState;
+typedef struct lockState {
+ dict *keys; /* The list of keys we have locked for each DB. Otherwise NULL*/
+} lockState;
+
/* With multiplexing we need to take per-clinet state.
* Clients are taken in a liked list. */
typedef struct redisClient {
@@ -325,7 +335,8 @@ typedef struct redisClient {
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
- blockingState bpop; /* blocking state */
+ blockingState block; /* blocking state */
+ lockState lock; /* lock state */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
@@ -435,7 +446,7 @@ struct redisServer {
int maxmemory_policy;
int maxmemory_samples;
/* Blocked clients */
- unsigned int bpop_blocked_clients;
+ unsigned int blocked_clients;
unsigned int vm_blocked_clients;
list *unblocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we
@@ -660,6 +671,7 @@ void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
+int listMatchObjects(void *a, void *b);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
@@ -684,10 +696,18 @@ void listTypeInsert(listTypeEntry *entry, robj *value, int where);
int listTypeEqual(listTypeEntry *entry, robj *o);
void listTypeDelete(listTypeEntry *entry);
void listTypeConvert(robj *subject, int enc);
-void unblockClientWaitingData(redisClient *c);
+void unblockClientWaitingData(redisClient *c, listNode *ln);
int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
void popGenericCommand(redisClient *c, int where);
+/* locking.c - Locking API */
+void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target, int type);
+int grabLockForKey(redisClient *c, robj *key);
+int releaseLockForKey(redisClient *c, robj *key);
+void releaseClientLocks(redisClient *c);
+int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout);
+void dictListDestructor(void *privdata, void *val);
+
/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(redisClient *c);
void initClientMultiState(redisClient *c);
@@ -1008,6 +1028,8 @@ void punsubscribeCommand(redisClient *c);
void publishCommand(redisClient *c);
void watchCommand(redisClient *c);
void unwatchCommand(redisClient *c);
+void grabCommand(redisClient *c);
+void releaseCommand(redisClient *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
View
73 src/t_list.c
@@ -710,15 +710,16 @@ void rpoplpushCommand(redisClient *c) {
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
-void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
+void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target, int type) {
dictEntry *de;
list *l;
int j;
- c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
- c->bpop.count = numkeys;
- c->bpop.timeout = timeout;
- c->bpop.target = target;
+ c->block.keys = zmalloc(sizeof(robj*)*numkeys);
+ c->block.count = numkeys;
+ c->block.timeout = timeout;
+ c->block.target = target;
+ c->block.type = type;
if (target != NULL) {
incrRefCount(target);
@@ -726,7 +727,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
for (j = 0; j < numkeys; j++) {
/* Add the key in the client structure, to map clients -> keys */
- c->bpop.keys[j] = keys[j];
+ c->block.keys[j] = keys[j];
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
@@ -746,36 +747,40 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
}
/* Mark the client as a blocked client */
c->flags |= REDIS_BLOCKED;
- server.bpop_blocked_clients++;
+ server.blocked_clients++;
}
/* Unblock a client that's waiting in a blocking operation such as BLPOP */
-void unblockClientWaitingData(redisClient *c) {
+void unblockClientWaitingData(redisClient *c, listNode *ln) {
dictEntry *de;
list *l;
int j;
- redisAssert(c->bpop.keys != NULL);
+ redisAssert(c->block.keys != NULL);
/* The client may wait for multiple keys, so unblock it for every key. */
- for (j = 0; j < c->bpop.count; j++) {
+ for (j = 0; j < c->block.count; j++) {
/* Remove this client from the list of clients waiting for this key. */
- de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
+ de = dictFind(c->db->blocking_keys,c->block.keys[j]);
redisAssert(de != NULL);
l = dictGetEntryVal(de);
- listDelNode(l,listSearchKey(l,c));
+ if (ln) {
+ listDelNode(l,ln);
+ } else {
+ listDelNode(l,listSearchKey(l,c));
+ }
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
- dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
- decrRefCount(c->bpop.keys[j]);
+ dictDelete(c->db->blocking_keys,c->block.keys[j]);
+ decrRefCount(c->block.keys[j]);
}
/* Cleanup the client structure */
- zfree(c->bpop.keys);
- c->bpop.keys = NULL;
- c->bpop.target = NULL;
+ zfree(c->block.keys);
+ c->block.keys = NULL;
+ c->block.target = NULL;
c->flags &= ~REDIS_BLOCKED;
c->flags |= REDIS_UNBLOCKED;
- server.bpop_blocked_clients--;
+ server.blocked_clients--;
listAddNodeTail(server.unblocked_clients,c);
}
@@ -792,15 +797,15 @@ void unblockClientWaitingData(redisClient *c) {
int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
struct dictEntry *de;
redisClient *receiver;
- int numclients;
list *clients;
+ listIter *iter;
listNode *ln;
+ int retval = 0;
robj *dstkey, *dstobj;
de = dictFind(c->db->blocking_keys,key);
if (de == NULL) return 0;
clients = dictGetEntryVal(de);
- numclients = listLength(clients);
/* Try to handle the push as long as there are clients waiting for a push.
* Note that "numclients" is used because the list of clients waiting for a
@@ -809,22 +814,22 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
* This loop will have more than 1 iteration when there is a BRPOPLPUSH
* that cannot push the target list because it does not contain a list. If
* this happens, it simply tries the next client waiting for a push. */
- while (numclients--) {
- ln = listFirst(clients);
- redisAssert(ln != NULL);
- receiver = ln->value;
- dstkey = receiver->bpop.target;
+ iter = listGetIterator(clients, AL_START_HEAD);
+ while ((ln = listNext(iter)) != NULL) {
+ receiver = listNodeValue(ln);
+ if (receiver->block.type != REDIS_BLOCK_BPOP)
+ continue;
+ dstkey = receiver->block.target;
- /* This should remove the first element of the "clients" list. */
- unblockClientWaitingData(receiver);
- redisAssert(ln != listFirst(clients));
+ unblockClientWaitingData(receiver, ln);
if (dstkey == NULL) {
/* BRPOP/BLPOP */
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
- return 1;
+ retval = 1;
+ break;
} else {
/* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
dstobj = lookupKeyWrite(receiver->db,dstkey);
@@ -833,12 +838,14 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
} else {
rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
decrRefCount(dstkey);
- return 1;
+ retval = 1;
+ break;
}
}
}
+ listReleaseIterator(iter);
- return 0;
+ return retval;
}
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
@@ -917,7 +924,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
}
/* If the list is empty or the key does not exists we must block */
- blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
+ blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL, REDIS_BLOCK_BPOP);
}
void blpopCommand(redisClient *c) {
@@ -944,7 +951,7 @@ void brpoplpushCommand(redisClient *c) {
addReply(c, shared.nullbulk);
} else {
/* The list is empty and the client blocks. */
- blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
+ blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2], REDIS_BLOCK_BPOP);
}
} else {
if (key->type != REDIS_LIST) {
Something went wrong with that request. Please try again.