Skip to content

Commit

Permalink
Before evicted and before expired server events are not executed insi…
Browse files Browse the repository at this point in the history
…de an execution unit.

This [PR](redis#9406) introduces new server event, `RedisModuleEvent_Key`. This new event allows to read a key data just before it removed from the database (either deleted, expired, evicted, or overwritten).

When the key is removed from the database, either by active expire or eviction. The new event was not called as part of an execution unit. This can cause an issue if the module registers a post notification job inside the event. This job will not be executed atomically with the expiration/eviction operation and will not replicated inside a Multi/Exec. Moreover, the post notification job will be executed right after the event where it is still not safe to perform any write operation, this will violate the promise that post notification job will be called atomically with the operation that triggered it and **only when it is safe to write**.

The PR fixes the issue by wrapping each expiration/eviction of a key with an execution unit. This make sure the entire operation will run atomically and all the post notification jobs will be executed at the end where it safe to write.

Tests were modified to verify the fix.
  • Loading branch information
MeirShpilraien committed Nov 6, 2023
1 parent 282b82e commit cadd1d9
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/evict.c
Expand Up @@ -682,6 +682,7 @@ int performEvictions(void) {
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
enterExecutionUnit(1, 0);
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);
Expand All @@ -694,6 +695,7 @@ int performEvictions(void) {
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(keyobj);
keys_freed++;
Expand Down
2 changes: 2 additions & 0 deletions src/expire.c
Expand Up @@ -54,10 +54,12 @@
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
enterExecutionUnit(1, 0);
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
deleteExpiredKeyAndPropagate(db,keyobj);
decrRefCount(keyobj);
exitExecutionUnit();
return 1;
} else {
return 0;
Expand Down
34 changes: 34 additions & 0 deletions tests/modules/postnotifications.c
Expand Up @@ -90,6 +90,10 @@ static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const cha
return REDISMODULE_OK; /* do not count the evicted key */
}

if (strncmp(key_str, "before_evicted", 14) == 0) {
return REDISMODULE_OK; /* do not count the before_evicted key */
}

RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
return REDISMODULE_OK;
Expand Down Expand Up @@ -186,6 +190,32 @@ static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleSt
return REDISMODULE_OK;
}

static void KeySpace_ServerEventCallback(RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data) {
REDISMODULE_NOT_USED(eid);
REDISMODULE_NOT_USED(data);
if (subevent > 3) {
RedisModule_Log(ctx, "warning", "Got an unexpected subevent '%ld'", subevent);
return;
}
static const char* events[] = {
"before_deleted",
"before_expired",
"before_evicted",
"before_overwritten",
};

const RedisModuleString *key_name = RedisModule_GetKeyNameFromModuleKey(((RedisModuleKeyInfo*)data)->key);
const char *key_str = RedisModule_StringPtrLen(key_name, NULL);
const char *event = events[subevent];
size_t event_len = strlen(event);
if (strncmp(key_str, event , event_len) == 0) {
return; /* avoid loops */
}

RedisModuleString *new_key = RedisModule_CreateString(NULL, event, event_len);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
}

/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
Expand Down Expand Up @@ -222,6 +252,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}

if(RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, KeySpace_ServerEventCallback) != REDISMODULE_OK){
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet,
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/moduleapi/async_rm_call.tcl
Expand Up @@ -321,6 +321,7 @@ start_server {tags {"modules"}} {
{lpop l}
{set string_foo 1}
{set string_bar 2}
{incr before_deleted}
{incr string_changed{string_foo}}
{incr string_changed{string_bar}}
{incr string_total}
Expand Down Expand Up @@ -362,6 +363,7 @@ start_server {tags {"modules"}} {
{lpop l}
{set string_foo 1}
{set string_bar 2}
{incr before_deleted}
{incr string_changed{string_foo}}
{incr string_changed{string_bar}}
{incr string_total}
Expand All @@ -374,8 +376,11 @@ start_server {tags {"modules"}} {
{del string_foo}
{set string_foo 1}
{set string_bar 2}
{incr before_deleted}
{incr before_expired}
{incr expired}
{incr string_changed{string_foo}}
{incr before_overwritten}
{incr string_changed{string_bar}}
{incr string_total}
{incr string_total}
Expand Down
16 changes: 11 additions & 5 deletions tests/unit/moduleapi/postnotifications.tcl
Expand Up @@ -9,7 +9,7 @@ tags "modules" {
r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]

r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]
Expand All @@ -23,6 +23,7 @@ tags "modules" {
{exec}
{multi}
{set string_x 2}
{incr before_overwritten}
{incr string_changed{string_x}}
{incr string_total}
{exec}
Expand All @@ -37,7 +38,7 @@ tags "modules" {
assert_equal {OK} [r postnotification.async_set]
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]

assert_replication_stream $repl {
{multi}
{select *}
Expand Down Expand Up @@ -69,6 +70,7 @@ tags "modules" {
{pexpireat x *}
{multi}
{del x}
{incr before_expired}
{incr expired}
{exec}
}
Expand All @@ -91,6 +93,7 @@ tags "modules" {
{pexpireat x *}
{multi}
{del x}
{incr before_expired}
{incr expired}
{exec}
}
Expand All @@ -115,6 +118,7 @@ tags "modules" {
{multi}
{set read_x 1}
{del x}
{incr before_expired}
{incr expired}
{exec}
}
Expand Down Expand Up @@ -143,7 +147,7 @@ tags "modules" {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r config set maxmemory-policy allkeys-random
r config set maxmemory-policy allkeys-random
r config set maxmemory 1

assert_error {OOM *} {r set y 1}
Expand All @@ -153,6 +157,7 @@ tags "modules" {
{set x 1}
{multi}
{del x}
{incr before_evicted}
{incr evicted}
{exec}
}
Expand All @@ -172,7 +177,7 @@ tags "modules" {
r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]

r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]
Expand All @@ -190,6 +195,7 @@ tags "modules" {
{exec}
{multi}
{set string_x 2}
{incr before_overwritten}
{incr string_changed{string_x}}
{incr string_total}
{exec}
Expand All @@ -202,4 +208,4 @@ tags "modules" {
close_replication_stream $repl
}
}
}
}

0 comments on commit cadd1d9

Please sign in to comment.