Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report indexing failures on ft.info - MOD-5364 #3682

Merged
merged 54 commits into from Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
c4d2cfd
scaffolding
DvirDukhan Jun 21, 2023
7917c39
per field info + vector test
DvirDukhan Jun 27, 2023
e700b07
numeric
DvirDukhan Jun 27, 2023
741b5f2
cosmetics
DvirDukhan Jun 27, 2023
af838c9
wip
DvirDukhan Jun 29, 2023
e03602f
wip
DvirDukhan Jul 2, 2023
181dbb5
added exception message for hash and JSON failures
DvirDukhan Jul 8, 2023
68d11b3
wip
DvirDukhan Jul 11, 2023
8399339
wip
DvirDukhan Jul 12, 2023
74d9095
pytest pass for index error
DvirDukhan Jul 16, 2023
111f5fa
wip
DvirDukhan Jul 21, 2023
b0e1789
tests pass + coordinator pass
DvirDukhan Jul 23, 2023
5c4ea5a
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Jul 23, 2023
c4f0692
removed warnings
DvirDukhan Jul 24, 2023
d079e86
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Jul 24, 2023
ba42642
free resources
DvirDukhan Jul 24, 2023
6618682
clear field spec info
DvirDukhan Jul 25, 2023
2d767d2
added const to reply
DvirDukhan Jul 25, 2023
628cd75
free resources
DvirDukhan Jul 25, 2023
d7f1715
cast to remove const
DvirDukhan Jul 25, 2023
330e805
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Jul 25, 2023
3c09fca
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Jul 26, 2023
ff8e361
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Jul 26, 2023
3db65ad
sanitizer build warning fix
DvirDukhan Jul 26, 2023
1373de6
remove print
DvirDukhan Jul 26, 2023
93a1540
free resources
DvirDukhan Jul 26, 2023
e14d2ee
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Jul 26, 2023
f071c0a
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Jul 26, 2023
db56107
clear query errors
DvirDukhan Jul 26, 2023
0d41738
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Aug 7, 2023
1464b91
Merge branch 'master' into dvirdu_index_failure_report
DvirDukhan Sep 12, 2023
d32349f
Merge remote-tracking branch 'origin/master' into dvirdu_index_failur…
GuyAv46 Dec 19, 2023
a493a79
post merge fixes
GuyAv46 Dec 19, 2023
f35a78e
minor fixes
GuyAv46 Dec 20, 2023
f092f3f
more fixes
GuyAv46 Dec 20, 2023
1eba5dd
Merge remote-tracking branch 'origin/master' into dvirdu_index_failur…
GuyAv46 Dec 20, 2023
a5bfab7
import fix
GuyAv46 Dec 20, 2023
d56da95
some tests changes
GuyAv46 Dec 20, 2023
3176866
Merge branch 'master' into dvirdu_index_failure_report
GuyAv46 Dec 20, 2023
05ee35f
fix incorrect offset
GuyAv46 Dec 21, 2023
022022f
remove unneeded helpers
GuyAv46 Dec 21, 2023
f981fe6
move field stats to the end of the info list
GuyAv46 Dec 21, 2023
c995b78
added a test
GuyAv46 Dec 21, 2023
396e0d3
align code
GuyAv46 Dec 21, 2023
8bcc860
fix for cluster
GuyAv46 Dec 21, 2023
014bbc4
simple review fixes
GuyAv46 Dec 24, 2023
041e5a6
added timestamps for indexing
GuyAv46 Dec 24, 2023
6021b66
added timespec to the struct
GuyAv46 Dec 24, 2023
c19c46d
improved test
GuyAv46 Dec 24, 2023
d1e11d7
Merge branch 'master' into dvirdu_index_failure_report
GuyAv46 Dec 24, 2023
6bcc44c
extend mock API
GuyAv46 Dec 24, 2023
c3d0d79
review fixes
GuyAv46 Dec 24, 2023
ac3c751
fix alignment
GuyAv46 Dec 26, 2023
2859ddc
fix misalignment
GuyAv46 Dec 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions CMakeLists.txt
Expand Up @@ -87,6 +87,7 @@ file(GLOB SOURCES
"src/query_parser/v2/*.c"
"src/util/*.c"
"src/trie/*.c"
"src/info/*.c"

"deps/cndict/cndict_data.c"
"deps/libnu/*.c"
Expand All @@ -111,15 +112,15 @@ set(FINAL_OBJECTS

if (BUILD_COORDINATOR)
if (BUILD_COORD_OSS)
add_library(redisearch-oss STATIC ${SOURCES} ${FINAL_OBJECTS} src/module-init/module-init.c)
add_library(redisearch-oss STATIC ${SOURCES} ${FINAL_OBJECTS} src/module-init/module-init.c coord/src/rmr/reply.c)

target_compile_definitions(redisearch-oss PRIVATE
RS_NO_ONLOAD
RS_NO_RMAPI
RS_CLUSTER_OSS)

elseif (BUILD_COORD_RLEC)
add_library(redisearch-enterprise STATIC ${SOURCES} ${FINAL_OBJECTS} src/module-init/module-init.c)
add_library(redisearch-enterprise STATIC ${SOURCES} ${FINAL_OBJECTS} src/module-init/module-init.c coord/src/rmr/reply.c)

target_compile_definitions(redisearch-enterprise PRIVATE
RS_NO_ONLOAD
Expand Down
4 changes: 2 additions & 2 deletions coord/src/dist_aggregate.c
Expand Up @@ -178,7 +178,7 @@ RSValue *MRReply_ToValue(MRReply *r) {
case MR_REPLY_STATUS:
case MR_REPLY_STRING: {
size_t l;
char *s = MRReply_String(r, &l);
const char *s = MRReply_String(r, &l);
v = RS_NewCopiedString(s, l);
// v = RS_StringValT(s, l, RSString_Volatile);
break;
Expand Down Expand Up @@ -397,7 +397,7 @@ static int rpnetNext(ResultProcessor *self, SearchResult *r) {

// If an error was returned, propagate it
if(MRReply_Type(nc->current.root) == MR_REPLY_ERROR) {
char *strErr = MRReply_String(nc->current.root, NULL);
const char *strErr = MRReply_String(nc->current.root, NULL);
if (!strErr
|| strcmp(strErr, "Timeout limit was reached")
|| nc->areq->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
Expand Down
123 changes: 100 additions & 23 deletions coord/src/info_command.c
Expand Up @@ -6,13 +6,14 @@

#include "info_command.h"
#include "resp3.h"
#include "info/field_spec_info.h"

// Type of field returned in INFO
typedef enum {
InfoField_WholeSum,
InfoField_DoubleSum,
InfoField_DoubleAverage,
InfoField_Max
InfoField_Max,
} InfoFieldType;

// Field specification
Expand Down Expand Up @@ -76,9 +77,13 @@
size_t total_l;
double total_d;
struct {
double avg;
double sum;
double count;
} avg;
struct {
char *str;
size_t len;
} str;
} u;
} InfoValue;

Expand All @@ -91,6 +96,8 @@
MRReply *indexOptions;
size_t *errorIndexes;
InfoValue toplevelValues[NUM_FIELDS_SPEC];
FieldSpecInfo *fieldSpecInfo_arr;
IndexError indexError;
InfoValue gcValues[NUM_GC_FIELDS_SPEC];
InfoValue cursorValues[NUM_CURSOR_FIELDS_SPEC];
InfoValue dialectValues[NUM_DIALECT_FIELDS_SPEC];
Expand All @@ -115,29 +122,72 @@
static void convertField(InfoValue *dst, MRReply *src, const InfoFieldSpec *spec) {
int type = spec->type;

if (type == InfoField_WholeSum) {
long long tmp;
MRReply_ToInteger(src, &tmp);
dst->u.total_l += tmp;
} else if (type == InfoField_DoubleSum) {
double d;
MRReply_ToDouble(src, &d);
dst->u.total_d += d;
} else if (type == InfoField_DoubleAverage) {
dst->u.avg.count++;
double d;
MRReply_ToDouble(src, &d);
dst->u.avg.avg += d;
} else if (type == InfoField_Max) {
long long newVal;
MRReply_ToInteger(src, &newVal);
if (dst->u.total_l < newVal) {
dst->u.total_l = newVal;
switch (type) {
case InfoField_WholeSum: {
long long tmp;
MRReply_ToInteger(src, &tmp);
dst->u.total_l += tmp;
break;
}
case InfoField_DoubleSum: {
double d;
MRReply_ToDouble(src, &d);
dst->u.total_d += d;
break;
}
case InfoField_DoubleAverage: {
dst->u.avg.count++;
double d;
MRReply_ToDouble(src, &d);
dst->u.avg.sum += d;
break;
}
case InfoField_Max: {
long long newVal;
MRReply_ToInteger(src, &newVal);
if (dst->u.total_l < newVal) {
dst->u.total_l = newVal;
}
break;
}
}
dst->isSet = 1;
}

// Extract an array of FieldSpecInfo from MRReply
void handleFieldStatistics(MRReply *src, InfoFields *fields) {
// Input validations
RedisModule_Assert(src && fields);
RedisModule_Assert(MRReply_Type(src) == MR_REPLY_ARRAY);

size_t len = MRReply_Length(src);
if (!fields->fieldSpecInfo_arr) {
// Lazy initialization
fields->fieldSpecInfo_arr = array_new(FieldSpecInfo, len);
for (size_t i = 0; i < len; i++) {
FieldSpecInfo fieldSpecInfo = FieldSpecInfo_Init();
fields->fieldSpecInfo_arr = array_append(fields->fieldSpecInfo_arr, fieldSpecInfo);
}
}

for (size_t i = 0; i < len; i++) {
MRReply *serializedFieldSpecInfo = MRReply_ArrayElement(src, i);
FieldSpecInfo fieldSpecInfo = FieldSpecInfo_Deserialize(serializedFieldSpecInfo);
FieldSpecInfo_OpPlusEquals(&fields->fieldSpecInfo_arr[i], &fieldSpecInfo);
FieldSpecInfo_Clear(&fieldSpecInfo); // Free Resources
}
}

static void handleIndexError(InfoFields *fields, MRReply *src) {
// Check if indexError is initialized
if (!IndexError_LastError(&fields->indexError)) {
fields->indexError = IndexError_Init();

Check warning on line 184 in coord/src/info_command.c

View check run for this annotation

Codecov / codecov/patch

coord/src/info_command.c#L184

Added line #L184 was not covered by tests
}
IndexError indexError = IndexError_Deserialize(src);
IndexError_OpPlusEquals(&fields->indexError, &indexError);
GuyAv46 marked this conversation as resolved.
Show resolved Hide resolved
IndexError_Clear(indexError); // Free Resources
}

// Handle fields which aren't InfoValue types
static void handleSpecialField(InfoFields *fields, const char *name, MRReply *value) {
if (!strcmp(name, "index_name")) {
Expand Down Expand Up @@ -168,6 +218,10 @@
processKvArray(fields, value, fields->cursorValues, cursorSpecs, NUM_CURSOR_FIELDS_SPEC, 1);
} else if (!strcmp(name, "dialect_stats")) {
processKvArray(fields, value, fields->dialectValues, dialectSpecs, NUM_DIALECT_FIELDS_SPEC, 1);
} else if (!strcmp(name, "field statistics")) {
handleFieldStatistics(value, fields);
} else if (!strcmp(name, IndexError_ObjectName)) {
handleIndexError(fields, value);
}
}

Expand Down Expand Up @@ -204,6 +258,15 @@
}

static void cleanInfoReply(InfoFields *fields) {
if (fields->fieldSpecInfo_arr) {
// Clear the info fields
for (size_t i = 0; i < array_len(fields->fieldSpecInfo_arr); i++) {
FieldSpecInfo_Clear(&fields->fieldSpecInfo_arr[i]);
}
array_free(fields->fieldSpecInfo_arr);
fields->fieldSpecInfo_arr = NULL;
}
IndexError_Clear(fields->indexError);
rm_free(fields->errorIndexes);
}

Expand All @@ -224,7 +287,7 @@
RedisModule_ReplyKV_Double(reply, key, source->u.total_d);
} else if (type == InfoField_DoubleAverage) {
if (source->u.avg.count) {
RedisModule_ReplyKV_Double(reply, key, source->u.avg.avg / source->u.avg.count);
RedisModule_ReplyKV_Double(reply, key, source->u.avg.sum / source->u.avg.count);
} else {
RedisModule_ReplyKV_Double(reply, key, 0);
}
Expand All @@ -241,7 +304,7 @@
if (fields->indexName) {
RedisModule_ReplyKV_StringBuffer(reply, "index_name", fields->indexName, fields->indexNameLen);
}

if (fields->indexDef) {
RedisModule_ReplyKV_MRReply(reply, "index_definition", fields->indexDef);
}
Expand All @@ -268,12 +331,26 @@

replyKvArray(reply, fields, fields->toplevelValues, toplevelSpecs_g, NUM_FIELDS_SPEC);


// Global index error stats
RedisModule_Reply_SimpleString(reply, IndexError_ObjectName);
IndexError_Reply(&fields->indexError, reply, 0);

if (fields->fieldSpecInfo_arr) {
RedisModule_ReplyKV_Array(reply, "field statistics"); //Field statistics
for (size_t i = 0; i < array_len(fields->fieldSpecInfo_arr); ++i) {
FieldSpecInfo_Reply(&fields->fieldSpecInfo_arr[i], reply, 0);
}
RedisModule_Reply_ArrayEnd(reply); // >Field statistics
}


RedisModule_Reply_MapEnd(reply);
}

int InfoReplyReducer(struct MRCtx *mc, int count, MRReply **replies) {
// Summarize all aggregate replies
InfoFields fields = {0};
InfoFields fields = { .indexError = IndexError_Init() };
size_t numErrored = 0;
MRReply *firstError = NULL;
RedisModuleCtx *ctx = MRCtx_GetRedisCtx(mc);
Expand Down
22 changes: 10 additions & 12 deletions coord/src/module.c
Expand Up @@ -91,9 +91,9 @@ int uniqueStringsReducer(struct MRCtx *mc, int count, MRReply **replies) {
nArrs++;
for (size_t j = 0; j < MRReply_Length(replies[i]); j++) {
size_t sl = 0;
char *s = MRReply_String(MRReply_ArrayElement(replies[i], j), &sl);
const char *s = MRReply_String(MRReply_ArrayElement(replies[i], j), &sl);
if (s && sl) {
TrieMap_Add(dict, s, sl, NULL, NULL);
TrieMap_Add(dict, (char*)s, sl, NULL, NULL);
}
}
} else if (MRReply_Type(replies[i]) == MR_REPLY_ERROR && err == NULL) {
Expand Down Expand Up @@ -782,7 +782,7 @@ searchResult *newResult_resp2(searchResult *cached, MRReply *arr, int j, searchR
res->id = NULL;
return res;
}
res->id = MRReply_String(MRReply_ArrayElement(arr, j), &res->idLen);
res->id = (char*)MRReply_String(MRReply_ArrayElement(arr, j), &res->idLen);
if (!res->id) {
return res;
}
Expand Down Expand Up @@ -844,7 +844,7 @@ searchResult *newResult_resp3(searchResult *cached, MRReply *results, int j, sea
}

MRReply *result_id = MRReply_MapElement(result_j, "id");
res->id = MRReply_String(result_id, &res->idLen);
res->id = (char*)MRReply_String(result_id, &res->idLen);
if (!res->id) {
return res;
}
Expand Down Expand Up @@ -1513,14 +1513,11 @@ static int searchResultReducer_background(struct MRCtx *mc, int count, MRReply *
}

static bool should_return_error(MRReply *reply) {
// TODO: Replace second condition with a var instead of hard-coded string
char *errStr = MRReply_String(reply, NULL);
if (!errStr
|| strcmp(errStr, "Timeout limit was reached")
|| RSGlobalConfig.requestConfigParams.timeoutPolicy == TimeoutPolicy_Fail) {
return true;
}
return false;
// TODO: Replace third condition with a var instead of hard-coded string
const char *errStr = MRReply_String(reply, NULL);
return (!errStr
|| RSGlobalConfig.requestConfigParams.timeoutPolicy == TimeoutPolicy_Fail
|| strcmp(errStr, "Timeout limit was reached"));
}

static bool should_return_timeout_error(searchRequestCtx *req) {
Expand Down Expand Up @@ -1927,6 +1924,7 @@ int InfoCommandHandler(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
}
RS_AutoMemory(ctx);
MRCommand cmd = MR_NewCommandFromRedisStrings(argc, argv);
MRCommand_Append(&cmd, WITH_INDEX_ERROR_TIME, strlen(WITH_INDEX_ERROR_TIME));
MRCommand_SetProtocol(&cmd, ctx);
MRCommand_SetPrefix(&cmd, "_FT");

Expand Down