From 75a01b151dbfa56e40711a39e8b5767ceeb6d874 Mon Sep 17 00:00:00 2001 From: Ariel Shtul Date: Fri, 5 Aug 2022 12:34:15 +0300 Subject: [PATCH] [BUG] Use local GC ctx in periodic callback (#2962) * Use local GC ctx and add logs * get the right ctx * per review * add test * Skip GC on cluster and temporarily comment out part of testMultiTextNested * streamline code * test fixed in #2967 * fix leak Co-authored-by: oshadmi Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com> (cherry picked from commit 3f7c8dbc9d20d805c2fc544c6ec20351ba337a96) --- src/gc.c | 4 +++- src/spec.c | 17 +++++++++++++++-- tests/pytests/test_gc.py | 18 ++++++++++++++++++ 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/gc.c b/src/gc.c index a10d78e587..3b4e98f647 100644 --- a/src/gc.c +++ b/src/gc.c @@ -79,7 +79,7 @@ static void threadCallback(void* data) { GCTask* task= data; GCContext* gc = task->gc; RedisModuleBlockedClient* bc = task->bClient; - RedisModuleCtx* ctx = RSDummyContext; + RedisModuleCtx* ctx = RedisModule_GetThreadSafeContext(NULL); if (gc->stopped) { // if the client is blocked, lets release it @@ -88,6 +88,7 @@ static void threadCallback(void* data) { RedisModule_UnblockClient(bc, NULL); RedisModule_ThreadSafeContextUnlock(ctx); } + RedisModule_FreeThreadSafeContext(ctx); rm_free(task); return; } @@ -116,6 +117,7 @@ static void threadCallback(void* data) { end: RedisModule_ThreadSafeContextUnlock(ctx); + RedisModule_FreeThreadSafeContext(ctx); } static void destroyCallback(void* data) { diff --git a/src/spec.c b/src/spec.c index 59b48009dd..417f556188 100644 --- a/src/spec.c +++ b/src/spec.c @@ -1336,11 +1336,14 @@ static IndexesScanner *IndexesScanner_New(IndexSpec *spec) { if (spec->scanner) { // cancel ongoing scan, keep on_progress indicator on IndexesScanner_Cancel(spec->scanner, true); + RedisModule_Log(RSDummyContext, "notice", "Scanning index %s in background: cancelled and restarted", + spec->name); } spec->scanner = scanner; spec->scan_in_progress = true; } else { global_spec_scanner = scanner; + RedisModule_Log(RSDummyContext, "notice", "Global scanner created"); } return scanner; @@ -1414,7 +1417,7 @@ static void Indexes_ScanProc(RedisModuleCtx *ctx, RedisModuleString *keyname, Re static void Indexes_ScanAndReindexTask(IndexesScanner *scanner) { RS_LOG_ASSERT(scanner, "invalid IndexesScanner"); - RedisModuleCtx *ctx = RSDummyContext; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(NULL); RedisModuleScanCursor *cursor = RedisModule_ScanCursorCreate(); RedisModule_ThreadSafeContextLock(ctx); @@ -1433,12 +1436,19 @@ static void Indexes_ScanAndReindexTask(IndexesScanner *scanner) { RedisModule_ThreadSafeContextLock(ctx); if (scanner->cancelled) { + RedisModule_Log(ctx, "notice", "Scanning indexes in background: cancelled (scanned=%ld)", + scanner->totalKeys); goto end; } } - RedisModule_Log(ctx, "notice", "Scanning indexes in background: done (scanned=%ld)", + if (scanner->global) { + RedisModule_Log(ctx, "notice", "Scanning indexes in background: done (scanned=%ld)", scanner->totalKeys); + } else { + RedisModule_Log(ctx, "notice", "Scanning index %s in background: done (scanned=%ld)", + scanner->spec->name, scanner->totalKeys); + } end: if (!scanner->cancelled && scanner->global) { @@ -1449,6 +1459,7 @@ static void Indexes_ScanAndReindexTask(IndexesScanner *scanner) { RedisModule_ThreadSafeContextUnlock(ctx); RedisModule_ScanCursorDestroy(cursor); + RedisModule_FreeThreadSafeContext(ctx); } //--------------------------------------------------------------------------------------------- @@ -1891,6 +1902,7 @@ static void Indexes_LoadingEvent(RedisModuleCtx *ctx, RedisModuleEvent eid, uint } else { legacySpecDict = dictCreate(&dictTypeHeapStrings, NULL); } + RedisModule_Log(RSDummyContext, "notice", "Loading event starts"); } else if (subevent == REDISMODULE_SUBEVENT_LOADING_ENDED) { int hasLegacyIndexes = dictSize(legacySpecDict); Indexes_UpgradeLegacyIndexes(); @@ -1907,6 +1919,7 @@ static void Indexes_LoadingEvent(RedisModuleCtx *ctx, RedisModuleEvent eid, uint RedisModule_Log(ctx, "warning", "Skip background reindex scan, redis version contains loaded event."); } + RedisModule_Log(RSDummyContext, "notice", "Loading event ends"); } } diff --git a/tests/pytests/test_gc.py b/tests/pytests/test_gc.py index 04451bb4b3..5342e4d1e9 100644 --- a/tests/pytests/test_gc.py +++ b/tests/pytests/test_gc.py @@ -271,3 +271,21 @@ def testGFreeEmpryTerms(env): forceInvokeGC(env, 'idx') env.expect('FT.DEBUG', 'DUMP_TERMS', 'idx').equal([]) +def testAutoMemory_MOD_3951(): + env = Env(moduleArgs='FORK_GC_CLEAN_THRESHOLD 0') + env.skipOnCluster() + conn = getConnectionByEnv(env) + + # create index with filter + conn.execute_command('FT.CREATE', 'idx', 'FILTER', '@t == "5"', 'SCHEMA', 't', 'TEXT') + # add docs + conn = getConnectionByEnv(env) + for i in range(100): + conn.execute_command('HSET', i, 't', i % 10) + # delete 1 doc and trigger GC + conn.execute_command('DEL', 0) + forceInvokeGC(env, 'idx') + # call alter to trigger rescan + env.expect('FT.ALTER', 'idx', 'SCHEMA', 'ADD', '2nd', 'TEXT').equal('OK') + + # This test should catch some leaks on the sanitizer