Skip to content

Commit

Permalink
[BUG] Use local GC ctx in periodic callback (#2962)
Browse files Browse the repository at this point in the history
* Use local GC ctx and add logs

* get the right ctx

* per review

* add test

* Skip GC on cluster and temporarily comment out part of testMultiTextNested

* streamline code

* test fixed in #2967

* fix leak

Co-authored-by: oshadmi <omer.shadmi@redislabs.com>
Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com>
(cherry picked from commit 3f7c8db)
  • Loading branch information
Ariel Shtul authored and ashtul committed Aug 5, 2022
1 parent 41babe0 commit 75a01b1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/gc.c
Expand Up @@ -79,7 +79,7 @@ static void threadCallback(void* data) {
GCTask* task= data;
GCContext* gc = task->gc;
RedisModuleBlockedClient* bc = task->bClient;
RedisModuleCtx* ctx = RSDummyContext;
RedisModuleCtx* ctx = RedisModule_GetThreadSafeContext(NULL);

if (gc->stopped) {
// if the client is blocked, lets release it
Expand All @@ -88,6 +88,7 @@ static void threadCallback(void* data) {
RedisModule_UnblockClient(bc, NULL);
RedisModule_ThreadSafeContextUnlock(ctx);
}
RedisModule_FreeThreadSafeContext(ctx);
rm_free(task);
return;
}
Expand Down Expand Up @@ -116,6 +117,7 @@ static void threadCallback(void* data) {

end:
RedisModule_ThreadSafeContextUnlock(ctx);
RedisModule_FreeThreadSafeContext(ctx);
}

static void destroyCallback(void* data) {
Expand Down
17 changes: 15 additions & 2 deletions src/spec.c
Expand Up @@ -1336,11 +1336,14 @@ static IndexesScanner *IndexesScanner_New(IndexSpec *spec) {
if (spec->scanner) {
// cancel ongoing scan, keep on_progress indicator on
IndexesScanner_Cancel(spec->scanner, true);
RedisModule_Log(RSDummyContext, "notice", "Scanning index %s in background: cancelled and restarted",
spec->name);
}
spec->scanner = scanner;
spec->scan_in_progress = true;
} else {
global_spec_scanner = scanner;
RedisModule_Log(RSDummyContext, "notice", "Global scanner created");
}

return scanner;
Expand Down Expand Up @@ -1414,7 +1417,7 @@ static void Indexes_ScanProc(RedisModuleCtx *ctx, RedisModuleString *keyname, Re
static void Indexes_ScanAndReindexTask(IndexesScanner *scanner) {
RS_LOG_ASSERT(scanner, "invalid IndexesScanner");

RedisModuleCtx *ctx = RSDummyContext;
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(NULL);
RedisModuleScanCursor *cursor = RedisModule_ScanCursorCreate();
RedisModule_ThreadSafeContextLock(ctx);

Expand All @@ -1433,12 +1436,19 @@ static void Indexes_ScanAndReindexTask(IndexesScanner *scanner) {
RedisModule_ThreadSafeContextLock(ctx);

if (scanner->cancelled) {
RedisModule_Log(ctx, "notice", "Scanning indexes in background: cancelled (scanned=%ld)",
scanner->totalKeys);
goto end;
}
}

RedisModule_Log(ctx, "notice", "Scanning indexes in background: done (scanned=%ld)",
if (scanner->global) {
RedisModule_Log(ctx, "notice", "Scanning indexes in background: done (scanned=%ld)",
scanner->totalKeys);
} else {
RedisModule_Log(ctx, "notice", "Scanning index %s in background: done (scanned=%ld)",
scanner->spec->name, scanner->totalKeys);
}

end:
if (!scanner->cancelled && scanner->global) {
Expand All @@ -1449,6 +1459,7 @@ static void Indexes_ScanAndReindexTask(IndexesScanner *scanner) {

RedisModule_ThreadSafeContextUnlock(ctx);
RedisModule_ScanCursorDestroy(cursor);
RedisModule_FreeThreadSafeContext(ctx);
}

//---------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1891,6 +1902,7 @@ static void Indexes_LoadingEvent(RedisModuleCtx *ctx, RedisModuleEvent eid, uint
} else {
legacySpecDict = dictCreate(&dictTypeHeapStrings, NULL);
}
RedisModule_Log(RSDummyContext, "notice", "Loading event starts");
} else if (subevent == REDISMODULE_SUBEVENT_LOADING_ENDED) {
int hasLegacyIndexes = dictSize(legacySpecDict);
Indexes_UpgradeLegacyIndexes();
Expand All @@ -1907,6 +1919,7 @@ static void Indexes_LoadingEvent(RedisModuleCtx *ctx, RedisModuleEvent eid, uint
RedisModule_Log(ctx, "warning",
"Skip background reindex scan, redis version contains loaded event.");
}
RedisModule_Log(RSDummyContext, "notice", "Loading event ends");
}
}

Expand Down
18 changes: 18 additions & 0 deletions tests/pytests/test_gc.py
Expand Up @@ -271,3 +271,21 @@ def testGFreeEmpryTerms(env):
forceInvokeGC(env, 'idx')
env.expect('FT.DEBUG', 'DUMP_TERMS', 'idx').equal([])

def testAutoMemory_MOD_3951():
env = Env(moduleArgs='FORK_GC_CLEAN_THRESHOLD 0')
env.skipOnCluster()
conn = getConnectionByEnv(env)

# create index with filter
conn.execute_command('FT.CREATE', 'idx', 'FILTER', '@t == "5"', 'SCHEMA', 't', 'TEXT')
# add docs
conn = getConnectionByEnv(env)
for i in range(100):
conn.execute_command('HSET', i, 't', i % 10)
# delete 1 doc and trigger GC
conn.execute_command('DEL', 0)
forceInvokeGC(env, 'idx')
# call alter to trigger rescan
env.expect('FT.ALTER', 'idx', 'SCHEMA', 'ADD', '2nd', 'TEXT').equal('OK')

# This test should catch some leaks on the sanitizer

0 comments on commit 75a01b1

Please sign in to comment.