From 99d950cc66343ddf834c0657ae2b117d7571ad1a Mon Sep 17 00:00:00 2001
From: Raz Monsonego <74051729+raz-mon@users.noreply.github.com>
Date: Fri, 1 Dec 2023 08:52:03 +0000
Subject: [PATCH] MOD-5948: Respect timeout policy P1 (single shard) (#4038)
 (#4133)

* wip
* wip
* fix resp3 response
* fix resp2 section
* fix total results counter report
* fix loop
* fix response condition
* fix condition
* fix leak
* add timeout check after aggregation in strict timeout policy
* fix coordinator to wait for shard-reply BEFORE polling for timeout
* fix test
* fix test
* wait for reply after polling for timeout (so we don't lose data)
* fix leak
* fix leak
* add test
* unrelated code touchups
* reposition response section
* move duplicated code to function
* some fixes
* address review
* fix nelem increment
* fix use after free
* address review
* address comment
---
 coord/src/dist_aggregate.c        |   1 -
 src/aggregate/aggregate.h         |   2 +
 src/aggregate/aggregate_exec.c    | 377 +++++++++++++++++++-----------
 src/aggregate/aggregate_request.c |   2 +-
 src/info_command.c                |   2 +-
 src/query_error.h                 |   2 +-
 src/result_processor.c            |  12 +-
 src/result_processor.h            |   7 +-
 src/util/timeout.h                |  14 +-
 src/vector_index.c                |   2 +-
 tests/pytests/test.py             |   3 +-
 tests/pytests/test_aggregate.py   |  16 +-
 tests/pytests/test_coordinator.py |   9 +-
 13 files changed, 290 insertions(+), 159 deletions(-)

diff --git a/coord/src/dist_aggregate.c b/coord/src/dist_aggregate.c
index 0b141a0ff7..f2849568b4 100644
--- a/coord/src/dist_aggregate.c
+++ b/coord/src/dist_aggregate.c
@@ -358,7 +358,6 @@ static int rpnetNext(ResultProcessor *self, SearchResult *r) {
 
   // get the next reply from the channel
   while (!root || !rows || MRReply_Length(rows) == 0) {
-    // if(TimedOut(&self->parent->sctx->timeout)) {
     if(TimedOut(&self->parent->sctx->timeout)) {
       // Set the `timedOut` flag in the MRIteratorCtx, later to be read by the
       // callback so that a `CURSOR DEL` command will be dispatched instead of
diff --git a/src/aggregate/aggregate.h b/src/aggregate/aggregate.h
index 0425e7712f..ad6365dff6 100644
--- a/src/aggregate/aggregate.h
+++ b/src/aggregate/aggregate.h
@@ -92,6 +92,8 @@ typedef enum {
 #define IsFormatExpand(r) ((r)->reqflags & QEXEC_FORMAT_EXPAND)
 #define IsWildcard(r) ((r)->ast.root->type == QN_WILDCARD)
 #define HasScorer(r) ((r)->optimizer->scorerType != SCORER_TYPE_NONE)
+// Get the index search context from the result processor
+#define RP_SCTX(rpctx) ((rpctx)->parent->sctx)
 
 #ifdef MT_BUILD
 // Indicates whether a query should run in the background. This
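
The RP_SCTX macro added above gives any result processor access to the query's
shared search context, and the new timeout checks in aggregate_exec.c below go
through it. A minimal sketch of the pattern (rpExampleNext is hypothetical and
not part of this change):

    static int rpExampleNext(ResultProcessor *self, SearchResult *r) {
      // Poll the shared deadline before doing any work
      if (TimedOut(&RP_SCTX(self)->timeout) == TIMED_OUT) {
        return RS_RESULT_TIMEDOUT;
      }
      // ... otherwise produce the next result ...
      return RS_RESULT_OK;
    }
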
diff --git a/src/aggregate/aggregate_exec.c b/src/aggregate/aggregate_exec.c
index 1702aa5881..e07b1a81cc 100644
--- a/src/aggregate/aggregate_exec.c
+++ b/src/aggregate/aggregate_exec.c
@@ -293,208 +293,315 @@ static size_t getResultsFactor(AREQ *req) {
 }
 
 /**
- * Sends a chunk of rows, optionally also sending the preamble
- */
-void sendChunk(AREQ *req, RedisModule_Reply *reply, size_t limit) {
-  size_t nelem = 0;
+ * Aggregates all the results from the pipeline into a single array, and returns
+ * it. rc is populated with the latest return code from the pipeline.
+*/
+static SearchResult **AggregateResults(ResultProcessor *rp, int *rc) {
+  SearchResult **results = array_new(SearchResult *, 8);
   SearchResult r = {0};
-  int rc = RS_RESULT_EOF;
-  ResultProcessor *rp = req->qiter.endProc;
-  bool has_map = RedisModule_HasMap(reply);
+  while (rp->parent->resultLimit-- && (*rc = rp->Next(rp, &r)) == RS_RESULT_OK) {
+    array_append(results, SearchResult_Copy(&r));
 
-  if (!(req->reqflags & QEXEC_F_IS_CURSOR) && !(req->reqflags & QEXEC_F_IS_SEARCH)) {
-    limit = req->maxAggregateResults;
+    // clean the search result
+    r = (SearchResult){0};
   }
-  cachedVars cv = {0};
-  cv.lastLk = AGPLN_GetLookup(&req->ap, NULL, AGPLN_GETLOOKUP_LAST);
-  cv.lastAstp = AGPLN_GetArrangeStep(&req->ap);
+  if (*rc != RS_RESULT_OK) {
+    SearchResult_Destroy(&r);
+  }
 
-  // Set the chunk size limit for the query
-  rp->parent->resultLimit = limit;
+  return results;
+}
 
-  //-------------------------------------------------------------------------------------------
-  if (has_map) // RESP3 variant
-  {
-    RedisModule_ReplyKV_Array(reply, "attributes");
-    RedisModule_Reply_ArrayEnd(reply);
+// Frees the results array and all the results inside it
+static void destroyResults(SearchResult **results) {
+  if (results) {
+    for (size_t i = 0; i < array_len(results); i++) {
+      SearchResult_Destroy(results[i]);
+      rm_free(results[i]);
+    }
+    array_free(results);
+  }
+}
+
+static bool ShouldReplyWithTimeoutError(int rc, AREQ *req) {
+  // TODO: Remove cursor condition (MOD-5992)
+  return rc == RS_RESULT_TIMEDOUT
+         && req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail
+         && !(req->reqflags & QEXEC_F_IS_CURSOR)
+         && !IsProfile(req);
+}
+
+static void ReplyWithTimeoutError(RedisModule_Reply *reply) {
+  // TODO: Change to an error (MOD-5965)
+  RedisModule_Reply_SimpleString(reply, "Timeout limit was reached");
+}
 
-    rc = rp->Next(rp, &r);
-    long resultsLen = REDISMODULE_POSTPONED_ARRAY_LEN;
-    if (rc == RS_RESULT_TIMEDOUT && !(req->reqflags & QEXEC_F_IS_CURSOR) && !IsProfile(req) &&
-        req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
-      resultsLen = 0;
+static uint32_t populateReplyWithResults(RedisModule_Reply *reply, SearchResult **results, AREQ *req, cachedVars *cv) {
+  // populate the reply with an array containing the serialized results
+  uint32_t len = array_len(results);
+  for (uint32_t i = 0; i < len; i++) {
+    SearchResult *curr = array_pop(results);
+    serializeResult(req, reply, curr, cv);
+    SearchResult_Destroy(curr);
+    rm_free(curr);
+  }
+  array_free(results);
+  return len;
+}
+
+long calc_results_len(AREQ *req, size_t limit) {
+  PLN_ArrangeStep *arng = AGPLN_GetArrangeStep(&req->ap);
+  size_t reqLimit = arng && arng->isLimited ? arng->limit : DEFAULT_LIMIT;
+  size_t reqOffset = arng && arng->isLimited ? arng->offset : 0;
+  size_t resultFactor = getResultsFactor(req);
+
+  size_t expected_res = reqLimit + reqOffset <= req->maxSearchResults ? req->qiter.totalResults : MIN(req->maxSearchResults, req->qiter.totalResults);
+  size_t reqResults = expected_res > reqOffset ? expected_res - reqOffset : 0;
+
+  return 1 + MIN(limit, MIN(reqLimit, reqResults)) * resultFactor;
+}
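+
+/* Worked example (hypothetical numbers, for illustration only): an FT.SEARCH
+ * with `LIMIT 10 20`, 100 matching documents, a result factor of 2 (see
+ * getResultsFactor), and the default `maxSearchResults`:
+ *
+ *   reqLimit = 20, reqOffset = 10, resultFactor = 2
+ *   expected_res = totalResults = 100      (since 20 + 10 <= maxSearchResults)
+ *   reqResults   = 100 - 10 = 90
+ *   resultsLen   = 1 + MIN(limit, MIN(20, 90)) * 2 = 41   (for limit >= 20)
+ *
+ * i.e. one reply element for the total count, plus 20 results serialized as
+ * two elements each.
+ */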
+
+/**
+ * Sends a chunk of rows in the resp2 format
+*/
+static void sendChunk_Resp2(AREQ *req, RedisModule_Reply *reply, size_t limit,
+                            cachedVars cv) {
+  SearchResult r = {0};
+  int rc = RS_RESULT_EOF;
+  ResultProcessor *rp = req->qiter.endProc;
+  SearchResult **results = NULL;
+  long nelem = 0, resultsLen = REDISMODULE_POSTPONED_ARRAY_LEN;
+
+  if (req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
+    // Aggregate all results before populating the response
+    results = AggregateResults(rp, &rc);
+    // Check timeout after aggregation
+    if (TimedOut(&RP_SCTX(rp)->timeout) == TIMED_OUT) {
+      rc = RS_RESULT_TIMEDOUT;
+    }
+  } else {
+    // Send the results received from the pipeline as they come (no need to aggregate)
+    rc = rp->Next(rp, &r);
+  }
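+
+  // Why poll the clock again after draining (illustration of the intent, not
+  // new behavior): AggregateResults can end with RS_RESULT_EOF even though
+  // the deadline has already passed. Under the strict `ON_TIMEOUT FAIL`
+  // policy, rc is promoted to RS_RESULT_TIMEDOUT here - before anything has
+  // been written to the reply - so a late timeout still becomes a clean
+  // error instead of a partial result set.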
+
+  // Set `resultsLen` to be the expected number of results in the response.
+  if (ShouldReplyWithTimeoutError(rc, req)) {
+    resultsLen = 1;
   } else if (rc == RS_RESULT_ERROR) {
-      resultsLen = 0;
+    resultsLen = 2;
   } else if (req->reqflags & QEXEC_F_IS_SEARCH && rc != RS_RESULT_TIMEDOUT &&
              req->optimizer->type != Q_OPT_NO_SORTER) {
-      PLN_ArrangeStep *arng = AGPLN_GetArrangeStep(&req->ap);
-      size_t reqLimit = arng && arng->isLimited? arng->limit : DEFAULT_LIMIT;
-      size_t reqOffset = arng && arng->isLimited? arng->offset : 0;
-      size_t resultFactor = getResultsFactor(req);
-
-      size_t expected_res = reqLimit + reqOffset <= req->maxSearchResults ? req->qiter.totalResults : MIN(req->maxSearchResults, req->qiter.totalResults);
-      size_t reqResults = expected_res > reqOffset ? expected_res - reqOffset : 0;
-
-      resultsLen = MIN(limit, MIN(reqLimit, reqResults));
+    resultsLen = calc_results_len(req, limit);
   }
 
   if (IsOptimized(req)) {
     QOptimizer_UpdateTotalResults(req);
   }
 
-    RedisModule_ReplyKV_Array(reply, "error"); // >errors
-    if (rc == RS_RESULT_TIMEDOUT) {
-      RedisModule_Reply_SimpleString(reply, "Timeout limit was reached");
+  RedisModule_Reply_Array(reply);
+
+  // Reply with a timeout or regular error, if needed (policy dependent)
+  if (ShouldReplyWithTimeoutError(rc, req)) {
+    // For now, embed this error inside an array - may be changed shortly
+    ReplyWithTimeoutError(reply);
+  } else if (rc == RS_RESULT_TIMEDOUT) {
+    // Set rc to OK such that we will respond with the partial results
+    rc = RS_RESULT_OK;
+    RedisModule_Reply_LongLong(reply, req->qiter.totalResults);
   } else if (rc == RS_RESULT_ERROR) {
+    // TODO: Remove the irrelevant `totalResults` from the response - it is
+    // always 0 here, and not coherent with the timeout error response format.
+    // TODO: Remove the double embedding of the error
+
+    RedisModule_Reply_LongLong(reply, req->qiter.totalResults);
+    RedisModule_Reply_Array(reply);
     RedisModule_Reply_Error(reply, QueryError_GetError(req->qiter.err));
     QueryError_ClearError(req->qiter.err);
-    }
-    RedisModule_Reply_ArrayEnd(reply); // >errors
-
-    if (rc == RS_RESULT_TIMEDOUT) {
-      if (!(req->reqflags & QEXEC_F_IS_CURSOR) && !IsProfile(req) &&
-          req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
-        RedisModule_ReplyKV_LongLong(reply, "total_results", 0);
-      } else {
-        rc = RS_RESULT_OK;
-        RedisModule_ReplyKV_LongLong(reply, "total_results", req->qiter.totalResults);
-      }
-    } else if (rc == RS_RESULT_ERROR) {
-      RedisModule_ReplyKV_LongLong(reply, "total_results", req->qiter.totalResults);
+    RedisModule_Reply_ArrayEnd(reply);
+    nelem++;
   } else {
-      RedisModule_ReplyKV_LongLong(reply, "total_results", req->qiter.totalResults);
+    RedisModule_Reply_LongLong(reply, req->qiter.totalResults);
   }
   nelem++;
 
-    if (req->reqflags & QEXEC_FORMAT_EXPAND) {
-      RedisModule_ReplyKV_SimpleString(reply, "format", "EXPAND"); // >format
-    } else {
-      RedisModule_ReplyKV_SimpleString(reply, "format", "STRING"); // >format
+  // Once we get here, we want to return the results we got from the pipeline (with no error)
+  if (req->reqflags & QEXEC_F_NOROWS || (rc != RS_RESULT_OK && rc != RS_RESULT_EOF)) {
+    goto done_2;
   }
 
-    RedisModule_ReplyKV_Array(reply, "results"); // >results
-    nelem = 0;
-
-    if (rc == RS_RESULT_OK && rp->parent->resultLimit && !(req->reqflags & QEXEC_F_NOROWS)) {
-      serializeResult(req, reply, &r, &cv);
-      nelem++;
+  // If the policy is `ON_TIMEOUT FAIL`, we already aggregated the results
+  if (results != NULL) {
+    nelem += populateReplyWithResults(reply, results, req, &cv);
+    results = NULL;
+    goto done_2;
   }
 
-    SearchResult_Clear(&r);
-    if (rc != RS_RESULT_OK || !rp->parent->resultLimit) {
-      goto done_3;
+  if (rp->parent->resultLimit && rc == RS_RESULT_OK) {
+    nelem += serializeResult(req, reply, &r, &cv);
+  } else {
+    goto done_2;
   }
 
+  SearchResult_Clear(&r);
   while (--rp->parent->resultLimit && (rc = rp->Next(rp, &r)) == RS_RESULT_OK) {
-      if (!(req->reqflags & QEXEC_F_NOROWS)) {
-        serializeResult(req, reply, &r, &cv);
-        nelem++;
-      }
-      // Serialize it as a search result
+    nelem += serializeResult(req, reply, &r, &cv);
     SearchResult_Clear(&r);
   }
 
-done_3:
-    SearchResult_Destroy(&r);
+done_2:
+  RedisModule_Reply_ArrayEnd(reply); //
 
-    if (rc != RS_RESULT_OK &&
-        !(rc == RS_RESULT_TIMEDOUT && req->reqConfig.timeoutPolicy == TimeoutPolicy_Return)) {
-      req->stateflags |= QEXEC_S_ITERDONE;
+  if (results) {
+    destroyResults(results);
+  } else {
+    SearchResult_Destroy(&r);
   }
 
   // Reset the total results length:
   req->qiter.totalResults = 0;
 
-    RedisModule_Reply_ArrayEnd(reply); // >results
-  }
-  //-------------------------------------------------------------------------------------------
-  else // ! has_map (RESP2 variant)
-  {
-    rc = rp->Next(rp, &r);
-    long resultsLen = REDISMODULE_POSTPONED_ARRAY_LEN;
-    if (rc == RS_RESULT_TIMEDOUT && !(req->reqflags & QEXEC_F_IS_CURSOR) && !IsProfile(req) &&
-        req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
-      resultsLen = 1;
-    } else if (rc == RS_RESULT_ERROR) {
-      resultsLen = 2;
-    } else if (req->reqflags & QEXEC_F_IS_SEARCH && rc != RS_RESULT_TIMEDOUT &&
-               req->optimizer->type != Q_OPT_NO_SORTER) {
-      PLN_ArrangeStep *arng = AGPLN_GetArrangeStep(&req->ap);
-      size_t reqLimit = arng && arng->isLimited? arng->limit : DEFAULT_LIMIT;
-      size_t reqOffset = arng && arng->isLimited? arng->offset : 0;
-      size_t resultFactor = getResultsFactor(req);
-      size_t expected_res = reqLimit + reqOffset <= req->maxSearchResults ? req->qiter.totalResults : MIN(req->maxSearchResults, req->qiter.totalResults);
-      size_t reqResults = expected_res > reqOffset ? expected_res - reqOffset : 0;
+  if (rc != RS_RESULT_OK &&
+      !(rc == RS_RESULT_TIMEDOUT && req->reqConfig.timeoutPolicy == TimeoutPolicy_Return)) {
+    req->stateflags |= QEXEC_S_ITERDONE;
+  }
 
-      resultsLen = 1 + MIN(limit, MIN(reqLimit, reqResults)) * resultFactor;
+  if (resultsLen != REDISMODULE_POSTPONED_ARRAY_LEN && rc == RS_RESULT_OK && resultsLen != nelem) {
+    RedisModule_Log(RSDummyContext, "warning", "Failed to predict the number of replied results. Prediction=%ld, actual_number=%ld.", resultsLen, nelem);
+    RS_LOG_ASSERT(0, "Precalculated number of replies must be equal to actual number");
   }
+}
 
-    RedisModule_Reply_Array(reply); // results @@
+/**
+ * Sends a chunk of rows in the resp3 format
+*/
+static void sendChunk_Resp3(AREQ *req, RedisModule_Reply *reply, size_t limit,
+                            cachedVars cv) {
+  SearchResult r = {0};
+  int rc = RS_RESULT_EOF;
+  ResultProcessor *rp = req->qiter.endProc;
+  SearchResult **results = NULL;
+
+  if (req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
+    // Aggregate all results before populating the response
+    results = AggregateResults(rp, &rc);
+    // Check timeout after aggregation
+    if (TimedOut(&RP_SCTX(rp)->timeout) == TIMED_OUT) {
+      rc = RS_RESULT_TIMEDOUT;
+    }
+  } else {
+    // Send the results received from the pipeline as they come (no need to aggregate)
+    rc = rp->Next(rp, &r);
+  }
 
   if (IsOptimized(req)) {
     QOptimizer_UpdateTotalResults(req);
   }
 
+  // <attributes>
+  RedisModule_ReplyKV_Array(reply, "attributes");
+  RedisModule_Reply_ArrayEnd(reply);
+
+  // TODO: Move this to after the results section, so that we can report the
+  // error even if we respond with results (`ON_TIMEOUT RETURN`).
+  // <error>
+  RedisModule_ReplyKV_Array(reply, "error"); // >errors
   if (rc == RS_RESULT_TIMEDOUT) {
-    if (!(req->reqflags & QEXEC_F_IS_CURSOR) && !IsProfile(req) &&
-        req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
-      RedisModule_Reply_SimpleString(reply, "Timeout limit was reached");
-    } else {
-      rc = RS_RESULT_OK;
-      RedisModule_Reply_LongLong(reply, req->qiter.totalResults);
-    }
+    ReplyWithTimeoutError(reply);
   } else if (rc == RS_RESULT_ERROR) {
-    RedisModule_Reply_LongLong(reply, req->qiter.totalResults);
-    RedisModule_Reply_Array(reply);
-    // QueryError_ReplyAndClear(reply->ctx, req->qiter.err);
-    RedisModule_Reply_Error(reply, QueryError_GetError(req->qiter.err));
-    QueryError_ClearError(req->qiter.err);
-    RedisModule_Reply_ArrayEnd(reply);
-    nelem++;
+    RedisModule_Reply_Error(reply, QueryError_GetError(req->qiter.err));
+    QueryError_ClearError(req->qiter.err);
+  }
+  RedisModule_Reply_ArrayEnd(reply); // >errors
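+
+  // Illustrative reply shape under `ON_TIMEOUT FAIL` when the deadline is hit
+  // (assumed, derived from the sections built in this function; the client
+  // would see a RESP3 map along these lines):
+  //
+  //   attributes: []
+  //   error: ["Timeout limit was reached"]
+  //   total_results: 0
+  //   format: "STRING"
+  //   results: []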
+
+  // TODO: Move this to after the results section, so that we can report the
+  // correct number of returned results.
+  // <total_results>
+  if (ShouldReplyWithTimeoutError(rc, req)) {
+    RedisModule_ReplyKV_LongLong(reply, "total_results", 0);
+  } else if (rc == RS_RESULT_TIMEDOUT) {
+    // Set rc to OK such that we will respond with the partial results
+    rc = RS_RESULT_OK;
+    RedisModule_ReplyKV_LongLong(reply, "total_results", req->qiter.totalResults);
   } else {
-    RedisModule_Reply_LongLong(reply, req->qiter.totalResults);
+    RedisModule_ReplyKV_LongLong(reply, "total_results", req->qiter.totalResults);
   }
-  nelem++;
 
-  if (rc == RS_RESULT_OK && rp->parent->resultLimit && !(req->reqflags & QEXEC_F_NOROWS)) {
-    nelem += serializeResult(req, reply, &r, &cv);
+  // <format>
+  if (req->reqflags & QEXEC_FORMAT_EXPAND) {
+    RedisModule_ReplyKV_SimpleString(reply, "format", "EXPAND"); // >format
+  } else {
+    RedisModule_ReplyKV_SimpleString(reply, "format", "STRING"); // >format
   }
 
-  SearchResult_Clear(&r);
-  if (rc != RS_RESULT_OK || !rp->parent->resultLimit) {
-    goto done_2;
+  // <results>
+  RedisModule_ReplyKV_Array(reply, "results"); // >results
+
+  if (req->reqflags & QEXEC_F_NOROWS || (rc != RS_RESULT_OK && rc != RS_RESULT_EOF)) {
+    goto done_3;
   }
 
-  while (--rp->parent->resultLimit && (rc = rp->Next(rp, &r)) == RS_RESULT_OK) {
-    if (!(req->reqflags & QEXEC_F_NOROWS)) {
-      nelem += serializeResult(req, reply, &r, &cv);
+  if (results != NULL) {
+    populateReplyWithResults(reply, results, req, &cv);
+    results = NULL;
+  } else {
+    if (rp->parent->resultLimit && rc == RS_RESULT_OK) {
+      serializeResult(req, reply, &r, &cv);
     }
-    // Serialize it as a search result
+    SearchResult_Clear(&r);
+    if (rc != RS_RESULT_OK || !rp->parent->resultLimit) {
+      goto done_3;
+    }
+
+    while (--rp->parent->resultLimit && (rc = rp->Next(rp, &r)) == RS_RESULT_OK) {
+      serializeResult(req, reply, &r, &cv);
+      // Serialize it as a search result
       SearchResult_Clear(&r);
     }
   }
 
-  done_2:
-  SearchResult_Destroy(&r);
+done_3:
+  RedisModule_Reply_ArrayEnd(reply); // >results
+
+  if (results) {
+    destroyResults(results);
+  } else {
+    SearchResult_Destroy(&r);
+  }
 
-  if ((rc == RS_RESULT_TIMEDOUT && req->reqConfig.timeoutPolicy == TimeoutPolicy_Fail)
-      || rc == RS_RESULT_EOF || rc == RS_RESULT_ERROR) {
+  if (rc != RS_RESULT_OK &&
+      !(rc == RS_RESULT_TIMEDOUT && req->reqConfig.timeoutPolicy == TimeoutPolicy_Return)) {
     req->stateflags |= QEXEC_S_ITERDONE;
   }
-  RedisModule_Reply_ArrayEnd(reply); // results
 
   // Reset the total results length:
   req->qiter.totalResults = 0;
-
-  if (resultsLen == REDISMODULE_POSTPONED_ARRAY_LEN || rc != RS_RESULT_OK) {
-    return;
-  }
-
-  if (resultsLen != nelem) {
-    RedisModule_Log(RSDummyContext, "warning", "Failed predict number of replied, prediction=%ld, actual_number=%ld.", resultsLen, nelem);
-    RS_LOG_ASSERT(0, "Precalculated number of replies must be equal to actual number");
-  }
+}
+
+/**
+ * Sends a chunk of rows, optionally also sending the preamble
+ */
+void sendChunk(AREQ *req, RedisModule_Reply *reply, size_t limit) {
+  if (!(req->reqflags & QEXEC_F_IS_CURSOR) && !(req->reqflags & QEXEC_F_IS_SEARCH)) {
+    limit = req->maxAggregateResults;
+  }
+
+  cachedVars cv = {
+    .lastLk = AGPLN_GetLookup(&req->ap, NULL, AGPLN_GETLOOKUP_LAST),
+    .lastAstp = AGPLN_GetArrangeStep(&req->ap)
+  };
+
+  // Set the chunk size limit for the query
+  req->qiter.resultLimit = limit;
+
+  bool has_map = RedisModule_HasMap(reply);
+
+  if (has_map) {
+    sendChunk_Resp3(req, reply, limit, cv);
+  } else {
+    sendChunk_Resp2(req, reply, limit, cv);
   }
-  //-------------------------------------------------------------------------------------------
 }
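+
+/* Illustrative call site (assumed shape - AREQ_Execute below is the real
+ * caller, but its body is not part of this diff): the executor builds one
+ * reply object and lets sendChunk pick the RESP2/RESP3 variant internally:
+ *
+ *   RedisModule_Reply _reply = RedisModule_NewReply(ctx), *reply = &_reply;
+ *   sendChunk(req, reply, -1);  // no explicit chunk limit
+ *   RedisModule_EndReply(reply);
+ */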
 
 void AREQ_Execute(AREQ *req, RedisModuleCtx *ctx) {
@@ -622,7 +729,7 @@ int prepareExecutionPlan(AREQ *req, QueryError *status) {
     QOptimizer_Iterators(req, req->optimizer);
   }
 
-  TimedOut_WithStatus(&req->timeoutTime, status);
+  TimedOut_WithStatus(&sctx->timeout, status);
 
   if (QueryError_HasError(status)) {
     return REDISMODULE_ERR;
diff --git a/src/aggregate/aggregate_request.c b/src/aggregate/aggregate_request.c
index 0e2c889b2d..c51a13438e 100644
--- a/src/aggregate/aggregate_request.c
+++ b/src/aggregate/aggregate_request.c
@@ -1284,7 +1284,7 @@ static void buildImplicitPipeline(AREQ *req, QueryError *Status) {
 
   RLookup_Init(first, cache);
 
-  ResultProcessor *rp = RPIndexIterator_New(req->rootiter, req->timeoutTime);
+  ResultProcessor *rp = RPIndexIterator_New(req->rootiter);
   ResultProcessor *rpUpstream = NULL;
   req->qiter.rootProc = req->qiter.endProc = rp;
   PUSH_RP();
diff --git a/src/info_command.c b/src/info_command.c
index b142ecdd7b..dcb1d2a7a3 100644
--- a/src/info_command.c
+++ b/src/info_command.c
@@ -42,7 +42,7 @@ static void renderIndexOptions(RedisModule_Reply *reply, IndexSpec *sp) {
   RedisModule_Reply_ArrayEnd(reply);
 }
 
-static int renderIndexDefinitions(RedisModule_Reply *reply, IndexSpec *sp) {
+static void renderIndexDefinitions(RedisModule_Reply *reply, IndexSpec *sp) {
   SchemaRule *rule = sp->rule;
 
   RedisModule_ReplyKV_Map(reply, "index_definition"); // index_definition
diff --git a/src/query_error.h b/src/query_error.h
index b5cb85c509..923ada32e5 100644
--- a/src/query_error.h
+++ b/src/query_error.h
@@ -52,7 +52,7 @@ extern "C" {
   X(QUERY_ENODISTRIBUTE, "Could not distribute the operation")  \
   X(QUERY_EUNSUPPTYPE, "Unsupported index type")                \
   X(QUERY_ENOTNUMERIC, "Could not convert value to a number")   \
-  X(QUERY_TIMEDOUT, "Timeout limit was reached")                \
+  X(QUERY_ETIMEDOUT, "Timeout limit was reached")               \
   X(QUERY_ENOPARAM, "Parameter not found")                      \
   X(QUERY_EDUPPARAM, "Parameter was specified twice")           \
   X(QUERY_EBADVAL, "Invalid value was given")                   \
diff --git a/src/result_processor.c b/src/result_processor.c
index 07233ca683..ec5aa17cfa 100644
--- a/src/result_processor.c
+++ b/src/result_processor.c
@@ -30,6 +30,14 @@ void QITR_Cleanup(QueryIterator *qitr) {
   }
 }
 
+// Allocates a new SearchResult, and populates it with `r`'s data (takes
+// ownership as well)
+SearchResult *SearchResult_Copy(SearchResult *r) {
+  SearchResult *ret = rm_malloc(sizeof(*ret));
+  *ret = *r;
+  return ret;
+}
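+
+// Illustrative usage (not part of this patch): the copy takes ownership of
+// the source's internals, so the source is reset rather than destroyed after
+// copying - exactly the pattern AggregateResults relies on above:
+//
+//   SearchResult *owned = SearchResult_Copy(&r);  // shallow copy, owns data
+//   r = (SearchResult){0};                        // forget, don't destroy
+//   ...
+//   SearchResult_Destroy(owned);
+//   rm_free(owned);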
+
 void SearchResult_Clear(SearchResult *r) {
   // This won't affect anything if the result is null
   r->score = 0;
@@ -63,8 +71,6 @@ void SearchResult_Destroy(SearchResult *r) {
  * downstream.
  *******************************************************************************************************************/
 
-// Get the index search context from the result processor
-#define RP_SCTX(rpctx) ((rpctx)->parent->sctx)
 // Get the index spec from the result processor - this should be used only if the spec
 // can be accessed safely.
 #define RP_SPEC(rpctx) (RP_SCTX(rpctx)->spec)
@@ -155,7 +161,7 @@ static void rpidxFree(ResultProcessor *iter) {
   rm_free(iter);
 }
 
-ResultProcessor *RPIndexIterator_New(IndexIterator *root, struct timespec timeout) {
+ResultProcessor *RPIndexIterator_New(IndexIterator *root) {
   RPIndexIterator *ret = rm_calloc(1, sizeof(*ret));
   ret->iiter = root;
   ret->base.Next = rpidxNext;
diff --git a/src/result_processor.h b/src/result_processor.h
index d1c44975f9..ffa3fac975 100644
--- a/src/result_processor.h
+++ b/src/result_processor.h
@@ -170,6 +170,11 @@ typedef struct ResultProcessor {
   void (*Free)(struct ResultProcessor *self);
 } ResultProcessor;
 
+/**
+ * This function allocates a new SearchResult, copies the data from `r` to it,
+ * and returns it.
+*/
+SearchResult *SearchResult_Copy(SearchResult *r);
 
 /**
  * This function resets the search result, so that it may be reused again.
@@ -183,7 +188,7 @@ void SearchResult_Clear(SearchResult *r);
  */
 void SearchResult_Destroy(SearchResult *r);
 
-ResultProcessor *RPIndexIterator_New(IndexIterator *itr, struct timespec timeoutTime);
+ResultProcessor *RPIndexIterator_New(IndexIterator *itr);
 
 ResultProcessor *RPScorer_New(const ExtScoringFunctionCtx *funcs,
                               const ScoringFunctionArgs *fnargs);
diff --git a/src/util/timeout.h b/src/util/timeout.h
index cf7298ae3b..d0e18dde54 100644
--- a/src/util/timeout.h
+++ b/src/util/timeout.h
@@ -1,9 +1,9 @@
-/*
- * Copyright Redis Ltd. 2016 - present
- * Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or
- * the Server Side Public License v1 (SSPLv1).
- */
-
+/*
+ * Copyright Redis Ltd. 2016 - present
+ * Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or
+ * the Server Side Public License v1 (SSPLv1).
+ */
+
 #pragma once
 
 #include <time.h>
@@ -88,7 +88,7 @@ static inline int TimedOut_WithCtx(TimeoutCtx *ctx) {
 static inline int TimedOut_WithStatus(struct timespec *timeout, QueryError *status) {
   int rc = TimedOut(timeout);
   if (status && rc == TIMED_OUT) {
-    QueryError_SetCode(status, QUERY_TIMEDOUT);
+    QueryError_SetCode(status, QUERY_ETIMEDOUT);
   }
   return rc;
 }
diff --git a/src/vector_index.c b/src/vector_index.c
index 732857d4cf..1af62a507e 100644
--- a/src/vector_index.c
+++ b/src/vector_index.c
@@ -137,7 +137,7 @@ IndexIterator *NewVectorIterator(QueryEvalCtx *q, VectorQuery *vq, IndexIterator
                                        &qParams, vq->range.order);
       if (VecSimQueryReply_GetCode(results) == VecSim_QueryReply_TimedOut) {
         VecSimQueryReply_Free(results);
-        QueryError_SetError(q->status, QUERY_TIMEDOUT, NULL);
+        QueryError_SetError(q->status, QUERY_ETIMEDOUT, NULL);
         return NULL;
       }
       bool yields_metric = vq->scoreField != NULL;
diff --git a/tests/pytests/test.py b/tests/pytests/test.py
index 18ec47fdda..adbeab8833 100644
--- a/tests/pytests/test.py
+++ b/tests/pytests/test.py
@@ -2191,6 +2191,7 @@ def testTimeout(env):
   env.expect('ft.search', 'myIdx', 'aa*|aa*|aa*|aa* aa*', 'timeout', -1).error()
   env.expect('ft.search', 'myIdx', 'aa*|aa*|aa*|aa* aa*', 'timeout', 'STR').error()
 
+  # TODO: Modify this test, as it will now receive a timeout error, making this check invalid
   # check no time w/o sorter/grouper
   res = env.cmd('FT.AGGREGATE', 'myIdx', '*',
                 'LOAD', 1, 'geo',
@@ -2228,7 +2229,7 @@ def testTimeout(env):
   while cursor != 0:
     r, cursor = env.cmd('FT.CURSOR', 'READ', 'myIdx', str(cursor))
     l += (len(r) - 1)
-  env.assertEqual(l, 1000)
+  env.assertEqual(l, num_range)
 
 @skip(cluster=True)
 def testTimeoutOnSorter(env):
diff --git a/tests/pytests/test_aggregate.py b/tests/pytests/test_aggregate.py
index 22cec52cf0..6cd2e5b523 100644
--- a/tests/pytests/test_aggregate.py
+++ b/tests/pytests/test_aggregate.py
@@ -990,7 +990,7 @@ def populate_db(env):
   num_docs = 10000 * nshards
   pipeline = conn.pipeline(transaction=False)
   for i, t1 in enumerate(np.random.randint(1, 1024, num_docs)):
-    pipeline.hset (i, 't1', str(t1))
+    pipeline.hset(i, 't1', str(t1))
     if i % 1000 == 0:
       pipeline.execute()
       pipeline = conn.pipeline(transaction=False)
@@ -1005,10 +1005,11 @@ def aggregate_test(protocol=2):
     raise unittest.SkipTest("Unsupported protocol")
 
   env = Env(moduleArgs='DEFAULT_DIALECT 2 ON_TIMEOUT FAIL', protocol=protocol)
+  conn = getConnectionByEnv(env)
   populate_db(env)
 
-  res = env.execute_command('FT.AGGREGATE', 'idx', '*',
+  res = conn.execute_command('FT.AGGREGATE', 'idx', '*',
                             'LOAD', '2', '@t1', '@__key',
                             'APPLY', '@t1 ^ @t1', 'AS', 't1exp',
                             'groupby', '2', '@t1', '@t1exp',
@@ -1020,6 +1021,17 @@ def aggregate_test(protocol=2):
   else:
     env.assertEqual(res['error'], ['Timeout limit was reached'])
 
+  # Tests MOD-5948 - An `FT.AGGREGATE` command with no depleting result-processors
+  # should return a timeout (rather than results)
+  res = conn.execute_command(
+    'FT.AGGREGATE', 'idx', '*', 'LOAD', '1', '@t1', 'TIMEOUT', '1'
+  )
+
+  if protocol == 2:
+    env.assertEqual(res, ['Timeout limit was reached'])
+  else:
+    env.assertEqual(res['error'], ['Timeout limit was reached'])
+
 
 def test_aggregate_timeout_resp2():
   aggregate_test(protocol=2)
diff --git a/tests/pytests/test_coordinator.py b/tests/pytests/test_coordinator.py
index a6f8c69005..05a2e3d4d0 100644
--- a/tests/pytests/test_coordinator.py
+++ b/tests/pytests/test_coordinator.py
@@ -151,11 +151,10 @@ def test_timeout():
   env.assertContains('Timeout limit was reached', str(res[0]))
 
   # Client cursor mid execution
-  # If the cursor id is 0, this means there was a timeout throughout execution
-  # caught by the coordinator
-  res, cursor = conn.execute_command('FT.AGGREGATE', 'idx', '*', 'LOAD', '*',
-                                     'WITHCURSOR', 'COUNT', n_docs, 'timeout', '1')
-  env.assertEqual(cursor, 0)
+  res, _ = conn.execute_command('FT.AGGREGATE', 'idx', '*', 'LOAD', '1', '@t1',
+                                'GROUPBY', '1', '@t1', 'WITHCURSOR', 'COUNT',
+                                str(n_docs), 'TIMEOUT', 1)
+  env.assertEqual(res, [0])
 
   # FT.SEARCH
   res = conn.execute_command('FT.SEARCH', 'idx', '*', 'LIMIT', '0', n_docs,