From a6d133afb39c1351799b8cd0e85ed1b865c90918 Mon Sep 17 00:00:00 2001 From: ranshid <88133677+ranshid@users.noreply.github.com> Date: Sun, 1 Jan 2023 23:35:42 +0200 Subject: [PATCH] reprocess command when client is unblocked on keys (#11012) *TL;DR* --------------------------------------- Following the discussion over the issue [#7551](https://github.com/redis/redis/issues/7551) We decided to refactor the client blocking code to eliminate some of the code duplications and to rebuild the infrastructure better for future key blocking cases. *In this PR* --------------------------------------- 1. reprocess the command once a client becomes unblocked on key (instead of running custom code for the unblocked path that's different than the one that would have run if blocking wasn't needed) 2. eliminate some (now) irrelevant code for handling unblocking lists/zsets/streams etc... 3. modify some tests to intercept the error in cases of error on reprocess after unblock (see details in the notes section below) 4. replace '$' on the client argv with current stream id. Since once we reprocess the stream XREAD we need to read from the last msg and not wait for new msg in order to prevent endless block loop. 5. Added statistics to the info "Clients" section to report the: * `total_blocking_keys` - number of blocking keys * `total_blocking_keys_on_nokey` - number of blocking keys which have at least 1 client which would like to be unblocked on when the key is deleted. 6. Avoid expiring unblocked key during unblock. Previously we used to lookup the unblocked key which might have been expired during the lookup. Now we lookup the key using NOTOUCH and NOEXPIRE to avoid deleting it at this point, so propagating commands in blocked.c is no longer needed. 7. deprecated command flags. We decided to remove the CMD_CALL_STATS and CMD_CALL_SLOWLOG and make an explicit verification in the call() function in order to decide if stats update should take place. This should simplify the logic and also mitigate existing issues: for example module calls which are triggered as part of AOF loading might still report stats even though they are called during AOF loading. *Behavior changes* --------------------------------------------------- 1. As this implementation prevents writing dedicated code handling unblocked streams/lists/zsets, since we now re-process the command once the client is unblocked some errors will be reported differently. The old implementation used to issue ``UNBLOCKED the stream key no longer exists`` in the following cases: - The stream key has been deleted (ie. calling DEL) - The stream and group existed but the key type was changed by overriding it (ie. with set command) - The key not longer exists after we swapdb with a db which does not contains this key - After swapdb when the new db has this key but with different type. In the new implementation the reported errors will be the same as if the command was processed after effect: **NOGROUP** - in case key no longer exists, or **WRONGTYPE** in case the key was overridden with a different type. 2. Reprocessing the command means that some checks will be reevaluated once the client is unblocked. For example, ACL rules might change since the command originally was executed and will fail once the client is unblocked. Another example is OOM condition checks which might enable the command to run and block but fail the command reprocess once the client is unblocked. 3. One of the changes in this PR is that no command stats are being updated once the command is blocked (all stats will be updated once the client is unblocked). This implies that when we have many clients blocked, users will no longer be able to get that information from the command stats. However the information can still be gathered from the client list. **Client blocking** --------------------------------------------------- the blocking on key will still be triggered the same way as it is done today. in order to block the current client on list of keys, the call to blockForKeys will still need to be made which will perform the same as it is today: * add the client to the list of blocked clients on each key * keep the key with a matching list node (position in the global blocking clients list for that key) in the client private blocking key dict. * flag the client with CLIENT_BLOCKED * update blocking statistics * register the client on the timeout table **Key Unblock** --------------------------------------------------- Unblocking a specific key will be triggered (same as today) by calling signalKeyAsReady. the implementation in that part will stay the same as today - adding the key to the global readyList. The reason to maintain the readyList (as apposed to iterating over all clients blocked on the specific key) is in order to keep the signal operation as short as possible, since it is called during the command processing. The main change is that instead of going through a dedicated code path that operates the blocked command we will just call processPendingCommandsAndResetClient. **ClientUnblock (keys)** --------------------------------------------------- 1. Unblocking clients on keys will be triggered after command is processed and during the beforeSleep 8. the general schema is: 9. For each key *k* in the readyList: ``` For each client *c* which is blocked on *k*: in case either: 1. *k* exists AND the *k* type matches the current client blocking type OR 2. *k* exists and *c* is blocked on module command OR 3. *k* does not exists and *c* was blocked with the flag unblock_on_deleted_key do: 1. remove the client from the list of clients blocked on this key 2. remove the blocking list node from the client blocking key dict 3. remove the client from the timeout list 10. queue the client on the unblocked_clients list 11. *NEW*: call processCommandAndResetClient(c); ``` *NOTE:* for module blocked clients we will still call the moduleUnblockClientByHandle which will queue the client for processing in moduleUnblockedClients list. **Process Unblocked clients** --------------------------------------------------- The process of all unblocked clients is done in the beforeSleep and no change is planned in that part. The general schema will be: For each client *c* in server.unblocked_clients: * remove client from the server.unblocked_clients * set back the client readHandler * continue processing the pending command and input buffer. *Some notes regarding the new implementation* --------------------------------------------------- 1. Although it was proposed, it is currently difficult to remove the read handler from the client while it is blocked. The reason is that a blocked client should be unblocked when it is disconnected, or we might consume data into void. 2. While this PR mainly keep the current blocking logic as-is, there might be some future additions to the infrastructure that we would like to have: - allow non-preemptive blocking of client - sometimes we can think that a new kind of blocking can be expected to not be preempt. for example lets imagine we hold some keys on disk and when a command needs to process them it will block until the keys are uploaded. in this case we will want the client to not disconnect or be unblocked until the process is completed (remove the client read handler, prevent client timeout, disable unblock via debug command etc...). - allow generic blocking based on command declared keys - we might want to add a hook before command processing to check if any of the declared keys require the command to block. this way it would be easier to add new kinds of key-based blocking mechanisms. Co-authored-by: Oran Agra Signed-off-by: Ran Shidlansik --- src/blocked.c | 784 +++++++++++------------------ src/cluster.c | 12 +- src/db.c | 2 +- src/dict.h | 9 + src/module.c | 31 +- src/networking.c | 22 +- src/replication.c | 16 +- src/script.c | 2 +- src/script_lua.c | 2 +- src/server.c | 134 +++-- src/server.h | 85 ++-- src/t_list.c | 103 +--- src/t_stream.c | 32 +- src/t_zset.c | 3 +- src/timeout.c | 15 +- tests/integration/replication.tcl | 3 +- tests/modules/misc.c | 10 + tests/unit/introspection.tcl | 43 ++ tests/unit/moduleapi/propagate.tcl | 38 ++ tests/unit/slowlog.tcl | 22 + tests/unit/type/list.tcl | 138 ++++- tests/unit/type/stream-cgroups.tcl | 71 ++- 22 files changed, 805 insertions(+), 772 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index b935ab9e0eab..ff38f70b378a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -65,23 +65,21 @@ #include "latency.h" #include "monotonic.h" -void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted); -int getListPositionFromObjectOrReply(client *c, robj *arg, int *position); - -/* This structure represents the blocked key information that we store - * in the client structure. Each client blocked on keys, has a - * client->bpop.keys hash table. The keys of the hash table are Redis - * keys pointers to 'robj' structures. The value is this structure. - * The structure has two goals: firstly we store the list node that this - * client uses to be listed in the database "blocked clients for this key" - * list, so we can later unblock in O(1) without a list scan. - * Secondly for certain blocking types, we have additional info. Right now - * the only use for additional info we have is when clients are blocked - * on streams, as we have to remember the ID it blocked for. */ -typedef struct bkinfo { - listNode *listnode; /* List node for db->blocking_keys[key] list. */ - streamID stream_id; /* Stream ID if we blocked in a stream. */ -} bkinfo; +/* forward declarations */ +static void unblockClientWaitingData(client *c); +static void handleClientsBlockedOnKey(readyList *rl); +static void unblockClientOnKey(client *c, robj *key); +static void moduleUnblockClientOnKey(client *c, robj *key); +static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key); + +void initClientBlockingState(client *c) { + c->bstate.btype = BLOCKED_NONE; + c->bstate.timeout = 0; + c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType); + c->bstate.numreplicas = 0; + c->bstate.reploffset = 0; + c->bstate.unblock_on_nokey = 0; +} /* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, @@ -93,24 +91,22 @@ void blockClient(client *c, int btype) { btype != BLOCKED_POSTPONE)); c->flags |= CLIENT_BLOCKED; - c->btype = btype; + c->bstate.btype = btype; server.blocked_clients++; server.blocked_clients_by_type[btype]++; addClientToTimeoutTable(c); - if (btype == BLOCKED_POSTPONE) { - listAddNodeTail(server.postponed_clients, c); - c->postponed_list_node = listLast(server.postponed_clients); - /* Mark this client to execute its command */ - c->flags |= CLIENT_PENDING_COMMAND; - } } -/* This function is called after a client has finished a blocking operation - * in order to update the total command duration, log the command into - * the Slow log if needed, and log the reply duration event if needed. */ +/* Usually when a client is unblocked due to being blocked while processing some command + * he will attempt to reprocess the command which will update the statistics. + * However in case the client was timed out or in case of module blocked client is being unblocked + * the command will not be reprocessed and we need to make stats update. + * This function will make updates to the commandstats, slowlog and monitors.*/ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){ const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; c->lastcmd->microseconds += total_cmd_duration; + c->lastcmd->calls++; + server.stat_numcommands++; if (had_errors) c->lastcmd->failed_calls++; if (server.latency_tracking_enabled) @@ -177,29 +173,27 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - if (c->btype == BLOCKED_LIST || - c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM) { + if (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->btype == BLOCKED_WAIT) { + } else if (c->bstate.btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); - } else if (c->btype == BLOCKED_MODULE) { + } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); - } else if (c->btype == BLOCKED_POSTPONE) { + } else if (c->bstate.btype == BLOCKED_POSTPONE) { listDelNode(server.postponed_clients,c->postponed_list_node); c->postponed_list_node = NULL; - } else if (c->btype == BLOCKED_SHUTDOWN) { + } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { /* No special cleanup. */ } else { serverPanic("Unknown btype in unblockClient()."); } - /* Reset the client for a new query since, for blocking commands - * we do not do it immediately after the command returns (when the - * client got blocked) in order to be still able to access the argument - * vector from module callbacks and updateStatsOnUnblock. */ - if (c->btype != BLOCKED_POSTPONE && c->btype != BLOCKED_SHUTDOWN) { + /* Reset the client for a new query, unless the client has pending command to process + * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ + if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { freeClientOriginalArgv(c); resetClient(c); } @@ -207,9 +201,10 @@ void unblockClient(client *c) { /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ server.blocked_clients--; - server.blocked_clients_by_type[c->btype]--; + server.blocked_clients_by_type[c->bstate.btype]--; c->flags &= ~CLIENT_BLOCKED; - c->btype = BLOCKED_NONE; + c->bstate.btype = BLOCKED_NONE; + c->bstate.unblock_on_nokey = 0; removeClientFromTimeoutTable(c); queueClientForReprocessing(c); } @@ -218,13 +213,14 @@ void unblockClient(client *c) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->btype == BLOCKED_LIST || - c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM) { + if (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM) { addReplyNullArray(c); - } else if (c->btype == BLOCKED_WAIT) { - addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); - } else if (c->btype == BLOCKED_MODULE) { + updateStatsOnUnblock(c, 0, 0, 0); + } else if (c->bstate.btype == BLOCKED_WAIT) { + addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset)); + } else if (c->bstate.btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); @@ -240,7 +236,7 @@ void replyToClientsBlockedOnShutdown(void) { listRewind(server.clients, &li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); - if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_SHUTDOWN) { + if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) { addReplyError(c, "Errors trying to SHUTDOWN. Check logs."); unblockClient(c); } @@ -267,335 +263,29 @@ void disconnectAllBlockedClients(void) { * command processing will start from scratch, and the command will * be either executed or rejected. (unlike LIST blocked clients for * which the command is already in progress in a way. */ - if (c->btype == BLOCKED_POSTPONE) + if (c->bstate.btype == BLOCKED_POSTPONE) continue; - addReplyError(c, + unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " "instance state changed (master -> replica?)"); - unblockClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; } } } -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked on a list key, and there may be new - * data to fetch (the key is ready). */ -void serveClientsBlockedOnListKey(robj *o, readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_LIST, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_LIST]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_LIST) continue; - - int deleted = 0; - robj *dstkey = receiver->bpop.target; - int wherefrom = receiver->bpop.blockpos.wherefrom; - int whereto = receiver->bpop.blockpos.whereto; - - /* Protect receiver->bpop.target, that will be - * freed by the next unblockClient() - * call. */ - if (dstkey) incrRefCount(dstkey); - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - serveClientBlockedOnList(receiver, o, - rl->key, dstkey, rl->db, - wherefrom, whereto, - &deleted); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - - if (dstkey) decrRefCount(dstkey); - - /* The list is empty and has been deleted. */ - if (deleted) break; - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked on a sorted set key, and there may be new - * data to fetch (the key is ready). */ -void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_ZSET, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_ZSET]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_ZSET) continue; - - int deleted = 0; - long llen = zsetLength(o); - long count = receiver->bpop.count; - int where = receiver->bpop.blockpos.wherefrom; - int use_nested_array = (receiver->lastcmd && - receiver->lastcmd->proc == bzmpopCommand) - ? 1 : 0; - int reply_nil_when_empty = use_nested_array; - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted); - - /* Replicate the command. */ - int argc = 2; - robj *argv[3]; - argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax; - argv[1] = rl->key; - incrRefCount(rl->key); - if (count != -1) { - /* Replicate it as command with COUNT. */ - robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count); - argv[2] = count_obj; - argc++; - } - alsoPropagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(argv[1]); - if (count != -1) decrRefCount(argv[2]); - - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - - /* The zset is empty and has been deleted. */ - if (deleted) break; - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked on a stream key, and there may be new - * data to fetch (the key is ready). - * This function also handles the case where there may be clients blocked, - * via XREADGROUP, on an existing stream which was deleted. - * We need to unblock the clients in that case. - * The idea is that a client that is blocked via XREADGROUP is different from - * any other blocking type in the sense that it depends on the existence of both - * the key and the group. Even if the key is deleted and then revived with XADD - * it won't help any clients blocked on XREADGROUP because the group no longer - * exist, so they would fail with -NOGROUP anyway. - * The conclusion is that it's better to unblock these client (with error) upon - * the deletion of the key, rather than waiting for the first XADD.*/ -void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_STREAM, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; - - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - /* This function may be called with o=NULL (in order to unblock - * XREADGROUP clients whose key was deleted) */ - stream *s = o? o->ptr : NULL; - - /* We need to provide the new data arrived on the stream - * to all the clients that are waiting for an offset smaller - * than the current top item. */ - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_STREAM) continue; - bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); - streamID *gt = &bki->stream_id; - - if (!receiver->bpop.xread_group && (!o || o->type != OBJ_STREAM)) { - /* If it's a blocking XREAD and the stream was either deleted - * or replaced with another key, we don't do anything (it's ok - * the the client blocks on a non-existing key). */ - continue; - } - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - - /* If we blocked in the context of a consumer - * group, we need to resolve the group and update the - * last ID the client is blocked for: this is needed - * because serving other clients in the same consumer - * group will alter the "last ID" of the consumer - * group, and clients blocked in a consumer group are - * always blocked for the ">" ID: we need to deliver - * only new messages and avoid unblocking the client - * otherwise. */ - streamCG *group = NULL; - if (receiver->bpop.xread_group) { - /* If it's a blocking XREADGROUP and the stream was either deleted - * or replaced with another key, we unblock the client */ - if (!o || o->type != OBJ_STREAM) { - addReplyError(receiver, "-UNBLOCKED the stream key no longer exists"); - goto unblock_receiver; - } - group = streamLookupCG(s, - receiver->bpop.xread_group->ptr); - /* If the group was not found, send an error - * to the consumer. */ - if (!group) { - addReplyError(receiver, - "-NOGROUP the consumer group this client " - "was blocked on no longer exists"); - goto unblock_receiver; - } else { - *gt = group->last_id; - } - } - - if (streamCompareID(&s->last_id, gt) > 0) { - streamID start = *gt; - streamIncrID(&start); - - /* Lookup the consumer for the group, if any. */ - streamConsumer *consumer = NULL; - int noack = 0; - - if (group) { - noack = receiver->bpop.xread_group_noack; - sds name = receiver->bpop.xread_consumer->ptr; - consumer = streamLookupConsumer(group,name); - if (consumer == NULL) { - consumer = streamCreateConsumer(group,name,rl->key, - rl->db->id,SCC_DEFAULT); - if (noack) { - streamPropagateConsumerCreation(receiver,rl->key, - receiver->bpop.xread_group, - consumer->name); - } - } - } - - /* Emit the two elements sub-array consisting of - * the name of the stream and the data we - * extracted from it. Wrapped in a single-item - * array, since we have just one key. */ - if (receiver->resp == 2) { - addReplyArrayLen(receiver,1); - addReplyArrayLen(receiver,2); - } else { - addReplyMapLen(receiver,1); - } - addReplyBulk(receiver,rl->key); - - streamPropInfo pi = { - rl->key, - receiver->bpop.xread_group - }; - streamReplyWithRange(receiver,s,&start,NULL, - receiver->bpop.xread_count, - 0, group, consumer, noack, &pi); - /* Note that after we unblock the client, 'gt' - * and other receiver->bpop stuff are no longer - * valid, so we must do the setup above before - * the unblockClient call. */ - -unblock_receiver: - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - } - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * in order to check if we can serve clients blocked by modules using - * RM_BlockClientOnKeys(), when the corresponding key was signaled as ready: - * our goal here is to call the RedisModuleBlockedClient reply() callback to - * see if the key is really able to serve the client, and in that case, - * unblock it. */ -void serveClientsBlockedOnKeyByModule(readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_MODULE, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_MODULE) continue; - - /* Note that if *this* client cannot be served by this key, - * it does not mean that another client that is next into the - * list cannot be served as well: they may be blocked by - * different modules with different triggers to consider if a key - * is ready or not. This means we can't exit the loop but need - * to continue after the first failure. */ - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - if (moduleTryServeClientBlockedOnKey(receiver, rl->key)) { - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - moduleUnblockClient(receiver); - } - /* We need to call afterCommand even if the client was not unblocked - * in order to propagate any changes that could have been done inside - * moduleTryServeClientBlockedOnKey */ - afterCommand(receiver); - server.current_client = old_client; - } - } -} - /* This function should be called by Redis every time a single command, * a MULTI/EXEC block, or a Lua script, terminated its execution after - * being called by a client. It handles serving clients blocked in - * lists, streams, and sorted sets, via a blocking commands. + * being called by a client. It handles serving clients blocked in all scenarios + * where a specific key access requires to block until that key is available. * - * All the keys with at least one client blocked that received at least - * one new element via some write operation are accumulated into - * the server.ready_keys list. This function will run the list and will - * serve clients accordingly. Note that the function will iterate again and - * again as a result of serving BLMOVE we can have new blocking clients - * to serve because of the PUSH side of BLMOVE. + * All the keys with at least one client blocked that are signaled as ready + * are accumulated into the server.ready_keys list. This function will run + * the list and will serve clients accordingly. + * Note that the function will iterate again and again (for example as a result of serving BLMOVE + * we can have new blocking clients to serve because of the PUSH side of BLMOVE.) * - * This function is normally "fair", that is, it will server clients + * This function is normally "fair", that is, it will serve clients * using a FIFO behavior. However this fairness is violated in certain * edge cases, that is, when we have clients blocked at the same time * in a sorted set and in a list, for the same key (a very odd thing to @@ -605,6 +295,14 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * be used only for a single type, like virtually any Redis application will * do, the function is already fair. */ void handleClientsBlockedOnKeys(void) { + + /* In case we are already in the process of unblocking clients we should + * not make a recursive call, in order to prevent breaking fairness. */ + static int in_handling_blocked_clients = 0; + if (in_handling_blocked_clients) + return; + in_handling_blocked_clients = 1; + /* This function is called only when also_propagate is in its basic state * (i.e. not from call(), module context, etc.) */ serverAssert(server.also_propagate.numops == 0); @@ -627,39 +325,7 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); - updateCachedTime(0); - - /* Serve clients blocked on the key. */ - robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS); - if (!o) { - /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to - * take care of the propagation here, because afterCommand wasn't called */ - postExecutionUnitOperations(); - } else { - if (o->type == OBJ_LIST) - serveClientsBlockedOnListKey(o,rl); - else if (o->type == OBJ_ZSET) - serveClientsBlockedOnSortedSetKey(o,rl); - } - /* We need to try to serve stream clients even if the key no longer exists because - * XREADGROUP clients need to be unblocked in case the key is missing, either deleted - * or replaced by SET or something like {MULTI, DEL key, SADD key e, EXEC}. - * In this case we need to unblock all these clients. */ - serveClientsBlockedOnStreamKey(o,rl); - /* We want to serve clients blocked on module keys regardless of the object type, or - * whether the object exists or not: we don't know what the module is trying to - * accomplish right now. - * Please note that this function must be called only after handling non-module - * clients, since moduleTryServeClientBlockedOnKey may delete the key, causing `o` - * to be stale. - * The scenario is that we have one client blocked on BLPOP while another module - * client is blocked by MODULE.SAME-AS-BLPOP on the same key. - * Of course we can call lookupKeyReadWithFlags again, but: - * 1) It takes CPU - * 2) It makes more sense to give priority to "native" blocking clients rather - * than module blocking clients - * */ - serveClientsBlockedOnKeyByModule(rl); + handleClientsBlockedOnKey(rl); /* Free this item. */ decrRefCount(rl->key); @@ -668,131 +334,80 @@ void handleClientsBlockedOnKeys(void) { } listRelease(l); /* We have the new list on place at this point. */ } + in_handling_blocked_clients = 0; } -/* This is how the current blocking lists/sorted sets/streams work, we use - * BLPOP as example, but the concept is the same for other list ops, sorted - * sets and XREAD. - * - If the user calls BLPOP and the key exists and contains a non empty list - * then LPOP is called instead. So BLPOP is semantically the same as LPOP - * if blocking is not required. - * - If instead BLPOP is called and the key does not exists or the list is - * empty we need to block. In order to do so we remove the notification for - * new data to read in the client socket (so that we'll not serve new - * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blocking_keys) mapping keys to a list of clients - * blocking for this keys. - * - If a PUSH operation against a key with blocked clients waiting is - * performed, we mark this key as "ready", and after the current command, - * MULTI/EXEC block, or script, is executed, we serve all the clients waiting - * for this list, from the one that blocked first, to the last, accordingly - * to the number of elements we have in the ready list. - */ - -/* Set a client in blocking mode for the specified key (list, zset or stream), - * with the specified timeout. The 'type' argument is BLOCKED_LIST, - * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are +/* Set a client in blocking mode for the specified key, with the specified timeout. + * The 'type' argument is BLOCKED_LIST,BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are * waiting for an empty key in order to awake the client. The client is blocked - * for all the 'numkeys' keys as in the 'keys' argument. When we block for - * stream keys, we also provide an array of streamID structures: clients will - * be unblocked only when items with an ID greater or equal to the specified - * one is appended to the stream. - * - * 'count' for those commands that support the optional count argument. - * Otherwise the value is 0. */ -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey) { + * for all the 'numkeys' keys as in the 'keys' argument. + * The client will unblocked as soon as one of the keys in 'keys' value was updated. + * the parameter unblock_on_nokey can be used to force client to be unblocked even in the case the key + * is updated to become unavailable, either by type change (override), deletion or swapdb */ +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey) { + dictEntry *db_blocked_entry, *db_blocked_existing_entry, *client_blocked_entry; list *l; int j; - c->bpop.count = count; - c->bpop.timeout = timeout; - c->bpop.target = target; - - if (blockpos != NULL) c->bpop.blockpos = *blockpos; - - if (target != NULL) incrRefCount(target); - + c->bstate.timeout = timeout; for (j = 0; j < numkeys; j++) { - /* Allocate our bkinfo structure, associated to each key the client - * is blocked for. */ - bkinfo *bki = zmalloc(sizeof(*bki)); - if (btype == BLOCKED_STREAM) - bki->stream_id = ids[j]; - /* If the key already exists in the dictionary ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) { - zfree(bki); + if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) { continue; } incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - dictEntry *de, *existing; - de = dictAddRaw(c->db->blocking_keys, keys[j], &existing); - if (de) { - incrRefCount(keys[j]); - /* For every key we take a list of clients blocked for it */ + db_blocked_entry = dictAddRaw(c->db->blocking_keys,keys[j], &db_blocked_existing_entry); + + /* In case key[j] did not have blocking clients yet, we need to create a new list */ + if (db_blocked_entry != NULL) { l = listCreate(); - dictSetVal(c->db->blocking_keys, de, l); + dictSetVal(c->db->blocking_keys, db_blocked_entry, l); + incrRefCount(keys[j]); } else { - l = dictGetVal(existing); + l = dictGetVal(db_blocked_existing_entry); } listAddNodeTail(l,c); - bki->listnode = listLast(l); + dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l)); + /* We need to add the key to blocking_keys_unblock_on_nokey, if the client * wants to be awakened if key is deleted (like XREADGROUP) */ if (unblock_on_nokey) { - de = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], NULL); - if (de) { + db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry); + if (db_blocked_entry) { incrRefCount(keys[j]); + dictSetUnsignedIntegerVal(db_blocked_entry, 1); + } else { + dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1); } } } + c->bstate.unblock_on_nokey = unblock_on_nokey; + c->flags |= CLIENT_PENDING_COMMAND; blockClient(c,btype); } -/* Unblock a client that's waiting in a blocking operation such as BLPOP. - * You should never call this function directly, but unblockClient() instead. */ -void unblockClientWaitingData(client *c) { +/* Helper function to unblock a client that's waiting in a blocking operation such as BLPOP. + * Internal function for unblockClient() */ +static void unblockClientWaitingData(client *c) { dictEntry *de; dictIterator *di; - list *l; - serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0); - di = dictGetIterator(c->bpop.keys); + if (dictSize(c->bstate.keys) == 0) + return; + + di = dictGetIterator(c->bstate.keys); /* The client may wait for multiple keys, so unblock it for every key. */ while((de = dictNext(di)) != NULL) { - robj *key = dictGetKey(de); - bkinfo *bki = dictGetVal(de); - - /* Remove this client from the list of clients waiting for this key. */ - l = dictFetchValue(c->db->blocking_keys,key); - serverAssertWithInfo(c,key,l != NULL); - listDelNode(l,bki->listnode); - /* If the list is empty we need to remove it to avoid wasting memory */ - if (listLength(l) == 0) { - dictDelete(c->db->blocking_keys,key); - dictDelete(c->db->blocking_keys_unblock_on_nokey,key); - } + releaseBlockedEntry(c, de, 0); } dictReleaseIterator(di); - - /* Cleanup the client structure */ - dictEmpty(c->bpop.keys,NULL); - if (c->bpop.target) { - decrRefCount(c->bpop.target); - c->bpop.target = NULL; - } - if (c->bpop.xread_group) { - decrRefCount(c->bpop.xread_group); - decrRefCount(c->bpop.xread_consumer); - c->bpop.xread_group = NULL; - c->bpop.xread_consumer = NULL; - } + dictEmpty(c->bstate.keys, NULL); } -static int getBlockedTypeByType(int type) { +static blocking_type getBlockedTypeByType(int type) { switch (type) { case OBJ_LIST: return BLOCKED_LIST; case OBJ_ZSET: return BLOCKED_ZSET; @@ -858,6 +473,52 @@ static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) listAddNodeTail(server.ready_keys,rl); } +/* Helper function to wrap the logic of removing a client blocked key entry + * In this case we would like to do the following: + * 1. unlink the client from the global DB locked client list + * 2. remove the entry from the global db blocking list in case the list is empty + * 3. in case the global list is empty, also remove the key from the global dict of keys + * which should trigger unblock on key deletion + * 4. remove key from the client blocking keys list - NOTE, since client can be blocked on lots of keys, + * but unblocked when only one of them is triggered, we would like to avoid deleting each key separately + * and instead clear the dictionary in one-shot. this is why the remove_key argument is provided + * to support this logic in unblockClientWaitingData + */ +static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) { + list *l; + listNode *pos; + void *key; + dictEntry *unblock_on_nokey_entry; + + key = dictGetKey(de); + pos = dictGetVal(de); + /* Remove this client from the list of clients waiting for this key. */ + l = dictFetchValue(c->db->blocking_keys, key); + serverAssertWithInfo(c,key,l != NULL); + listUnlinkNode(l,pos); + /* If the list is empty we need to remove it to avoid wasting memory + * We will also remove the key (if exists) from the blocking_keys_unblock_on_nokey dict. + * However, in case the list is not empty, we will have to still perform reference accounting + * on the blocking_keys_unblock_on_nokey and delete the entry in case of zero reference. + * Why? because it is possible that some more clients are blocked on the same key but without + * require to be triggered on key deletion, we do not want these to be later triggered by the + * signalDeletedKeyAsReady. */ + if (listLength(l) == 0) { + dictDelete(c->db->blocking_keys, key); + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } else if (c->bstate.unblock_on_nokey) { + unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey,key); + /* it is not possible to have a client blocked on nokey with no matching entry */ + serverAssertWithInfo(c,key,unblock_on_nokey_entry != NULL); + if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) { + /* in case the count is zero, we can delete the entry */ + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } + } + if (remove_key) + dictDelete(c->bstate.keys, key); +} + void signalKeyAsReady(redisDb *db, robj *key, int type) { signalKeyAsReadyLogic(db, key, type, 0); } @@ -865,3 +526,156 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) { void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) { signalKeyAsReadyLogic(db, key, type, 1); } + +/* Helper function for handleClientsBlockedOnKeys(). This function is called + * whenever a key is ready. we iterate over all the clients blocked on this key + * and try to re-execute the command (in case the key is still available). */ +static void handleClientsBlockedOnKey(readyList *rl) { + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + + if (de) { + list *clients = dictGetVal(de); + listNode *ln; + listIter li; + listRewind(clients,&li); + + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOKKUP_NOEFFECTS); + /* 1. In case new key was added/touched we need to verify it satisfy the + * blocked type, since we might process the wrong key type. + * 2. We want to serve clients blocked on module keys + * regardless of the object type: we don't know what the + * module is trying to accomplish right now. + * 3. In case of XREADGROUP call we will want to unblock on any change in object type + * or in case the key was deleted, since the group is no longer valid. */ + if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) || + (o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) || + (receiver->bstate.unblock_on_nokey)) + { + if (receiver->bstate.btype != BLOCKED_MODULE) + unblockClientOnKey(receiver, rl->key); + else + moduleUnblockClientOnKey(receiver, rl->key); + } + } + } +} + +/* block a client due to wait command */ +void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { + c->bstate.timeout = timeout; + c->bstate.reploffset = offset; + c->bstate.numreplicas = numreplicas; + listAddNodeHead(server.clients_waiting_acks,c); + blockClient(c,BLOCKED_WAIT); +} + +/* Postpone client from executing a command. For example the server might be busy + * requesting to avoid processing clients commands which will be processed later + * when the it is ready to accept them. */ +void blockPostponeClient(client *c) { + c->bstate.timeout = 0; + blockClient(c,BLOCKED_POSTPONE); + listAddNodeTail(server.postponed_clients, c); + c->postponed_list_node = listLast(server.postponed_clients); + /* Mark this client to execute its command */ + c->flags |= CLIENT_PENDING_COMMAND; +} + +/* Block client due to shutdown command */ +void blockClientShutdown(client *c) { + blockClient(c, BLOCKED_SHUTDOWN); + /* Mark this client to execute its command */ +} + +/* Unblock a client once a specific key became available for it. + * This function will remove the client from the list of clients blocked on this key + * and also remove the key from the dictionary of keys this client is blocked on. + * in case the client has a command pending it will process it immediately. */ +static void unblockClientOnKey(client *c, robj *key) { + dictEntry *de; + + de = dictFind(c->bstate.keys, key); + releaseBlockedEntry(c, de, 1); + + /* Only in case of blocking API calls, we might be blocked on several keys. + however we should force unblock the entire blocking keys */ + serverAssert(c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET); + + unblockClient(c); + /* In case this client was blocked on keys during command + * we need to re process the command again */ + if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags &= ~CLIENT_PENDING_COMMAND; + processCommandAndResetClient(c); + } +} + +/* Unblock a client blocked on the specific key from module context. + * This function will try to serve the module call, and in case it succeeds, + * it will add the client to the list of module unblocked clients which will + * be processed in moduleHandleBlockedClients. */ +static void moduleUnblockClientOnKey(client *c, robj *key) { + long long prev_error_replies = server.stat_total_error_replies; + client *old_client = server.current_client; + server.current_client = c; + monotime replyTimer; + elapsedStart(&replyTimer); + + if (moduleTryServeClientBlockedOnKey(c, key)) { + updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + moduleUnblockClient(c); + } + /* We need to call afterCommand even if the client was not unblocked + * in order to propagate any changes that could have been done inside + * moduleTryServeClientBlockedOnKey */ + afterCommand(c); + server.current_client = old_client; +} + +/* Unblock a client which is currently Blocked on and provided a timeout. + * The implementation will first reply to the blocked client with null response + * or, in case of module blocked client the timeout callback will be used. + * In this case since we might have a command pending + * we want to remove the pending flag to indicate we already responded to the + * command with timeout reply. */ +void unblockClientOnTimeout(client *c) { + replyToBlockedClientTimedOut(c); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + unblockClient(c); +} + +/* Unblock a client which is currently Blocked with error. + * If err_str is provided it will be used to reply to the blocked client */ +void unblockClientOnError(client *c, const char *err_str) { + if (err_str) + addReplyError(c, err_str); + updateStatsOnUnblock(c, 0, 0, 1); + if (c->flags & CLIENT_PENDING_COMMAND) + c->flags &= ~CLIENT_PENDING_COMMAND; + unblockClient(c); +} + +/* sets blocking_keys to the total number of keys which has at least one client blocked on them + * sets blocking_keys_on_nokey to the total number of keys which has at least one client + * blocked on them to be written or deleted */ +void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey) { + unsigned long bkeys=0, bkeys_on_nokey=0; + for (int j = 0; j < server.dbnum; j++) { + bkeys += dictSize(server.db[j].blocking_keys); + bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey); + } + if (blocking_keys) + *blocking_keys = bkeys; + if (bloking_keys_on_nokey) + *bloking_keys_on_nokey = bkeys_on_nokey; +} diff --git a/src/cluster.c b/src/cluster.c index 7414f830f0c1..10be81930262 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -7215,10 +7215,10 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co * returns 1. Otherwise 0 is returned and no operation is performed. */ int clusterRedirectBlockedClientIfNeeded(client *c) { if (c->flags & CLIENT_BLOCKED && - (c->btype == BLOCKED_LIST || - c->btype == BLOCKED_ZSET || - c->btype == BLOCKED_STREAM || - c->btype == BLOCKED_MODULE)) + (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_MODULE)) { dictEntry *de; dictIterator *di; @@ -7234,11 +7234,11 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { /* If the client is blocked on module, but not on a specific key, * don't unblock it (except for the CLUSTER_FAIL case above). */ - if (c->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) + if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) return 0; /* All keys must belong to the same slot, so check first key only. */ - di = dictGetIterator(c->bpop.keys); + di = dictGetIterator(c->bstate.keys); if ((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr)); diff --git a/src/db.c b/src/db.c index e82402125fa8..aad822630d34 100644 --- a/src/db.c +++ b/src/db.c @@ -1147,7 +1147,7 @@ void shutdownCommand(client *c) { return; } - blockClient(c, BLOCKED_SHUTDOWN); + blockClientShutdown(c); if (prepareForShutdown(flags) == C_OK) exit(0); /* If we're here, then shutdown is ongoing (the client is still blocked) or * failed (the client has received an error). */ diff --git a/src/dict.h b/src/dict.h index c818d6d4d664..16f576ffdd76 100644 --- a/src/dict.h +++ b/src/dict.h @@ -131,6 +131,15 @@ typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref); #define dictSetDoubleVal(entry, _val_) \ do { (entry)->v.d = _val_; } while(0) +#define dictIncrSignedIntegerVal(entry, _val_) \ + ((entry)->v.s64 += _val_) + +#define dictIncrUnsignedIntegerVal(entry, _val_) \ + ((entry)->v.u64 += _val_) + +#define dictIncrDoubleVal(entry, _val_) \ + ((entry)->v.d += _val_) + #define dictFreeKey(d, entry) \ if ((d)->type->keyDestructor) \ (d)->type->keyDestructor((d), (entry)->key) diff --git a/src/module.c b/src/module.c index 7fbc1d52b611..034105d3e98c 100644 --- a/src/module.c +++ b/src/module.c @@ -6158,7 +6158,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch server.replication_allowed = replicate && server.replication_allowed; /* Run the command */ - int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_FROM_MODULE; + int call_flags = CMD_CALL_FROM_MODULE; if (replicate) { if (!(flags & REDISMODULE_ARGV_NO_AOF)) call_flags |= CMD_CALL_PROPAGATE_AOF; @@ -7282,7 +7282,7 @@ void RM_LatencyAddSample(const char *event, mstime_t latency) { * The structure RedisModuleBlockedClient will be always deallocated when * running the list of clients blocked by a module that need to be unblocked. */ void unblockClientFromModule(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; /* Call the disconnection callback if any. Note that * bc->disconnect_callback is set to NULL if the client gets disconnected @@ -7346,8 +7346,8 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF int islua = scriptIsRunning(); int ismulti = server.in_exec; - c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + c->bstate.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; ctx->module->blocked_clients++; /* We need to handle the invalid operation of calling modules blocking @@ -7371,16 +7371,16 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->unblocked = 0; bc->background_timer = 0; bc->background_duration = 0; - c->bpop.timeout = timeout; + c->bstate.timeout = timeout; if (islua || ismulti) { - c->bpop.module_blocked_handle = NULL; + c->bstate.module_blocked_handle = NULL; addReplyError(c, islua ? "Blocking module command called from Lua script" : "Blocking module command called from transaction"); } else { if (keys) { - blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED); + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED); } else { blockClient(c,BLOCKED_MODULE); } @@ -7397,7 +7397,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF * This function returns 1 if client was served (and should be unblocked) */ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() @@ -7566,14 +7566,14 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { /* This API is used by the Redis core to unblock a client that was blocked * by a module. */ void moduleUnblockClient(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; moduleUnblockClientByHandle(bc,NULL); } /* Return true if the client 'c' was blocked by a module using * RM_BlockClientOnKeys(). */ int moduleClientIsBlockedOnKeys(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; return bc->blocked_on_keys; } @@ -7740,10 +7740,10 @@ void moduleHandleBlockedClients(void) { * moduleBlockedClientTimedOut(). */ int moduleBlockedClientMayTimeout(client *c) { - if (c->btype != BLOCKED_MODULE) + if (c->bstate.btype != BLOCKED_MODULE) return 1; - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; return (bc && bc->timeout_callback != NULL); } @@ -7752,7 +7752,7 @@ int moduleBlockedClientMayTimeout(client *c) { * does not need to do any cleanup. Eventually the module will call the * API to unblock the client and the memory will be released. */ void moduleBlockedClientTimedOut(client *c) { - RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() @@ -7767,9 +7767,8 @@ void moduleBlockedClientTimedOut(client *c) { long long prev_error_replies = server.stat_total_error_replies; bc->timeout_callback(&ctx,(void**)c->argv,c->argc); moduleFreeContext(&ctx); - if (!bc->blocked_on_keys) { - updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); - } + updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); + /* For timeout events, we do not want to call the disconnect callback, * because the blocked client will be automatically disconnected in * this case, and the user can still hook using the timeout callback. */ diff --git a/src/networking.c b/src/networking.c index 054bca6a0787..ed4f8858100b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -165,6 +165,7 @@ client *createClient(connection *conn) { c->flags = 0; c->slot = -1; c->ctime = c->lastinteraction = server.unixtime; + c->duration = 0; clientSetDefaultAuth(c); c->replstate = REPL_STATE_NONE; c->repl_start_cmd_stream_on_ack = 0; @@ -184,15 +185,7 @@ client *createClient(connection *conn) { c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); - c->btype = BLOCKED_NONE; - c->bpop.timeout = 0; - c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType); - c->bpop.target = NULL; - c->bpop.xread_group = NULL; - c->bpop.xread_consumer = NULL; - c->bpop.xread_group_noack = 0; - c->bpop.numreplicas = 0; - c->bpop.reploffset = 0; + initClientBlockingState(c); c->woff = 0; c->watched_keys = listCreate(); c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType); @@ -1588,7 +1581,7 @@ void freeClient(client *c) { /* Deallocate structures used to block on blocking ops. */ if (c->flags & CLIENT_BLOCKED) unblockClient(c); - dictRelease(c->bpop.keys); + dictRelease(c->bstate.keys); /* UNWATCH all the keys */ unwatchAllKeys(c); @@ -2033,6 +2026,8 @@ void resetClient(client *c) { c->multibulklen = 0; c->bulklen = -1; c->slot = -1; + c->duration = 0; + c->flags &= ~CLIENT_EXECUTING_COMMAND; if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); @@ -3142,12 +3137,11 @@ NULL * it also doesn't expect to be unblocked by CLIENT UNBLOCK */ if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) { if (unblock_error) - addReplyError(target, + unblockClientOnError(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); else - replyToBlockedClientTimedOut(target); - unblockClient(target); - updateStatsOnUnblock(target, 0, 0, 1); + unblockClientOnTimeout(target); + addReply(c,shared.cone); } else { addReply(c,shared.czero); diff --git a/src/replication.c b/src/replication.c index 1b2ef6731b49..55636fd77240 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3480,11 +3480,7 @@ void waitCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from slaves. */ - c->bpop.timeout = timeout; - c->bpop.reploffset = offset; - c->bpop.numreplicas = numreplicas; - listAddNodeHead(server.clients_waiting_acks,c); - blockClient(c,BLOCKED_WAIT); + blockForReplication(c,timeout,offset,numreplicas); /* Make sure that the server will send an ACK request to all the slaves * before returning to the event loop. */ @@ -3518,16 +3514,16 @@ void processClientsWaitingReplicas(void) { * offset and number of replicas, we remember it so the next client * may be unblocked without calling replicationCountAcksByOffset() * if the requested offset / replicas were equal or less. */ - if (last_offset && last_offset >= c->bpop.reploffset && - last_numreplicas >= c->bpop.numreplicas) + if (last_offset && last_offset >= c->bstate.reploffset && + last_numreplicas >= c->bstate.numreplicas) { unblockClient(c); addReplyLongLong(c,last_numreplicas); } else { - int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset); + int numreplicas = replicationCountAcksByOffset(c->bstate.reploffset); - if (numreplicas >= c->bpop.numreplicas) { - last_offset = c->bpop.reploffset; + if (numreplicas >= c->bstate.numreplicas) { + last_offset = c->bstate.reploffset; last_numreplicas = numreplicas; unblockClient(c); addReplyLongLong(c,numreplicas); diff --git a/src/script.c b/src/script.c index 2a770fa61443..fd47e390e1e7 100644 --- a/src/script.c +++ b/src/script.c @@ -559,7 +559,7 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) { goto error; } - int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; + int call_flags = CMD_CALL_NONE; if (run_ctx->repl_flags & PROPAGATE_AOF) { call_flags |= CMD_CALL_PROPAGATE_AOF; } diff --git a/src/script_lua.c b/src/script_lua.c index b22f9e5b97fe..c86cdc2d1aac 100644 --- a/src/script_lua.c +++ b/src/script_lua.c @@ -981,7 +981,7 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) { c->argc = c->argv_len = 0; c->user = NULL; c->argv = NULL; - freeClientArgv(c); + resetClient(c); inuse--; if (raise_error) { diff --git a/src/server.c b/src/server.c index 380b20d323b2..eaad37c070b7 100644 --- a/src/server.c +++ b/src/server.c @@ -92,6 +92,10 @@ const char *replstateToString(int replstate); /*============================ Utility functions ============================ */ +/* This macro tells if we are in the context of loading an AOF. */ +#define isAOFLoadingContext() \ + ((server.current_client && server.current_client->id == CLIENT_ID_AOF) ? 1 : 0) + /* We use a private localtime implementation which is fork-safe. The logging * function of Redis may be called from other threads. */ void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); @@ -3374,8 +3378,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags) { * * The following flags can be passed: * CMD_CALL_NONE No flags. - * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed. - * CMD_CALL_STATS Populate command stats. * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset @@ -3411,6 +3413,15 @@ void call(client *c, int flags) { long long dirty; uint64_t client_old_flags = c->flags; struct redisCommand *real_cmd = c->realcmd; + /* When call() is issued during loading the AOF we don't want commands called + * from module, exec or LUA to go into the slowlog or to populate statistics. */ + int update_command_stats = !isAOFLoadingContext(); + + /* We want to be aware of a client which is making a first time attempt to execute this command + * and a client which is reprocessing command again (after being unblocked). + * Blocked clients can be blocked in different places and not always it means the call() function has been + * called. For example this is required for avoiding double logging to monitors.*/ + int reprocessing_command = ((!server.execution_nesting) && (c->flags & CLIENT_EXECUTING_COMMAND)) ? 1 : 0; /* Initialization: clear the flags that must be set by the command on * demand, and initialize the array for additional commands propagation. */ @@ -3429,10 +3440,11 @@ void call(client *c, int flags) { const long long call_timer = ustime(); - /* Update cache time, in case we have nested calls we want to - * update only on the first call */ + /* Update cache time, and indicate we are starting command execution. + * in case we have nested calls we want to update only on the first call */ if (server.execution_nesting++ == 0) { updateCachedTimeWithUs(0,call_timer); + c->flags |= CLIENT_EXECUTING_COMMAND; } monotime monotonic_start = 0; @@ -3440,7 +3452,13 @@ void call(client *c, int flags) { monotonic_start = getMonotonicUs(); c->cmd->proc(c); - server.execution_nesting--; + + if (--server.execution_nesting == 0) { + /* In case client is blocked after trying to execute the command, + * it means the execution is not yet completed and we MIGHT reprocess the command in the future. */ + if (!(c->flags & CLIENT_BLOCKED)) + c->flags &= ~(CLIENT_EXECUTING_COMMAND); + } /* In order to avoid performance implication due to querying the clock using a system call 3 times, * we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */ @@ -3450,7 +3468,7 @@ void call(client *c, int flags) { else duration = ustime() - call_timer; - c->duration = duration; + c->duration += duration; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; @@ -3471,11 +3489,6 @@ void call(client *c, int flags) { c->flags |= CLIENT_CLOSE_AFTER_REPLY; } - /* When EVAL is called loading the AOF we don't want commands called - * from Lua to go into the slowlog or to populate statistics. */ - if (server.loading && c->flags & CLIENT_SCRIPT) - flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); - /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ @@ -3493,7 +3506,7 @@ void call(client *c, int flags) { /* Record the latency this command induced on the main thread. * unless instructed by the caller not to log. (happens when processing * a MULTI-EXEC from inside an AOF). */ - if (flags & CMD_CALL_SLOWLOG) { + if (update_command_stats) { char *latency_event = (real_cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); @@ -3501,12 +3514,15 @@ void call(client *c, int flags) { /* Log the command into the Slow log if needed. * If the client is blocked we will handle slowlog when it is unblocked. */ - if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED)) - slowlogPushCurrentCommand(c, real_cmd, duration); - - /* Send the command to clients in MONITOR mode if applicable. - * Administrative commands are considered too dangerous to be shown. */ - if (!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { + if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) + slowlogPushCurrentCommand(c, real_cmd, c->duration); + + /* Send the command to clients in MONITOR mode if applicable, + * since some administrative commands are considered too dangerous to be shown. + * Other exceptions is a client which is unblocked and retring to process the command + * or we are currently in the process of loading AOF. */ + if (update_command_stats && !reprocessing_command && + !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { robj **argv = c->original_argv ? c->original_argv : c->argv; int argc = c->original_argv ? c->original_argc : c->argc; replicationFeedMonitors(c,server.monitors,c->db->id,argv,argc); @@ -3517,13 +3533,13 @@ void call(client *c, int flags) { if (!(c->flags & CLIENT_BLOCKED)) freeClientOriginalArgv(c); - /* populate the per-command statistics that we show in INFO commandstats. */ - if (flags & CMD_CALL_STATS) { - real_cmd->microseconds += duration; + /* populate the per-command statistics that we show in INFO commandstats. + * If the client is blocked we will handle latency stats and duration when it is unblocked. */ + if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) { real_cmd->calls++; - /* If the client is blocked we will handle latency stats when it is unblocked. */ + real_cmd->microseconds += c->duration; if (server.latency_tracking_enabled && !(c->flags & CLIENT_BLOCKED)) - updateCommandLatencyHistogram(&(real_cmd->latency_histogram), duration*1000); + updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration*1000); } /* Propagate the command into the AOF and replication link. @@ -3584,7 +3600,8 @@ void call(client *c, int flags) { } } - server.stat_numcommands++; + if (!(c->flags & CLIENT_BLOCKED)) + server.stat_numcommands++; /* Record peak memory after each command and before the eviction that runs * before the next command. */ @@ -3735,6 +3752,10 @@ int processCommand(client *c) { moduleCallCommandFilters(c); + /* in case we are starting to ProcessCommand and we already have a command we assume + * this is a reprocessing of this command, so we do not want to perform some of the actions again. */ + int client_reprocessing_command = c->cmd ? 1 : 0; + /* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { securityWarningCommand(c); @@ -3746,36 +3767,40 @@ int processCommand(client *c) { if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE && !(server.busy_module_yield_flags & BUSY_MODULE_YIELD_CLIENTS)) { - c->bpop.timeout = 0; - blockClient(c,BLOCKED_POSTPONE); + blockPostponeClient(c); return C_OK; } /* Now lookup the command and check ASAP about trivial error conditions - * such as wrong arity, bad command name and so forth. */ - c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); - sds err; - if (!commandCheckExistence(c, &err)) { - rejectCommandSds(c, err); - return C_OK; - } - if (!commandCheckArity(c, &err)) { - rejectCommandSds(c, err); - return C_OK; - } - - /* Check if the command is marked as protected and the relevant configuration allows it */ - if (c->cmd->flags & CMD_PROTECTED) { - if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) || - (c->cmd->proc == moduleCommand && !allowProtectedAction(server.enable_module_cmd, c))) - { - rejectCommandFormat(c,"%s command not allowed. If the %s option is set to \"local\", " - "you can run it from a local connection, otherwise you need to set this option " - "in the configuration file, and then restart the server.", - c->cmd->proc == debugCommand ? "DEBUG" : "MODULE", - c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command"); + * such as wrong arity, bad command name and so forth. + * In case we are reprocessing a command after it was blocked, + * we do not have to repeat the same checks */ + if (!client_reprocessing_command) { + c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); + sds err; + if (!commandCheckExistence(c, &err)) { + rejectCommandSds(c, err); + return C_OK; + } + if (!commandCheckArity(c, &err)) { + rejectCommandSds(c, err); return C_OK; + } + + /* Check if the command is marked as protected and the relevant configuration allows it */ + if (c->cmd->flags & CMD_PROTECTED) { + if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) || + (c->cmd->proc == moduleCommand && !allowProtectedAction(server.enable_module_cmd, c))) + { + rejectCommandFormat(c,"%s command not allowed. If the %s option is set to \"local\", " + "you can run it from a local connection, otherwise you need to set this option " + "in the configuration file, and then restart the server.", + c->cmd->proc == debugCommand ? "DEBUG" : "MODULE", + c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command"); + return C_OK; + + } } } @@ -4028,8 +4053,7 @@ int processCommand(client *c) { ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) { - c->bpop.timeout = 0; - blockClient(c,BLOCKED_POSTPONE); + blockPostponeClient(c); return C_OK; } @@ -5444,7 +5468,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { /* Clients */ if (all_sections || (dictFind(section_dict,"clients") != NULL)) { size_t maxin, maxout; + unsigned long blocking_keys, blocking_keys_on_nokey; getExpansiveClientsInfo(&maxin,&maxout); + totalNumberOfBlockingKeys(&blocking_keys, &blocking_keys_on_nokey); if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Clients\r\n" @@ -5455,14 +5481,18 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "client_recent_max_output_buffer:%zu\r\n" "blocked_clients:%d\r\n" "tracking_clients:%d\r\n" - "clients_in_timeout_table:%llu\r\n", + "clients_in_timeout_table:%llu\r\n" + "total_blocking_keys:%lu\r\n" + "total_blocking_keys_on_nokey:%lu\r\n", listLength(server.clients)-listLength(server.slaves), getClusterConnectionsCount(), server.maxclients, maxin, maxout, server.blocked_clients, server.tracking_clients, - (unsigned long long) raxSize(server.clients_timeout_table)); + (unsigned long long) raxSize(server.clients_timeout_table), + blocking_keys, + blocking_keys_on_nokey); } /* Memory */ diff --git a/src/server.h b/src/server.h index 765b67d62889..1201cab3b641 100644 --- a/src/server.h +++ b/src/server.h @@ -359,7 +359,11 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ -/* #define CLIENT_... (1<<29) currently unused, feel free to use in the future */ +#define CLIENT_EXECUTING_COMMAND (1<<29) /* Indicates that the client is currently in the process of handling + a command. usually this will be marked only during call() + however, blocked clients might have this flag kept until they + will try to reprocess the command. */ + #define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully * parsed command ready for execution. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to @@ -388,15 +392,18 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ -#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */ -#define BLOCKED_LIST 1 /* BLPOP & co. */ -#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ -#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ -#define BLOCKED_STREAM 4 /* XREAD. */ -#define BLOCKED_ZSET 5 /* BZPOP et al. */ -#define BLOCKED_POSTPONE 6 /* Blocked by processCommand, re-try processing later. */ -#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */ -#define BLOCKED_NUM 8 /* Number of blocked states. */ +typedef enum blocking_type { + BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ + BLOCKED_LIST, /* BLPOP & co. */ + BLOCKED_WAIT, /* WAIT for synchronous replication. */ + BLOCKED_MODULE, /* Blocked by a loadable module. */ + BLOCKED_STREAM, /* XREAD. */ + BLOCKED_ZSET, /* BZPOP et al. */ + BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ + BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_NUM, /* Number of blocked states. */ + BLOCKED_END /* End of enumeration */ +} blocking_type; /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -569,13 +576,11 @@ typedef enum { /* Command call flags, see call() function */ #define CMD_CALL_NONE 0 -#define CMD_CALL_SLOWLOG (1<<0) -#define CMD_CALL_STATS (1<<1) -#define CMD_CALL_PROPAGATE_AOF (1<<2) -#define CMD_CALL_PROPAGATE_REPL (1<<3) +#define CMD_CALL_PROPAGATE_AOF (1<<0) +#define CMD_CALL_PROPAGATE_REPL (1<<1) #define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL) -#define CMD_CALL_FULL (CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE) -#define CMD_CALL_FROM_MODULE (1<<4) /* From RM_Call */ +#define CMD_CALL_FULL (CMD_CALL_PROPAGATE) +#define CMD_CALL_FROM_MODULE (1<<2) /* From RM_Call */ /* Command propagation flags, see propagateNow() function */ #define PROPAGATE_NONE 0 @@ -992,27 +997,13 @@ typedef struct multiState { * The fields used depend on client->btype. */ typedef struct blockingState { /* Generic fields. */ - long count; /* Elements to pop if count was specified (BLMPOP/BZMPOP), -1 otherwise. */ - mstime_t timeout; /* Blocking operation timeout. If UNIX current time - * is > timeout then the operation timed out. */ - - /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */ - dict *keys; /* The keys we are waiting to terminate a blocking - * operation such as BLPOP or XREAD. Or NULL. */ - robj *target; /* The key that should receive the element, - * for BLMOVE. */ - struct blockPos { - int wherefrom; /* Where to pop from */ - int whereto; /* Where to push to */ - } blockpos; /* The positions in the src/dst lists/zsets - * where we want to pop/push an element - * for BLPOP, BRPOP, BLMOVE and BZMPOP. */ - - /* BLOCK_STREAM */ - size_t xread_count; /* XREAD COUNT option. */ - robj *xread_group; /* XREADGROUP group name. */ - robj *xread_consumer; /* XREADGROUP consumer name. */ - int xread_group_noack; + blocking_type btype; /* Type of blocking op if CLIENT_BLOCKED. */ + mstime_t timeout; /* Blocking operation timeout. If UNIX current time + * is > timeout then the operation timed out. */ + int unblock_on_nokey; /* Whether to unblock the client when at least one of the keys + is deleted or does not exist anymore */ + /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM or any other Keys related blocking */ + dict *keys; /* The keys we are blocked on */ /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ @@ -1029,7 +1020,7 @@ typedef struct blockingState { * operation such as B[LR]POP, but received new data in the context of the * last executed command. * - * After the execution of every command or script, we run this list to check + * After the execution of every command or script, we iterate over this list to check * if as a result we should serve data to clients blocked, unblocking them. * Note that server.ready_keys will not have duplicates as there dictionary * also called ready_keys in every structure representing a Redis database, @@ -1167,8 +1158,7 @@ typedef struct client { int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ int slave_req; /* Slave requirements: SLAVE_REQ_* */ multiState mstate; /* MULTI/EXEC state */ - int btype; /* Type of blocking op if CLIENT_BLOCKED. */ - blockingState bpop; /* blocking state */ + blockingState bstate; /* blocking state */ long long woff; /* Last write global replication offset. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ @@ -2635,7 +2625,6 @@ int listTypeEqual(listTypeEntry *entry, robj *o); void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry); robj *listTypeDup(robj *o); void listTypeDelRange(robj *o, long start, long stop); -void unblockClientWaitingData(client *c); void popGenericCommand(client *c, int where); void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int signal, int *deleted); typedef enum { @@ -2938,6 +2927,7 @@ int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); int processCommand(client *c); int processPendingCommandAndInputBuffer(client *c); +int processCommandAndResetClient(client *c); void setupSignalHandlers(void); void removeSignalHandlers(void); int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler); @@ -3166,6 +3156,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, #define LOOKUP_NOSTATS (1<<2) /* Don't update keyspace hits/misses counters. */ #define LOOKUP_WRITE (1<<3) /* Delete expired keys even in replicas. */ #define LOOKUP_NOEXPIRE (1<<4) /* Avoid deleting lazy expired keys. */ +#define LOKKUP_NOEFFECTS (LOOKUP_NONOTIFY | LOOKUP_NOSTATS | LOOKUP_NOTOUCH | LOOKUP_NOEXPIRE) /* Avoid any effects from fetching the key */ void dbAdd(redisDb *db, robj *key, robj *val); int dbAddRDBLoad(redisDb *db, sds key, robj *val); @@ -3281,20 +3272,28 @@ typedef struct luaScript { robj *body; } luaScript; -/* Blocked clients */ +/* Blocked clients API */ void processUnblockedClients(void); +void initClientBlockingState(client *c); void blockClient(client *c, int btype); void unblockClient(client *c); +void unblockClientOnTimeout(client *c); +void unblockClientOnError(client *c, const char *err_str); void queueClientForReprocessing(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); +void blockClientShutdown(client *c); +void blockPostponeClient(client *c); +void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); void signalDeletedKeyAsReady(redisDb *db, robj *key, int type); -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with); +void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey); + /* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); diff --git a/src/t_list.c b/src/t_list.c index d7906d391ce7..ee5f6fd9105d 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -1200,103 +1200,6 @@ void rpoplpushCommand(client *c) { lmoveGenericCommand(c, LIST_TAIL, LIST_HEAD); } -/*----------------------------------------------------------------------------- - * Blocking POP operations - *----------------------------------------------------------------------------*/ - -/* This is a helper function for handleClientsBlockedOnKeys(). Its work - * is to serve a specific client (receiver) that is blocked on 'key' - * in the context of the specified 'db', doing the following: - * - * 1) Provide the client with the 'value' element or a range of elements. - * We will do the pop in here and caller does not need to bother the return. - * 2) If the dstkey is not NULL (we are serving a BLMOVE) also push the - * 'value' element on the destination list (the "push" side of the command). - * 3) Propagate the resulting BRPOP, BLPOP, BLMPOP and additional xPUSH if any into - * the AOF and replication channel. - * - * The argument 'wherefrom' is LIST_TAIL or LIST_HEAD, and indicates if the - * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that - * we can propagate the command properly. - * - * The argument 'whereto' is LIST_TAIL or LIST_HEAD, and indicates if the - * 'value' element is to be pushed to the head or tail so that we can - * propagate the command properly. - * - * 'deleted' is an optional output argument to get an indication - * if the key got deleted by this function. */ -void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted) -{ - robj *argv[5]; - robj *value = NULL; - - if (deleted) *deleted = 0; - - if (dstkey == NULL) { - /* Propagate the [LR]POP operation. */ - argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop : - shared.rpop; - argv[1] = key; - - if (receiver->lastcmd->proc == blmpopCommand) { - /* Propagate the [LR]POP COUNT operation. */ - long count = receiver->bpop.count; - serverAssert(count > 0); - long llen = listTypeLength(o); - serverAssert(llen > 0); - - argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count); - alsoPropagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(argv[2]); - - /* Pop a range of elements in a nested arrays way. */ - listPopRangeAndReplyWithKey(receiver, o, key, wherefrom, count, 0, deleted); - return; - } - - alsoPropagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL); - - /* BRPOP/BLPOP */ - value = listTypePop(o, wherefrom); - serverAssert(value != NULL); - - addReplyArrayLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,value); - - /* We don't call signalModifiedKey() as it was already called - * when an element was pushed on the list. */ - listElementsRemoved(receiver,key,wherefrom,o,1,0,deleted); - } else { - /* BLMOVE */ - robj *dstobj = - lookupKeyWrite(receiver->db,dstkey); - if (!(dstobj && - checkType(receiver,dstobj,OBJ_LIST))) - { - value = listTypePop(o, wherefrom); - serverAssert(value != NULL); - - /* "lpush" or "rpush" notify event will be notified by lmoveHandlePush. */ - lmoveHandlePush(receiver,dstkey,dstobj,value,whereto); - /* Propagate the LMOVE/RPOPLPUSH operation. */ - int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand); - argv[0] = isbrpoplpush ? shared.rpoplpush : shared.lmove; - argv[1] = key; - argv[2] = dstkey; - argv[3] = getStringObjectFromListPosition(wherefrom); - argv[4] = getStringObjectFromListPosition(whereto); - alsoPropagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL); - - /* We don't call signalModifiedKey() as it was already called - * when an element was pushed on the list. */ - listElementsRemoved(receiver,key,wherefrom,o,1,0,deleted); - } - } - - if (value) decrRefCount(value); -} - /* Blocking RPOP/LPOP/LMPOP * * 'numkeys' is the number of keys. @@ -1368,8 +1271,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i } /* If the keys do not exist we must block */ - struct blockPos pos = {where}; - blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL,0); + blockForKeys(c,BLOCKED_LIST,keys,numkeys,timeout,0); } /* BLPOP [ ...] */ @@ -1393,8 +1295,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou addReplyNull(c); } else { /* The list is empty and the client blocks. */ - struct blockPos pos = {wherefrom, whereto}; - blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL,0); + blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,0); } } else { /* The list exists and has elements, so diff --git a/src/t_stream.c b/src/t_stream.c index b2e58dc159c4..16ad044a2c68 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2409,27 +2409,19 @@ void xreadCommand(client *c) { addReplyNullArray(c); goto cleanup; } - blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, - -1, timeout, NULL, NULL, ids, xreadgroup); - /* If no COUNT is given and we block, set a relatively small count: - * in case the ID provided is too low, we do not want the server to - * block just to serve this client a huge stream of messages. */ - c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; - - /* If this is a XREADGROUP + GROUP we need to remember for which - * group and consumer name we are blocking, so later when one of the - * keys receive more data, we can call streamReplyWithRange() passing - * the right arguments. */ - if (groupname) { - incrRefCount(groupname); - incrRefCount(consumername); - c->bpop.xread_group = groupname; - c->bpop.xread_consumer = consumername; - c->bpop.xread_group_noack = noack; - } else { - c->bpop.xread_group = NULL; - c->bpop.xread_consumer = NULL; + /* We change the '$' to the current last ID for this stream. this is + * Since later on when we unblock on arriving data - we would like to + * re-process the command and in case '$' stays we will spin-block forever. + */ + for (int id_idx = 0; id_idx < streams_count; id_idx++) { + int arg_idx = id_idx + streams_arg + streams_count; + if (strcmp(c->argv[arg_idx]->ptr,"$") == 0) { + robj *argv_streamid = createObjectFromStreamID(&ids[id_idx]); + rewriteClientCommandArgument(c, arg_idx, argv_streamid); + decrRefCount(argv_streamid); + } } + blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, xreadgroup); goto cleanup; } diff --git a/src/t_zset.c b/src/t_zset.c index eb87c5d7dac7..048759d1f50d 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -4071,8 +4071,7 @@ void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where, } /* If the keys do not exist we must block */ - struct blockPos pos = {where}; - blockForKeys(c,BLOCKED_ZSET,keys,numkeys,count,timeout,NULL,&pos,NULL,0); + blockForKeys(c,BLOCKED_ZSET,keys,numkeys,timeout,0); } // BZPOPMIN key [key ...] timeout diff --git a/src/timeout.c b/src/timeout.c index 678a248574b0..b62423a9e684 100644 --- a/src/timeout.c +++ b/src/timeout.c @@ -36,12 +36,11 @@ * Otherwise 0 is returned and no operation is performed. */ int checkBlockedClientTimeout(client *c, mstime_t now) { if (c->flags & CLIENT_BLOCKED && - c->bpop.timeout != 0 - && c->bpop.timeout < now) + c->bstate.timeout != 0 + && c->bstate.timeout < now) { /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); + unblockClientOnTimeout(c); return 1; } else { return 0; @@ -71,7 +70,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { * into keys no longer served by this server. */ if (server.cluster_enabled) { if (clusterRedirectBlockedClientIfNeeded(c)) - unblockClient(c); + unblockClientOnError(c, NULL); } } return 0; @@ -112,8 +111,8 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, client **cptr) { * to handle blocked clients timeouts. The client is not added to the list * if its timeout is zero (block forever). */ void addClientToTimeoutTable(client *c) { - if (c->bpop.timeout == 0) return; - uint64_t timeout = c->bpop.timeout; + if (c->bstate.timeout == 0) return; + uint64_t timeout = c->bstate.timeout; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,timeout,c); if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) @@ -125,7 +124,7 @@ void addClientToTimeoutTable(client *c) { void removeClientFromTimeoutTable(client *c) { if (!(c->flags & CLIENT_IN_TO_TABLE)) return; c->flags &= ~CLIENT_IN_TO_TABLE; - uint64_t timeout = c->bpop.timeout; + uint64_t timeout = c->bstate.timeout; unsigned char buf[CLIENT_ST_KEYLEN]; encodeTimeoutKey(buf,timeout,c); raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 435e8fcde3ab..175506c17b53 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -182,7 +182,8 @@ start_server {tags {"repl external:skip"}} { [$B lrange foo 0 -1] eq {a b c} } else { fail "Master and replica have different digest: [$A debug digest] VS [$B debug digest]" - } + } + assert_match {*calls=1,*,rejected_calls=0,failed_calls=1*} [cmdrstat blpop $B] } } } diff --git a/tests/modules/misc.c b/tests/modules/misc.c index 64b45690431d..cc159a6ce0bc 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -359,6 +359,14 @@ int test_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ return REDISMODULE_OK; } +/* wrapper for RM_Call which also replicates the module command */ +int test_rm_call_replicate(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + test_rm_call(ctx, argv, argc); + RedisModule_ReplicateVerbatim(ctx); + + return REDISMODULE_OK; +} + /* wrapper for RM_Call with flags */ int test_rm_call_flags(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ if(argc < 3){ @@ -497,6 +505,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx, "test.rm_call_flags", test_rm_call_flags,"allow-stale", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "test.rm_call_replicate", test_rm_call_replicate,"allow-stale", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index aceec3cca48d..4617334e7a9a 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -173,6 +173,49 @@ start_server {tags {"introspection"}} { $rd close } + + test {MONITOR log blocked command only once} { + + # need to reconnect in order to reset the clients state + reconnect + + set rd [redis_deferring_client] + set bc [redis_deferring_client] + r del mylist + + $rd monitor + $rd read ; # Discard the OK + + $bc blpop mylist 0 + wait_for_blocked_clients_count 1 + r lpush mylist 1 + wait_for_blocked_clients_count 0 + r lpush mylist 2 + + # we expect to see the blpop on the monitor first + assert_match {*"blpop"*"mylist"*"0"*} [$rd read] + + # we scan out all the info commands on the monitor + set monitor_output [$rd read] + while { [string match {*"info"*} $monitor_output] } { + set monitor_output [$rd read] + } + + # we expect to locate the lpush right when the client was unblocked + assert_match {*"lpush"*"mylist"*"1"*} $monitor_output + + # we scan out all the info commands + set monitor_output [$rd read] + while { [string match {*"info"*} $monitor_output] } { + set monitor_output [$rd read] + } + + # we expect to see the next lpush and not duplicate blpop command + assert_match {*"lpush"*"mylist"*"2"*} $monitor_output + + $rd close + $bc close + } test {CLIENT GETNAME should return NIL if name is not assigned} { r client getname diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl index ce4aaae44602..90a369da216d 100644 --- a/tests/unit/moduleapi/propagate.tcl +++ b/tests/unit/moduleapi/propagate.tcl @@ -1,4 +1,5 @@ set testmodule [file normalize tests/modules/propagate.so] +set miscmodule [file normalize tests/modules/misc.so] set keyspace_events [file normalize tests/modules/keyspace_events.so] tags "modules" { @@ -721,5 +722,42 @@ tags "modules aof" { assert_equal [s 0 unexpected_error_replies] 0 } } + test "Modules RM_Call does not update stats during aof load: AOF-load type $aofload_type" { + start_server [list overrides [list loadmodule "$miscmodule"]] { + # Enable the AOF + r config set appendonly yes + r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite. + waitForBgrewriteaof r + + r config resetstat + r set foo bar + r EVAL {return redis.call('SET', KEYS[1], ARGV[1])} 1 foo bar2 + r test.rm_call_replicate set foo bar3 + r EVAL {return redis.call('test.rm_call_replicate',ARGV[1],KEYS[1],ARGV[2])} 1 foo set bar4 + + r multi + r set foo bar5 + r EVAL {return redis.call('SET', KEYS[1], ARGV[1])} 1 foo bar6 + r test.rm_call_replicate set foo bar7 + r EVAL {return redis.call('test.rm_call_replicate',ARGV[1],KEYS[1],ARGV[2])} 1 foo set bar8 + r exec + + assert_match {*calls=8,*,rejected_calls=0,failed_calls=0} [cmdrstat set r] + + + # Load the AOF + if {$aofload_type == "debug_cmd"} { + r config resetstat + r debug loadaof + } else { + r config rewrite + restart_server 0 true false + wait_done_loading r + } + + assert_no_match {*calls=*} [cmdrstat set r] + + } + } } } diff --git a/tests/unit/slowlog.tcl b/tests/unit/slowlog.tcl index 8ce1b1c27bb4..bc15c641138d 100644 --- a/tests/unit/slowlog.tcl +++ b/tests/unit/slowlog.tcl @@ -200,4 +200,26 @@ start_server {tags {"slowlog"} overrides {slowlog-log-slower-than 1000000}} { assert_equal 3 [llength [r slowlog get -1]] assert_equal 3 [llength [r slowlog get 3]] } + + test {SLOWLOG - blocking command is reported only after unblocked} { + # Cleanup first + r del mylist + # create a test client + set rd [redis_deferring_client] + + # config the slowlog and reset + r config set slowlog-log-slower-than 0 + r config set slowlog-max-len 110 + r slowlog reset + + $rd BLPOP mylist 0 + wait_for_blocked_clients_count 1 50 20 + assert_equal 0 [llength [regexp -all -inline (?=BLPOP) [r slowlog get]]] + + r LPUSH mylist 1 + wait_for_blocked_clients_count 0 50 20 + assert_equal 1 [llength [regexp -all -inline (?=BLPOP) [r slowlog get]]] + + $rd close + } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 0d00fcedbf9f..f7e043f99a64 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -1101,10 +1101,10 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { r rpush k hello r pexpire k 100 set rd [redis_deferring_client] + $rd deferred 0 $rd select 9 - assert_equal {OK} [$rd read] - $rd client id - set id [$rd read] + set id [$rd client id] + $rd deferred 1 $rd brpop k 1 wait_for_blocked_clients_count 1 after 101 @@ -1115,6 +1115,13 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { assert_match "*flags=b*" [r client list id $id] r client unblock $id assert_equal {} [$rd read] + $rd deferred 0 + # We want to force key deletion to be propagated to the replica + # in order to verify it was expiered on the replication stream. + $rd set somekey1 someval1 + $rd exists k + r set somekey2 someval2 + assert_replication_stream $repl { {select *} {flushall} @@ -1123,11 +1130,14 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { {pexpireat k *} {swapdb 1 9} {select 9} + {set somekey1 someval1} {del k} + {select 1} + {set somekey2 someval2} } close_replication_stream $repl - # Restore server and client state r debug set-active-expire 1 + # Restore server and client state r select 9 } {OK} {singledb:skip needs:debug} @@ -1154,6 +1164,10 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { assert_match "*flags=b*" [r client list id $id] r client unblock $id assert_equal {} [$rd read] + # We want to force key deletion to be propagated to the replica + # in order to verify it was expiered on the replication stream. + $rd exists k + assert_equal {0} [$rd read] assert_replication_stream $repl { {select *} {flushall} @@ -2161,4 +2175,120 @@ foreach {pop} {BLPOP BLMPOP_RIGHT} { assert_equal [lpop k] [string repeat x 31] set _ $k } {12 0 9223372036854775808 2147483647 32767 127} + + test "Unblock fairness is kept while pipelining" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + # delete the list in case already exists + r del mylist + + # block a client on the list + $rd1 BLPOP mylist 0 + wait_for_blocked_clients_count 1 + + # pipline on other client a list push and a blocking pop + # we should expect the fainess to be kept and have $rd1 + # being unblocked + set buf "" + append buf "LPUSH mylist 1\r\n" + append buf "BLPOP mylist 0\r\n" + $rd2 write $buf + $rd2 flush + + # we check that we still have 1 blocked client + # and that the first blocked client has been served + assert_equal [$rd1 read] {mylist 1} + assert_equal [$rd2 read] {1} + wait_for_blocked_clients_count 1 + + # We no unblock the last client and verify it was served last + r LPUSH mylist 2 + wait_for_blocked_clients_count 0 + assert_equal [$rd2 read] {mylist 2} + + $rd1 close + $rd2 close + } + + test "Unblock fairness is kept during nested unblock" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + set rd3 [redis_deferring_client] + + # delete the list in case already exists + r del l1{t} l2{t} l3{t} + + # block a client on the list + $rd1 BRPOPLPUSH l1{t} l3{t} 0 + wait_for_blocked_clients_count 1 + + $rd2 BLPOP l2{t} 0 + wait_for_blocked_clients_count 2 + + $rd3 BLMPOP 0 2 l2{t} l3{t} LEFT COUNT 1 + wait_for_blocked_clients_count 3 + + r multi + r lpush l1{t} 1 + r lpush l2{t} 2 + r exec + + wait_for_blocked_clients_count 0 + + assert_equal [$rd1 read] {1} + assert_equal [$rd2 read] {l2{t} 2} + assert_equal [$rd3 read] {l3{t} 1} + + $rd1 close + $rd2 close + $rd3 close + } + + test "Blocking command acounted only once in commandstats" { + # cleanup first + r del mylist + + # create a test client + set rd [redis_deferring_client] + + # reset the server stats + r config resetstat + + # block a client on the list + $rd BLPOP mylist 0 + wait_for_blocked_clients_count 1 + + # unblock the list + r LPUSH mylist 1 + wait_for_blocked_clients_count 0 + + assert_match {*calls=1,*,rejected_calls=0,failed_calls=0} [cmdrstat blpop r] + + $rd close + } + + test "Blocking command acounted only once in commandstats after timeout" { + # cleanup first + r del mylist + + # create a test client + set rd [redis_deferring_client] + $rd client id + set id [$rd read] + + # reset the server stats + r config resetstat + + # block a client on the list + $rd BLPOP mylist 0 + wait_for_blocked_clients_count 1 + + # unblock the client on timeout + r client unblock $id timeout + + assert_match {*calls=1,*,rejected_calls=0,failed_calls=0} [cmdrstat blpop r] + + $rd close + } } ;# stop servers diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index bae275b4e00b..5405d1b0d5b9 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -223,7 +223,7 @@ start_server { r XDEL mystream 667 set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">" - after 20 + wait_for_blocked_clients_count 0 assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}} $rd close } @@ -234,8 +234,9 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r DEL mystream - assert_error "*no longer exists*" {$rd read} + assert_error "NOGROUP*" {$rd read} $rd close } @@ -245,8 +246,9 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r SET mystream val1 - assert_error "*no longer exists*" {$rd read} + assert_error "*WRONGTYPE*" {$rd read} $rd close } @@ -256,11 +258,12 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r MULTI r DEL mystream r SADD mystream e1 r EXEC - assert_error "*no longer exists*" {$rd read} + assert_error "*WRONGTYPE*" {$rd read} $rd close } @@ -270,8 +273,9 @@ start_server { r XGROUP CREATE mystream mygroup $ set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r FLUSHALL - assert_error "*no longer exists*" {$rd read} + assert_error "*NOGROUP*" {$rd read} $rd close } @@ -286,8 +290,9 @@ start_server { $rd SELECT 9 $rd read $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r SWAPDB 4 9 - assert_error "*no longer exists*" {$rd read} + assert_error "*NOGROUP*" {$rd read} $rd close } {0} {external:skip} @@ -303,8 +308,9 @@ start_server { $rd SELECT 9 $rd read $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r SWAPDB 4 9 - assert_error "*no longer exists*" {$rd read} + assert_error "*WRONGTYPE*" {$rd read} $rd close } {0} {external:skip} @@ -313,6 +319,7 @@ start_server { r XADD mystream 666 f v set rd [redis_deferring_client] $rd XREAD BLOCK 0 STREAMS mystream "$" + wait_for_blocked_clients_count 1 r DEL mystream r XADD mystream 667 f v @@ -326,6 +333,7 @@ start_server { r XADD mystream 666 f v set rd [redis_deferring_client] $rd XREAD BLOCK 0 STREAMS mystream "$" + wait_for_blocked_clients_count 1 r SET mystream val1 r DEL mystream @@ -410,6 +418,55 @@ start_server { $rd close } + test {Blocking XREADGROUP for stream key that has clients blocked on list} { + set rd [redis_deferring_client] + set rd2 [redis_deferring_client] + + # First delete the stream + r DEL mystream + + # now place a client blocked on non-existing key as list + $rd2 BLPOP mystream 0 + + # wait until we verify the client is blocked + wait_for_blocked_clients_count 1 + + # verify we only have 1 regular blocking key + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # now write mystream as stream + r XADD mystream 666 key value + r XGROUP CREATE mystream mygroup $ MKSTREAM + + # block another client on xreadgroup + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream ">" + + # wait until we verify we have 2 blocked clients (one for the list and one for the stream) + wait_for_blocked_clients_count 2 + + # verify we have 1 blocking key which also have clients blocked on nokey condition + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # now delete the key and verify we have no clients blocked on nokey condition + r DEL mystream + assert_error "NOGROUP*" {$rd read} + assert_equal 1 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + # close the only left client and make sure we have no more blocking keys + $rd2 close + + # wait until we verify we have no more blocked clients + wait_for_blocked_clients_count 0 + + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys] + assert_equal 0 [getInfoProperty [r info clients] total_blocking_keys_on_nokey] + + $rd close + } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { r config resetstat r del mystream