From 6ceccaa8cdc497da7145f708873987470af3b218 Mon Sep 17 00:00:00 2001 From: Roi Lipman Date: Tue, 5 Mar 2024 12:35:50 +0200 Subject: [PATCH] GRAPH.COPY (#585) * initial work graph.copy * pipe serializer * abstract serializer * graph encoder switch to abstract serializer * switch decoder to abstract serializer * copy command wip * first copy * early flow test * additional tests * transition to worker thread execution * switch to worker thread * retry when fork failed * skip graph copy replication test when running under sanitizer * import SANITIZER * plant cloned key only after decoding is done * switch to CRON job * fork writes to file * add logs * fix leak * replicate via RESTORE, switch to FILE* * add graph.restore * fix unit-test wrong stream acquisition * fork requires gil * rename encoder/decoder v14 to latest --- src/commands/cmd_copy.c | 501 ++++++++++++++++++ src/commands/cmd_restore.c | 93 ++++ src/commands/commands.c | 1 + src/commands/commands.h | 7 +- src/errors/error_msgs.h | 1 + src/module.c | 10 + .../decoders/current/v14/decode_graph.c | 39 +- .../current/v14/decode_graph_entities.c | 66 +-- .../current/v14/decode_graph_schema.c | 60 ++- .../decoders/current/v14/decode_v14.h | 15 +- src/serializers/decoders/decode_graph.c | 8 +- src/serializers/encoder/encode_graph.c | 5 +- src/serializers/encoder/v14/encode_graph.c | 39 +- .../encoder/v14/encode_graph_entities.c | 64 +-- src/serializers/encoder/v14/encode_schema.c | 56 +- src/serializers/encoder/v14/encode_v14.h | 14 +- src/serializers/serializer_io.c | 248 +++++++++ src/serializers/serializer_io.h | 151 ++++++ src/serializers/serializers_include.h | 1 + tests/flow/graph_utils.py | 50 ++ tests/flow/test_effects.py | 38 +- tests/flow/test_graph_copy.py | 278 ++++++++++ tests/unit/test_serializer.c | 173 ++++++ 23 files changed, 1702 insertions(+), 216 deletions(-) create mode 100644 src/commands/cmd_copy.c create mode 100644 src/commands/cmd_restore.c create mode 100644 
src/serializers/serializer_io.c create mode 100644 src/serializers/serializer_io.h create mode 100644 tests/flow/graph_utils.py create mode 100644 tests/flow/test_graph_copy.py create mode 100644 tests/unit/test_serializer.c diff --git a/src/commands/cmd_copy.c b/src/commands/cmd_copy.c new file mode 100644 index 000000000..b81bfe3ba --- /dev/null +++ b/src/commands/cmd_copy.c @@ -0,0 +1,501 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +// copying a graph is performed in a number of steps: +// +// 1. a cron task is created with the responsibility of creating a fork +// +// 2. the forked child process encodes the graph into a temporary file +// once done the child exists and a callback is invoked on Redis main thread +// +// 3. a second cron task is created with the responsibility of decoding the +// dumped file and creating a new graph key +// +// +// +// ┌────────────────┐ ┌────────────────┐ +// │ │ │ │ +// │ Cron Task │ │ Child │ +// │ │ │ │ +// │ │ │ │ ┌────────┐ +// │ │ │ │ │ │ +// │ │ │ │ │ Dump │ +// │ Fork ├─────────────────►│ ├──────►│ Graph │ +// │ │ │ │ │ │ +// │ │ │ │ │ │ +// └────────────────┘ └───────┬────────┘ │ │ +// │ │ │ +// │ └────────┘ +// │ ▲ +// ┌────────────────┐ │ │ +// │ │ │ │ +// │ Main thread │ Done callback │ │ +// │ │◄─────────────────────────┘ │ +// │ │ │ +// │ │ │ +// │ │ │ +// └────────┬───────┘ │ +// │ │ +// │ │ +// ▼ │ +// ┌────────────────┐ │ +// │ │ │ +// │ Cron Task │ │ +// │ │ │ +// │ │ │ +// │ Decode Graph ├───────────────────────────────────────────────┘ +// │ │ +// │ │ +// │ │ +// │ │ +// └────────────────┘ + + +#include "RG.h" +#include "../cron/cron.h" +#include "../util/uuid.h" +#include "../redismodule.h" +#include "../graph/graphcontext.h" +#include "../serializers/serializer_io.h" +#include "../serializers/encoder/v14/encode_v14.h" +#include "../serializers/decoders/current/v14/decode_v14.h" + +#include +#include +#include +#include + +extern 
RedisModuleType *GraphContextRedisModuleType; + +// GRAPH.COPY command context +typedef struct { + const char *src; // src graph id + const char *dest; // dest graph id + char *path; // path to dumped graph on disk + RedisModuleString *rm_src; // redismodule string src + RedisModuleString *rm_dest; // redismodule string dest + RedisModuleBlockedClient *bc; // blocked client +} GraphCopyContext; + +// return a full path to a temporary dump file +// e.g. /tmp/.dump +static char *_temp_file(void) { + char *uuid = UUID_New(); + char *path; + asprintf(&path, "/tmp/%s.dump", uuid); + rm_free(uuid); + + return path; +} + +// create a new graph copy context +static GraphCopyContext *GraphCopyContext_New +( + RedisModuleBlockedClient *bc, // blocked client + RedisModuleString *src, // src graph key name + RedisModuleString *dest // destination graph key name +) { + ASSERT(bc != NULL); + ASSERT(src != NULL); + ASSERT(dest != NULL); + + GraphCopyContext *ctx = rm_malloc(sizeof(GraphCopyContext)); + + ctx->bc = bc; + ctx->path = _temp_file(); + ctx->rm_src = src; + ctx->rm_dest = dest; + ctx->src = RedisModule_StringPtrLen(src, NULL); + ctx->dest = RedisModule_StringPtrLen(dest, NULL); + + return ctx; +} + +// free graph copy context +static void GraphCopyContext_Free +( + GraphCopyContext *copy_ctx // context to free +) { + ASSERT(copy_ctx != NULL); + + // delete file in case it exists, no harm if file is missing + RedisModule_Log(NULL, REDISMODULE_LOGLEVEL_NOTICE, + "deleting dumped graph file: %s", copy_ctx->path); + remove(copy_ctx->path); + + free(copy_ctx->path); + + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(copy_ctx->bc); + RedisModule_FreeString(ctx, copy_ctx->rm_src); + RedisModule_FreeString(ctx, copy_ctx->rm_dest); + RedisModule_UnblockClient(copy_ctx->bc, NULL); + RedisModule_FreeThreadSafeContext(ctx); + + rm_free(copy_ctx); +} + +// encode graph to disk +// this function should run on a child process, giving us the guarantees: +// 1. 
the cloned graph wouldn't change +// 2. due to memory seperation we do not need to hold any locks +// 3. we're allowed to make modification to the graph e.g. rename +static int encode_graph +( + RedisModuleCtx *ctx, // redis module context + GraphContext *gc, // graph to clone + GraphCopyContext *copy_ctx // graph copy context +) { + // validations + ASSERT(gc != NULL); + ASSERT(ctx != NULL); + ASSERT(copy_ctx != NULL); + + int res = 0; // 0 indicates success + FILE *f = NULL; + SerializerIO io = NULL; + + // rename graph, needed by the decoding procedure + // when the graph is decoded it is already holds the target name + GraphContext_Rename(ctx, gc, copy_ctx->dest); + + //-------------------------------------------------------------------------- + // serialize graph to file + //-------------------------------------------------------------------------- + + // open dump file + // write only, create if missing, truncate if exists + // grant READ access to group (0644) + f = fopen(copy_ctx->path, "wb"); + if(f == NULL) { + // failed to open file + res = 1; // indicate error + goto cleanup; + } + + // create serializer + io = SerializerIO_FromStream(f); + ASSERT(io != NULL); + + // encode graph to disk + RedisModule_Log(NULL, REDISMODULE_LOGLEVEL_NOTICE, "dump graph: %s to: %s", + copy_ctx->src, copy_ctx->path); + + RdbSaveGraph_latest(io, gc); + +cleanup: + + // free serializer + if(io != NULL) SerializerIO_Free(&io); + + // close file + if(f != NULL) fclose(f); + + // all done, no errors + return res; +} + +// load graph from file +static void LoadGraphFromFile +( + void *pdata // graph copy context +) { + ASSERT(pdata != NULL); + + GraphCopyContext *copy_ctx = (GraphCopyContext*)pdata; + + SerializerIO io = NULL; // graph decode stream + char *buffer = NULL; // dumped graph + FILE *stream = NULL; // memory stream over buffer + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(copy_ctx->bc); + + 
//-------------------------------------------------------------------------- + // decode graph from disk + //-------------------------------------------------------------------------- + + // open file + FILE *f = fopen(copy_ctx->path, "rb"); + if(f == NULL) { + RedisModule_ReplyWithError(ctx, "copy failed"); + goto cleanup; + } + + //-------------------------------------------------------------------------- + // load dumped file to memory + //-------------------------------------------------------------------------- + + // seek to the end of the file + fseek(f, 0, SEEK_END); + + // get current position, which is the size of the file + long fileLength = ftell(f); + + // seek to the beginning of the file + rewind(f); + + // allocate buffer to hold entire dumped graph + buffer = rm_malloc(sizeof(char) * fileLength); + + // read file content into buffer + fread(buffer, 1, fileLength, f); + + fclose(f); // close file + + //-------------------------------------------------------------------------- + // create memory stream + //-------------------------------------------------------------------------- + + stream = fmemopen(buffer, fileLength, "r"); + if(stream == NULL) { + RedisModule_ReplyWithError(ctx, "copy failed"); + goto cleanup; + } + + // create serializer ontop of file descriptor + io = SerializerIO_FromStream(stream); + ASSERT(io != NULL); + + // decode graph from io + RedisModule_Log(NULL, REDISMODULE_LOGLEVEL_NOTICE, + "Decoding graph: %s from: %s", copy_ctx->dest, copy_ctx->path); + + GraphContext *gc = RdbLoadGraphContext_latest(io, copy_ctx->rm_dest); + ASSERT(gc != NULL); + + //-------------------------------------------------------------------------- + // add cloned graph to keyspace + //-------------------------------------------------------------------------- + + RedisModule_ThreadSafeContextLock(ctx); // lock GIL + + // make sure dest key does not exists + RedisModuleKey *key = + RedisModule_OpenKey(ctx, copy_ctx->rm_dest, REDISMODULE_READ); + int 
key_type = RedisModule_KeyType(key); + + RedisModule_CloseKey(key); + + if(key_type != REDISMODULE_KEYTYPE_EMPTY) { + // error! + RedisModule_ThreadSafeContextUnlock(ctx); // release GIL + + // free graph + GraphContext_DecreaseRefCount(gc); + + RedisModule_ReplyWithError(ctx, "copy failed"); + } else { + // create key + key = RedisModule_OpenKey(ctx, copy_ctx->rm_dest, REDISMODULE_WRITE); + + // set value in key + RedisModule_ModuleTypeSetValue(key, GraphContextRedisModuleType, gc); + + RedisModule_CloseKey(key); + + // replicate graph + // GRAPH.RESTORE dest + RedisModule_Replicate(ctx, "GRAPH.RESTORE", "cb", copy_ctx->dest, buffer, + fileLength); + + RedisModule_ThreadSafeContextUnlock(ctx); // release GIL + + // register graph context for BGSave + GraphContext_RegisterWithModule(gc); + + RedisModule_ReplyWithCString(ctx, "OK"); + } + +cleanup: + + // free serializer + if(io != NULL) SerializerIO_Free(&io); + + // close file descriptor + if(stream != NULL) fclose(stream); + + // free buffer + if(buffer != NULL) rm_free(buffer); + + // free copy context + GraphCopyContext_Free(copy_ctx); + + RedisModule_FreeThreadSafeContext(ctx); +} + +// fork done handler +// this function runs on Redis main thread +static void ForkDoneHandler +( + int exitcode, // fork return code + int bysignal, // how did fork terminated + void *user_data // private data (GraphCopyContext*) +) { + ASSERT(user_data != NULL); + + // check fork exit code + if(exitcode != 0 || bysignal != 0) { + GraphCopyContext *copy_ctx = (GraphCopyContext*)user_data; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(copy_ctx->bc); + RedisModule_ReplyWithError(ctx, "copy failed"); + // fork failed + GraphCopyContext_Free(copy_ctx); + RedisModule_FreeThreadSafeContext(ctx); + return; + } + + // perform decoding on a different thread to avoid blocking Redis + Cron_AddTask(0, LoadGraphFromFile, NULL, user_data); +} + +// implements GRAPH.COPY logic +// this function is expected to run on a cron thread +// 
avoiding blocking redis main thread while trying to create a fork +static void _Graph_Copy +( + void *context // graph copy context +) { + ASSERT(context != NULL); + + GraphCopyContext *copy_ctx = (GraphCopyContext*)context; + + bool error = false; + GraphContext *gc = NULL; + + RedisModuleString *rm_src = copy_ctx->rm_src; + RedisModuleString *rm_dest = copy_ctx->rm_dest; + RedisModuleBlockedClient *bc = copy_ctx->bc; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); + + //-------------------------------------------------------------------------- + // validations + //-------------------------------------------------------------------------- + + // lock GIL + RedisModule_ThreadSafeContextLock(ctx); + + // make sure dest key does not exists + RedisModuleKey *dest_key = + RedisModule_OpenKey(ctx, rm_dest, REDISMODULE_READ); + int dest_key_type = RedisModule_KeyType(dest_key); + RedisModule_CloseKey(dest_key); + + // make sure src key is a graph + gc = GraphContext_Retrieve(ctx, rm_src, true, false); + + // release GIL + RedisModule_ThreadSafeContextUnlock(ctx); + + // dest key shouldn't exists + if(dest_key_type != REDISMODULE_KEYTYPE_EMPTY) { + // destination key already exists, abort + error = true; + RedisModule_ReplyWithError(ctx, "destination key already exists"); + goto cleanup; + } + + // src key should be a graph + if(gc == NULL) { + // src graph is missing, abort + error = true; + // error alreay omitted by 'GraphContext_Retrieve' + goto cleanup; + } + + //-------------------------------------------------------------------------- + // fork process + //-------------------------------------------------------------------------- + + // child process will encode src graph to a file + // parent process will decode cloned graph from file + + int pid = -1; + while(pid == -1) { + // try to fork + RedisModule_ThreadSafeContextLock(ctx); // lock GIL + + // acquire READ lock on gc + // we do not want to fork while the graph is modified + // might be 
redundant, see: GraphContext_LockForCommit + Graph_AcquireReadLock(gc->g); + + pid = RedisModule_Fork(ForkDoneHandler, copy_ctx); + RedisModule_ThreadSafeContextUnlock(ctx); // release GIL + + // release graph READ lock + Graph_ReleaseLock(gc->g); + + if(pid < 0) { + // failed to fork! retry in a bit + // go to sleep for 5.0ms + struct timespec sleep_time; + sleep_time.tv_sec = 0; + sleep_time.tv_nsec = 5000000; + nanosleep(&sleep_time, NULL); + } else if(pid == 0) { + // managed to fork, in child process + // encode graph to disk + int res = encode_graph(ctx, gc, copy_ctx); + // all done, Redis require us to call 'RedisModule_ExitFromChild' + RedisModule_ExitFromChild(res); + return; + } + } + + // clean up +cleanup: + + // decrease src graph ref-count + if(gc != NULL) { + GraphContext_DecreaseRefCount(gc); + } + + if(error) { + // free command context only in the case of an error + // otherwise the fork callback is responsible for freeing this context + GraphCopyContext_Free(copy_ctx); + } + + RedisModule_FreeThreadSafeContext(ctx); +} + +// clone a graph +// this function executes on Redis main thread +// +// usage: +// GRAPH.COPY +int Graph_Copy +( + RedisModuleCtx *ctx, // redis module context + RedisModuleString **argv, // command argument + int argc // number of argument +) { + // validations + ASSERT(ctx != NULL); + ASSERT(argv != NULL); + + // expecting exactly 3 arguments: + // argv[0] command name + // argv[1] src_graph_id + // argv[2] dest_graph_id + if(argc != 3) { + return RedisModule_WrongArity(ctx); + } + + // block the client + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, + NULL, 0); + + // retain arguments + RedisModule_RetainString(ctx, argv[1]); + RedisModule_RetainString(ctx, argv[2]); + + // create command context + GraphCopyContext *context = GraphCopyContext_New(bc, argv[1], argv[2]); + + // add GRAPH.COPY as a cron task to run as soon as possible + Cron_AddTask(0, _Graph_Copy, NULL, context); + + return 
REDISMODULE_OK; +} + diff --git a/src/commands/cmd_restore.c b/src/commands/cmd_restore.c new file mode 100644 index 000000000..9c1ebe297 --- /dev/null +++ b/src/commands/cmd_restore.c @@ -0,0 +1,93 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +#include "RG.h" +#include "../graph/graphcontext.h" +#include "../serializers/serializer_io.h" +#include "../serializers/decoders/current/v14/decode_v14.h" + +extern RedisModuleType *GraphContextRedisModuleType; + +// restore a graph from binary representation +// this command is the counter part of GRAPH.COPY +// which replicates the cloned graph via GRAPH.RESTORE +// +// usage: +// GRAPH.RESTORE +// +// this function is ment to execute on redis main thread +int Graph_Restore +( + RedisModuleCtx *ctx, // redis module context + RedisModuleString **argv, // command argument + int argc // number of argument +) { + // validations + ASSERT(ctx != NULL); + ASSERT(argv != NULL); + + // expecting exactly 3 arguments: + // argv[0] command name + // argv[1] graph key + // argv[2] graph payload + if(argc != 3) { + return RedisModule_WrongArity(ctx); + } + + // TODO: reject GRAPH.RESTORE if caller isn't the master + + //-------------------------------------------------------------------------- + // make sure graph key doesn't exists + //-------------------------------------------------------------------------- + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + int key_type = RedisModule_KeyType(key); + RedisModule_CloseKey(key); + + // key exists, fail + if(key_type != REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_ReplyWithError(ctx, "restore graph failed, key already exists"); + return REDISMODULE_OK; + } + + //-------------------------------------------------------------------------- + // decode payload + //-------------------------------------------------------------------------- + + // create memory stream + size_t len; + const 
char *payload = RedisModule_StringPtrLen(argv[2], &len); + + FILE *stream = fmemopen((void*)payload, len, "r"); + ASSERT(stream != NULL); + + SerializerIO io = SerializerIO_FromStream(stream); + + // decode graph + GraphContext *gc = RdbLoadGraphContext_latest(io, argv[1]); + ASSERT(gc != NULL); + + // add graph to keyspace + key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + + // set value in key + RedisModule_ModuleTypeSetValue(key, GraphContextRedisModuleType, gc); + + RedisModule_CloseKey(key); + + // register graph context for BGSave + GraphContext_RegisterWithModule(gc); + + //-------------------------------------------------------------------------- + // clean up + //-------------------------------------------------------------------------- + + SerializerIO_Free(&io); + + fclose(stream); + + return REDISMODULE_OK; +} + diff --git a/src/commands/commands.c b/src/commands/commands.c index 45b26dc08..9dacdc98d 100644 --- a/src/commands/commands.c +++ b/src/commands/commands.c @@ -10,6 +10,7 @@ GRAPH_Commands CommandFromString(const char *cmd_name) { if (!strcasecmp(cmd_name, "graph.INFO")) return CMD_INFO; if (!strcasecmp(cmd_name, "graph.LIST")) return CMD_LIST; + if (!strcasecmp(cmd_name, "graph.COPY")) return CMD_COPY; if (!strcasecmp(cmd_name, "graph.QUERY")) return CMD_QUERY; if (!strcasecmp(cmd_name, "graph.DEBUG")) return CMD_DEBUG; if (!strcasecmp(cmd_name, "graph.EFFECT")) return CMD_EFFECT; diff --git a/src/commands/commands.h b/src/commands/commands.h index 9532bc9b2..83315533f 100644 --- a/src/commands/commands.h +++ b/src/commands/commands.h @@ -27,7 +27,9 @@ typedef enum { CMD_LIST = 9, CMD_DEBUG = 10, CMD_INFO = 11, - CMD_EFFECT = 12 + CMD_EFFECT = 12, + CMD_COPY = 13, + CMD_RESTORE = 14 } GRAPH_Commands; //------------------------------------------------------------------------------ @@ -42,10 +44,13 @@ void Graph_Explain(void *args); int Graph_List(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int 
Graph_Info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +int Graph_Copy(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int Graph_Debug(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int Graph_Delete(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int Graph_Effect(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int Graph_Config(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +int Graph_Restore(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int Graph_Slowlog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int CommandDispatch(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); int Graph_Constraint(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); + diff --git a/src/errors/error_msgs.h b/src/errors/error_msgs.h index 8234e8f10..f2b9cf7b3 100644 --- a/src/errors/error_msgs.h +++ b/src/errors/error_msgs.h @@ -6,6 +6,7 @@ #pragma once +#define EMSG_GRAPH_EXISTS "Graph %s already exists" #define EMSG_EMPTY_KEY "Encountered an empty key when opened key %s" #define EMSG_NON_GRAPH_KEY "Encountered a non-graph value type when opened key %s" #define EMSG_DIFFERENT_VALUE "Encountered different graph value when opened key %s" diff --git a/src/module.c b/src/module.c index 4a2af20da..e62c748f5 100644 --- a/src/module.c +++ b/src/module.c @@ -227,6 +227,16 @@ int RedisModule_OnLoad return REDISMODULE_ERR; } + if(RedisModule_CreateCommand(ctx, "graph.COPY", Graph_Copy, + "write deny-oom", 1, 2, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + if(RedisModule_CreateCommand(ctx, "graph.RESTORE", Graph_Restore, + "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + if(BoltApi_Register(ctx) == REDISMODULE_ERR) { return REDISMODULE_ERR; } diff --git a/src/serializers/decoders/current/v14/decode_graph.c b/src/serializers/decoders/current/v14/decode_graph.c index e7d544e29..613e74496 100644 --- a/src/serializers/decoders/current/v14/decode_graph.c +++ 
b/src/serializers/decoders/current/v14/decode_graph.c @@ -51,7 +51,7 @@ static void _InitGraphDataStructure static GraphContext *_DecodeHeader ( - RedisModuleIO *rdb + SerializerIO rdb ) { // Header format: // Graph name @@ -66,24 +66,24 @@ static GraphContext *_DecodeHeader // Schema // graph name - char *graph_name = RedisModule_LoadStringBuffer(rdb, NULL); + char *graph_name = SerializerIO_ReadBuffer(rdb, NULL); // each key header contains the following: // #nodes, #edges, #deleted nodes, #deleted edges, #labels matrices, #relation matrices - uint64_t node_count = RedisModule_LoadUnsigned(rdb); - uint64_t edge_count = RedisModule_LoadUnsigned(rdb); - uint64_t deleted_node_count = RedisModule_LoadUnsigned(rdb); - uint64_t deleted_edge_count = RedisModule_LoadUnsigned(rdb); - uint64_t label_count = RedisModule_LoadUnsigned(rdb); - uint64_t relation_count = RedisModule_LoadUnsigned(rdb); - uint64_t multi_edge[relation_count]; + uint64_t node_count = SerializerIO_ReadUnsigned(rdb); + uint64_t edge_count = SerializerIO_ReadUnsigned(rdb); + uint64_t deleted_node_count = SerializerIO_ReadUnsigned(rdb); + uint64_t deleted_edge_count = SerializerIO_ReadUnsigned(rdb); + uint64_t label_count = SerializerIO_ReadUnsigned(rdb); + uint64_t relation_count = SerializerIO_ReadUnsigned(rdb); + uint64_t multi_edge[relation_count]; for(uint i = 0; i < relation_count; i++) { - multi_edge[i] = RedisModule_LoadUnsigned(rdb); + multi_edge[i] = SerializerIO_ReadUnsigned(rdb); } // total keys representing the graph - uint64_t key_number = RedisModule_LoadUnsigned(rdb); + uint64_t key_number = SerializerIO_ReadUnsigned(rdb); GraphContext *gc = _GetOrCreateGraphContext(graph_name); Graph *g = gc->g; @@ -116,7 +116,7 @@ static GraphContext *_DecodeHeader static PayloadInfo *_RdbLoadKeySchema ( - RedisModuleIO *rdb + SerializerIO rdb ) { // Format: // #Number of payloads info - N @@ -124,23 +124,24 @@ static PayloadInfo *_RdbLoadKeySchema // Encode state // Number of entities encoded in this 
state. - uint64_t payloads_count = RedisModule_LoadUnsigned(rdb); + uint64_t payloads_count = SerializerIO_ReadUnsigned(rdb); PayloadInfo *payloads = array_new(PayloadInfo, payloads_count); for(uint i = 0; i < payloads_count; i++) { // for each payload // load its type and the number of entities it contains PayloadInfo payload_info; - payload_info.state = RedisModule_LoadUnsigned(rdb); - payload_info.entities_count = RedisModule_LoadUnsigned(rdb); + payload_info.state = SerializerIO_ReadUnsigned(rdb); + payload_info.entities_count = SerializerIO_ReadUnsigned(rdb); array_append(payloads, payload_info); } return payloads; } -GraphContext *RdbLoadGraphContext_v14 +GraphContext *RdbLoadGraphContext_latest ( - RedisModuleIO *rdb + SerializerIO rdb, + const RedisModuleString *rm_key_name ) { // Key format: @@ -197,7 +198,6 @@ GraphContext *RdbLoadGraphContext_v14 GraphDecodeContext_IncreaseProcessedKeyCount(gc->decoding_context); // before finalizing keep encountered meta keys names, for future deletion - const RedisModuleString *rm_key_name = RedisModule_GetKeyNameFromIO(rdb); const char *key_name = RedisModule_StringPtrLen(rm_key_name, NULL); // the virtual key name is not equal the graph name @@ -251,9 +251,6 @@ GraphContext *RdbLoadGraphContext_v14 ASSERT(Graph_Pending(g) == false); GraphDecodeContext_Reset(gc->decoding_context); - - RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb); - RedisModule_Log(ctx, "notice", "Done decoding graph %s", gc->graph_name); } return gc; diff --git a/src/serializers/decoders/current/v14/decode_graph_entities.c b/src/serializers/decoders/current/v14/decode_graph_entities.c index 94bbde500..2c2cd6bf1 100644 --- a/src/serializers/decoders/current/v14/decode_graph_entities.c +++ b/src/serializers/decoders/current/v14/decode_graph_entities.c @@ -7,29 +7,29 @@ #include "decode_v14.h" // forward declarations -static SIValue _RdbLoadPoint(RedisModuleIO *rdb); -static SIValue _RdbLoadSIArray(RedisModuleIO *rdb); -static SIValue 
_RdbLoadVector(RedisModuleIO *rdb, SIType t); +static SIValue _RdbLoadPoint(SerializerIO rdb); +static SIValue _RdbLoadSIArray(SerializerIO rdb); +static SIValue _RdbLoadVector(SerializerIO rdb, SIType t); static SIValue _RdbLoadSIValue ( - RedisModuleIO *rdb + SerializerIO rdb ) { // Format: // SIType // Value - SIType t = RedisModule_LoadUnsigned(rdb); + SIType t = SerializerIO_ReadUnsigned(rdb); switch(t) { case T_INT64: - return SI_LongVal(RedisModule_LoadSigned(rdb)); + return SI_LongVal(SerializerIO_ReadSigned(rdb)); case T_DOUBLE: - return SI_DoubleVal(RedisModule_LoadDouble(rdb)); + return SI_DoubleVal(SerializerIO_ReadDouble(rdb)); case T_STRING: // transfer ownership of the heap-allocated string to the // newly-created SIValue - return SI_TransferStringVal(RedisModule_LoadStringBuffer(rdb, NULL)); + return SI_TransferStringVal(SerializerIO_ReadBuffer(rdb, NULL)); case T_BOOL: - return SI_BoolVal(RedisModule_LoadSigned(rdb)); + return SI_BoolVal(SerializerIO_ReadSigned(rdb)); case T_ARRAY: return _RdbLoadSIArray(rdb); case T_POINT: @@ -44,16 +44,16 @@ static SIValue _RdbLoadSIValue static SIValue _RdbLoadPoint ( - RedisModuleIO *rdb + SerializerIO rdb ) { - double lat = RedisModule_LoadDouble(rdb); - double lon = RedisModule_LoadDouble(rdb); + double lat = SerializerIO_ReadDouble(rdb); + double lon = SerializerIO_ReadDouble(rdb); return SI_Point(lat, lon); } static SIValue _RdbLoadSIArray ( - RedisModuleIO *rdb + SerializerIO rdb ) { /* loads array as unsinged : array legnth @@ -63,7 +63,7 @@ static SIValue _RdbLoadSIArray . 
array[array length -1] */ - uint arrayLen = RedisModule_LoadUnsigned(rdb); + uint arrayLen = SerializerIO_ReadUnsigned(rdb); SIValue list = SI_Array(arrayLen); for(uint i = 0; i < arrayLen; i++) { SIValue elem = _RdbLoadSIValue(rdb); @@ -75,7 +75,7 @@ static SIValue _RdbLoadSIArray static SIValue _RdbLoadVector ( - RedisModuleIO *rdb, + SerializerIO rdb, SIType t ) { ASSERT(t & T_VECTOR); @@ -90,13 +90,13 @@ static SIValue _RdbLoadVector SIValue vector; - uint32_t dim = RedisModule_LoadUnsigned(rdb); + uint32_t dim = SerializerIO_ReadUnsigned(rdb); vector = SI_Vectorf32(dim); float *values = SIVector_Elements(vector); for(uint32_t i = 0; i < dim; i++) { - values[i] = RedisModule_LoadFloat(rdb); + values[i] = SerializerIO_ReadFloat(rdb); } return vector; @@ -104,7 +104,7 @@ static SIValue _RdbLoadVector static void _RdbLoadEntity ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, GraphEntity *e ) { @@ -112,12 +112,12 @@ static void _RdbLoadEntity // #properties N // (name, value type, value) X N - uint64_t n = RedisModule_LoadUnsigned(rdb); + uint64_t n = SerializerIO_ReadUnsigned(rdb); SIValue vals[n]; AttributeID ids[n]; for(int i = 0; i < n; i++) { - ids[i] = RedisModule_LoadUnsigned(rdb); + ids[i] = SerializerIO_ReadUnsigned(rdb); vals[i] = _RdbLoadSIValue(rdb); } @@ -126,7 +126,7 @@ static void _RdbLoadEntity void RdbLoadNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t node_count ) { @@ -139,15 +139,15 @@ void RdbLoadNodes_v14 for(uint64_t i = 0; i < node_count; i++) { Node n; - NodeID id = RedisModule_LoadUnsigned(rdb); + NodeID id = SerializerIO_ReadUnsigned(rdb); // #labels M - uint64_t nodeLabelCount = RedisModule_LoadUnsigned(rdb); + uint64_t nodeLabelCount = SerializerIO_ReadUnsigned(rdb); // * (labels) x M LabelID labels[nodeLabelCount]; for(uint64_t i = 0; i < nodeLabelCount; i ++){ - labels[i] = RedisModule_LoadUnsigned(rdb); + labels[i] = SerializerIO_ReadUnsigned(rdb); } Serializer_Graph_SetNode(gc->g, id, 
labels, nodeLabelCount, &n); @@ -166,21 +166,21 @@ void RdbLoadNodes_v14 void RdbLoadDeletedNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_node_count ) { // Format: // node id X N for(uint64_t i = 0; i < deleted_node_count; i++) { - NodeID id = RedisModule_LoadUnsigned(rdb); + NodeID id = SerializerIO_ReadUnsigned(rdb); Serializer_Graph_MarkNodeDeleted(gc->g, id); } } void RdbLoadEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t edge_count ) { @@ -196,10 +196,10 @@ void RdbLoadEdges_v14 // construct connections for(uint64_t i = 0; i < edge_count; i++) { Edge e; - EdgeID edgeId = RedisModule_LoadUnsigned(rdb); - NodeID srcId = RedisModule_LoadUnsigned(rdb); - NodeID destId = RedisModule_LoadUnsigned(rdb); - uint64_t relation = RedisModule_LoadUnsigned(rdb); + EdgeID edgeId = SerializerIO_ReadUnsigned(rdb); + NodeID srcId = SerializerIO_ReadUnsigned(rdb); + NodeID destId = SerializerIO_ReadUnsigned(rdb); + uint64_t relation = SerializerIO_ReadUnsigned(rdb); Serializer_Graph_SetEdge(gc->g, gc->decoding_context->multi_edge[relation], edgeId, srcId, @@ -216,14 +216,14 @@ void RdbLoadEdges_v14 void RdbLoadDeletedEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_edge_count ) { // Format: // edge id X N for(uint64_t i = 0; i < deleted_edge_count; i++) { - EdgeID id = RedisModule_LoadUnsigned(rdb); + EdgeID id = SerializerIO_ReadUnsigned(rdb); Serializer_Graph_MarkEdgeDeleted(gc->g, id); } } diff --git a/src/serializers/decoders/current/v14/decode_graph_schema.c b/src/serializers/decoders/current/v14/decode_graph_schema.c index cf5e4a950..9bf8ca8d6 100644 --- a/src/serializers/decoders/current/v14/decode_graph_schema.c +++ b/src/serializers/decoders/current/v14/decode_graph_schema.c @@ -9,7 +9,7 @@ static void _RdbDecodeIndexField ( - RedisModuleIO *rdb, + SerializerIO rdb, char **name, // index field name IndexFieldType *type, // index field type double *weight, 
// index field option weight @@ -27,31 +27,31 @@ static void _RdbDecodeIndexField // dimension // decode field name - *name = RedisModule_LoadStringBuffer(rdb, NULL); + *name = SerializerIO_ReadBuffer(rdb, NULL); // docode field type - *type = RedisModule_LoadUnsigned(rdb); + *type = SerializerIO_ReadUnsigned(rdb); //-------------------------------------------------------------------------- // decode field options //-------------------------------------------------------------------------- // decode field weight - *weight = RedisModule_LoadDouble(rdb); + *weight = SerializerIO_ReadDouble(rdb); // decode field nostem - *nostem = RedisModule_LoadUnsigned(rdb); + *nostem = SerializerIO_ReadUnsigned(rdb); // decode field phonetic - *phonetic = RedisModule_LoadStringBuffer(rdb, NULL); + *phonetic = SerializerIO_ReadBuffer(rdb, NULL); // decode field dimension - *dimension = RedisModule_LoadUnsigned(rdb); + *dimension = SerializerIO_ReadUnsigned(rdb); } static void _RdbLoadIndex ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, Schema *s, bool already_loaded @@ -64,19 +64,19 @@ static void _RdbLoadIndex * M * property: {options} */ Index idx = NULL; - char *language = RedisModule_LoadStringBuffer(rdb, NULL); + char *language = SerializerIO_ReadBuffer(rdb, NULL); char **stopwords = NULL; - uint stopwords_count = RedisModule_LoadUnsigned(rdb); + uint stopwords_count = SerializerIO_ReadUnsigned(rdb); if(stopwords_count > 0) { stopwords = array_new(char *, stopwords_count); for (uint i = 0; i < stopwords_count; i++) { - char *stopword = RedisModule_LoadStringBuffer(rdb, NULL); + char *stopword = SerializerIO_ReadBuffer(rdb, NULL); array_append(stopwords, stopword); } } - uint fields_count = RedisModule_LoadUnsigned(rdb); + uint fields_count = SerializerIO_ReadUnsigned(rdb); for(uint i = 0; i < fields_count; i++) { IndexFieldType type; double weight; @@ -124,7 +124,7 @@ static void _RdbLoadIndex static void _RdbLoadConstaint ( - RedisModuleIO *rdb, + SerializerIO 
rdb, GraphContext *gc, // graph context Schema *s, // schema to populate bool already_loaded // constraints already loaded @@ -140,13 +140,13 @@ static void _RdbLoadConstaint // decode constraint type //-------------------------------------------------------------------------- - ConstraintType t = RedisModule_LoadUnsigned(rdb); + ConstraintType t = SerializerIO_ReadUnsigned(rdb); //-------------------------------------------------------------------------- // decode constraint fields count //-------------------------------------------------------------------------- - uint8_t n = RedisModule_LoadUnsigned(rdb); + uint8_t n = SerializerIO_ReadUnsigned(rdb); //-------------------------------------------------------------------------- // decode constraint fields @@ -157,7 +157,7 @@ static void _RdbLoadConstaint // read fields for(uint8_t i = 0; i < n; i++) { - AttributeID attr = RedisModule_LoadUnsigned(rdb); + AttributeID attr = SerializerIO_ReadUnsigned(rdb); attr_ids[i] = attr; attr_strs[i] = GraphContext_GetAttributeString(gc, attr); } @@ -184,13 +184,13 @@ static void _RdbLoadConstaint // load schema's constraints static void _RdbLoadConstaints ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, // graph context Schema *s, // schema to populate bool already_loaded // constraints already loaded ) { // read number of constraints - uint constraint_count = RedisModule_LoadUnsigned(rdb); + uint constraint_count = SerializerIO_ReadUnsigned(rdb); for (uint i = 0; i < constraint_count; i++) { _RdbLoadConstaint(rdb, gc, s, already_loaded); @@ -199,7 +199,7 @@ static void _RdbLoadConstaints static void _RdbLoadSchema ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, SchemaType type, bool already_loaded @@ -214,8 +214,8 @@ static void _RdbLoadSchema */ Schema *s = NULL; - int id = RedisModule_LoadUnsigned(rdb); - char *name = RedisModule_LoadStringBuffer(rdb, NULL); + int id = SerializerIO_ReadUnsigned(rdb); + char *name = SerializerIO_ReadBuffer(rdb, 
NULL); if(!already_loaded) { s = Schema_New(type, id, name); @@ -234,7 +234,7 @@ static void _RdbLoadSchema // load indices //-------------------------------------------------------------------------- - uint index_count = RedisModule_LoadUnsigned(rdb); + uint index_count = SerializerIO_ReadUnsigned(rdb); for(uint index = 0; index < index_count; index++) { _RdbLoadIndex(rdb, gc, s, already_loaded); } @@ -246,15 +246,19 @@ static void _RdbLoadSchema _RdbLoadConstaints(rdb, gc, s, already_loaded); } -static void _RdbLoadAttributeKeys(RedisModuleIO *rdb, GraphContext *gc) { +static void _RdbLoadAttributeKeys +( + SerializerIO rdb, + GraphContext *gc +) { /* Format: * #attribute keys * attribute keys */ - uint count = RedisModule_LoadUnsigned(rdb); + uint count = SerializerIO_ReadUnsigned(rdb); for(uint i = 0; i < count; i ++) { - char *attr = RedisModule_LoadStringBuffer(rdb, NULL); + char *attr = SerializerIO_ReadBuffer(rdb, NULL); GraphContext_FindOrAddAttribute(gc, attr, NULL); RedisModule_Free(attr); } @@ -262,7 +266,7 @@ static void _RdbLoadAttributeKeys(RedisModuleIO *rdb, GraphContext *gc) { void RdbLoadGraphSchema_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, bool already_loaded ) { @@ -279,7 +283,7 @@ void RdbLoadGraphSchema_v14 _RdbLoadAttributeKeys(rdb, gc); // #Node schemas - uint schema_count = RedisModule_LoadUnsigned(rdb); + uint schema_count = SerializerIO_ReadUnsigned(rdb); // Load each node schema gc->node_schemas = array_ensure_cap(gc->node_schemas, schema_count); @@ -288,7 +292,7 @@ void RdbLoadGraphSchema_v14 } // #Edge schemas - schema_count = RedisModule_LoadUnsigned(rdb); + schema_count = SerializerIO_ReadUnsigned(rdb); // Load each edge schema gc->relation_schemas = array_ensure_cap(gc->relation_schemas, schema_count); diff --git a/src/serializers/decoders/current/v14/decode_v14.h b/src/serializers/decoders/current/v14/decode_v14.h index bf66febf2..b622a7779 100644 --- a/src/serializers/decoders/current/v14/decode_v14.h +++ 
b/src/serializers/decoders/current/v14/decode_v14.h @@ -8,42 +8,43 @@ #include "../../../serializers_include.h" -GraphContext *RdbLoadGraphContext_v14 +GraphContext *RdbLoadGraphContext_latest ( - RedisModuleIO *rdb + SerializerIO rdb, + const RedisModuleString *rm_key_name ); void RdbLoadNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t node_count ); void RdbLoadDeletedNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_node_count ); void RdbLoadEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t edge_count ); void RdbLoadDeletedEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_edge_count ); void RdbLoadGraphSchema_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, bool already_loaded ); diff --git a/src/serializers/decoders/decode_graph.c b/src/serializers/decoders/decode_graph.c index 5e33f0999..9b224ebda 100644 --- a/src/serializers/decoders/decode_graph.c +++ b/src/serializers/decoders/decode_graph.c @@ -8,6 +8,12 @@ #include "current/v14/decode_v14.h" GraphContext *RdbLoadGraph(RedisModuleIO *rdb) { - return RdbLoadGraphContext_v14(rdb); + const RedisModuleString *rm_key_name = RedisModule_GetKeyNameFromIO(rdb); + + SerializerIO io = SerializerIO_FromRedisModuleIO(rdb); + GraphContext *gc = RdbLoadGraphContext_latest(io, rm_key_name); + SerializerIO_Free(&io); + + return gc; } diff --git a/src/serializers/encoder/encode_graph.c b/src/serializers/encoder/encode_graph.c index f2fc3529c..b50f45f96 100644 --- a/src/serializers/encoder/encode_graph.c +++ b/src/serializers/encoder/encode_graph.c @@ -6,8 +6,11 @@ #include "encode_graph.h" #include "v14/encode_v14.h" +#include "../serializer_io.h" void RdbSaveGraph(RedisModuleIO *rdb, void *value) { - RdbSaveGraph_v14(rdb, value); + SerializerIO io = SerializerIO_FromRedisModuleIO(rdb); + RdbSaveGraph_latest(io, value); + SerializerIO_Free(&io); } diff --git 
a/src/serializers/encoder/v14/encode_graph.c b/src/serializers/encoder/v14/encode_graph.c index 90cf84140..f30eac9ba 100644 --- a/src/serializers/encoder/v14/encode_graph.c +++ b/src/serializers/encoder/v14/encode_graph.c @@ -1,7 +1,6 @@ /* - * Copyright Redis Ltd. 2018 - present - * Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or - * the Server Side Public License v1 (SSPLv1). + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). */ #include "encode_v14.h" @@ -15,7 +14,7 @@ static inline bool _shouldAcquireLocks(void) { static void _RdbSaveHeader ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc ) { // Header format: @@ -35,34 +34,34 @@ static void _RdbSaveHeader GraphEncodeHeader *header = &(gc->encoding_context->header); // graph name - RedisModule_SaveStringBuffer(rdb, header->graph_name, strlen(header->graph_name) + 1); + SerializerIO_WriteBuffer(rdb, header->graph_name, strlen(header->graph_name) + 1); // node count - RedisModule_SaveUnsigned(rdb, header->node_count); + SerializerIO_WriteUnsigned(rdb, header->node_count); // edge count - RedisModule_SaveUnsigned(rdb, header->edge_count); + SerializerIO_WriteUnsigned(rdb, header->edge_count); // deleted node count - RedisModule_SaveUnsigned(rdb, header->deleted_node_count); + SerializerIO_WriteUnsigned(rdb, header->deleted_node_count); // deleted edge count - RedisModule_SaveUnsigned(rdb, header->deleted_edge_count); + SerializerIO_WriteUnsigned(rdb, header->deleted_edge_count); // label matrix count - RedisModule_SaveUnsigned(rdb, header->label_matrix_count); + SerializerIO_WriteUnsigned(rdb, header->label_matrix_count); // relation matrix count - RedisModule_SaveUnsigned(rdb, header->relationship_matrix_count); + SerializerIO_WriteUnsigned(rdb, header->relationship_matrix_count); // does relationship Ri holds mutiple edges under a single entry X N for(int i = 0; i < header->relationship_matrix_count; i++) { // 
true if R[i] contain a multi edge entry - RedisModule_SaveUnsigned(rdb, header->multi_edge[i]); + SerializerIO_WriteUnsigned(rdb, header->multi_edge[i]); } // number of keys - RedisModule_SaveUnsigned(rdb, header->key_count); + SerializerIO_WriteUnsigned(rdb, header->key_count); // save graph schemas RdbSaveGraphSchema_v14(rdb, gc); @@ -112,7 +111,7 @@ static PayloadInfo _StatePayloadInfo // and returns it so the encoder can know how to encode the key static PayloadInfo *_RdbSaveKeySchema ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc ) { // Format: @@ -156,21 +155,21 @@ static PayloadInfo *_RdbSaveKeySchema // save the number of payloads uint payloads_count = array_len(payloads); - RedisModule_SaveUnsigned(rdb, payloads_count); + SerializerIO_WriteUnsigned(rdb, payloads_count); for(uint i = 0; i < payloads_count; i++) { // for each payload // save its type and the number of entities it contains PayloadInfo payload_info = payloads[i]; - RedisModule_SaveUnsigned(rdb, payload_info.state); - RedisModule_SaveUnsigned(rdb, payload_info.entities_count); + SerializerIO_WriteUnsigned(rdb, payload_info.state); + SerializerIO_WriteUnsigned(rdb, payload_info.entities_count); } return payloads; } -void RdbSaveGraph_v14 +void RdbSaveGraph_latest ( - RedisModuleIO *rdb, + SerializerIO rdb, void *value ) { // Encoding format for graph context and graph meta key: @@ -253,8 +252,6 @@ void RdbSaveGraph_v14 GraphEncodeContext_IncreaseProcessedKeyCount(gc->encoding_context); if(GraphEncodeContext_Finished(gc->encoding_context)) { GraphEncodeContext_Reset(gc->encoding_context); - RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb); - RedisModule_Log(ctx, "notice", "Done encoding graph %s", gc->graph_name); } // if a lock was acquired, release it diff --git a/src/serializers/encoder/v14/encode_graph_entities.c b/src/serializers/encoder/v14/encode_graph_entities.c index b6fbd06eb..422fa00f3 100644 --- a/src/serializers/encoder/v14/encode_graph_entities.c +++ 
b/src/serializers/encoder/v14/encode_graph_entities.c @@ -10,13 +10,13 @@ // forword decleration static void _RdbSaveSIValue ( - RedisModuleIO *rdb, + SerializerIO rdb, const SIValue *v ); static void _RdbSaveSIArray ( - RedisModuleIO *rdb, + SerializerIO rdb, const SIValue list ) { // saves array as @@ -28,7 +28,7 @@ static void _RdbSaveSIArray // array[array length -1] uint arrayLen = SIArray_Length(list); - RedisModule_SaveUnsigned(rdb, arrayLen); + SerializerIO_WriteUnsigned(rdb, arrayLen); for(uint i = 0; i < arrayLen; i ++) { SIValue value = SIArray_Get(list, i); _RdbSaveSIValue(rdb, &value); @@ -37,7 +37,7 @@ static void _RdbSaveSIArray static void _RdbSaveSIVector ( - RedisModuleIO *rdb, + SerializerIO rdb, SIValue v ) { // saves a vector @@ -49,7 +49,7 @@ static void _RdbSaveSIVector // vector[vector dimension -1] uint32_t dim = SIVector_Dim(v); - RedisModule_SaveUnsigned(rdb, dim); + SerializerIO_WriteUnsigned(rdb, dim); // get vector elements void *elements = SIVector_Elements(v); @@ -57,39 +57,39 @@ static void _RdbSaveSIVector // save individual elements float *values = (float*)elements; for(uint32_t i = 0; i < dim; i ++) { - RedisModule_SaveFloat(rdb, values[i]); + SerializerIO_WriteFloat(rdb, values[i]); } } static void _RdbSaveSIValue ( - RedisModuleIO *rdb, + SerializerIO rdb, const SIValue *v ) { // Format: // SIType // Value - RedisModule_SaveUnsigned(rdb, v->type); + SerializerIO_WriteUnsigned(rdb, v->type); switch(v->type) { case T_BOOL: case T_INT64: - RedisModule_SaveSigned(rdb, v->longval); + SerializerIO_WriteSigned(rdb, v->longval); break; case T_DOUBLE: - RedisModule_SaveDouble(rdb, v->doubleval); + SerializerIO_WriteDouble(rdb, v->doubleval); break; case T_STRING: - RedisModule_SaveStringBuffer(rdb, v->stringval, + SerializerIO_WriteBuffer(rdb, v->stringval, strlen(v->stringval) + 1); break; case T_ARRAY: _RdbSaveSIArray(rdb, *v); break; case T_POINT: - RedisModule_SaveDouble(rdb, Point_lat(*v)); - RedisModule_SaveDouble(rdb, 
Point_lon(*v)); + SerializerIO_WriteDouble(rdb, Point_lat(*v)); + SerializerIO_WriteDouble(rdb, Point_lon(*v)); break; case T_VECTOR_F32: _RdbSaveSIVector(rdb, *v); @@ -103,7 +103,7 @@ static void _RdbSaveSIValue static void _RdbSaveEntity ( - RedisModuleIO *rdb, + SerializerIO rdb, const GraphEntity *e ) { // Format: @@ -113,19 +113,19 @@ static void _RdbSaveEntity const AttributeSet set = GraphEntity_GetAttributes(e); uint16_t attr_count = AttributeSet_Count(set); - RedisModule_SaveUnsigned(rdb, attr_count); + SerializerIO_WriteUnsigned(rdb, attr_count); for(int i = 0; i < attr_count; i++) { AttributeID attr_id; SIValue value = AttributeSet_GetIdx(set, i, &attr_id); - RedisModule_SaveUnsigned(rdb, attr_id); + SerializerIO_WriteUnsigned(rdb, attr_id); _RdbSaveSIValue(rdb, &value); } } static void _RdbSaveEdge ( - RedisModuleIO *rdb, + SerializerIO rdb, const Graph *g, const Edge *e, int r @@ -138,16 +138,16 @@ static void _RdbSaveEdge // relation type // edge properties - RedisModule_SaveUnsigned(rdb, ENTITY_GET_ID(e)); + SerializerIO_WriteUnsigned(rdb, ENTITY_GET_ID(e)); // source node ID - RedisModule_SaveUnsigned(rdb, Edge_GetSrcNodeID(e)); + SerializerIO_WriteUnsigned(rdb, Edge_GetSrcNodeID(e)); // destination node ID - RedisModule_SaveUnsigned(rdb, Edge_GetDestNodeID(e)); + SerializerIO_WriteUnsigned(rdb, Edge_GetDestNodeID(e)); // relation type - RedisModule_SaveUnsigned(rdb, r); + SerializerIO_WriteUnsigned(rdb, r); // edge properties _RdbSaveEntity(rdb, (GraphEntity *)e); @@ -155,7 +155,7 @@ static void _RdbSaveEdge static void _RdbSaveNode_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, GraphEntity *n ) { @@ -168,15 +168,15 @@ static void _RdbSaveNode_v14 // save ID EntityID id = ENTITY_GET_ID(n); - RedisModule_SaveUnsigned(rdb, id); + SerializerIO_WriteUnsigned(rdb, id); // retrieve node labels uint l_count; NODE_GET_LABELS(gc->g, (Node *)n, l_count); - RedisModule_SaveUnsigned(rdb, l_count); + SerializerIO_WriteUnsigned(rdb, l_count); 
// save labels - for(uint i = 0; i < l_count; i++) RedisModule_SaveUnsigned(rdb, labels[i]); + for(uint i = 0; i < l_count; i++) SerializerIO_WriteUnsigned(rdb, labels[i]); // properties N // (name, value type, value) X N @@ -185,7 +185,7 @@ static void _RdbSaveNode_v14 static void _RdbSaveDeletedEntities_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_entities_to_encode, uint64_t *deleted_id_list @@ -195,13 +195,13 @@ static void _RdbSaveDeletedEntities_v14 // Iterated over the required range in the datablock deleted items. for(uint64_t i = offset; i < offset + deleted_entities_to_encode; i++) { - RedisModule_SaveUnsigned(rdb, deleted_id_list[i]); + SerializerIO_WriteUnsigned(rdb, deleted_id_list[i]); } } void RdbSaveDeletedNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_nodes_to_encode ) { @@ -216,7 +216,7 @@ void RdbSaveDeletedNodes_v14 void RdbSaveDeletedEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_edges_to_encode ) { @@ -232,7 +232,7 @@ void RdbSaveDeletedEdges_v14 void RdbSaveNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t nodes_to_encode ) { @@ -277,7 +277,7 @@ void RdbSaveNodes_v14 // returns true if the number of encoded edges has reached the capacity static void _RdbSaveMultipleEdges ( - RedisModuleIO *rdb, // RDB IO. + SerializerIO rdb, // RDB IO. GraphContext *gc, // Graph context. uint r, // Edges relation id. EdgeID *multiple_edges_array, // Multiple edges array (passed by ref). 
@@ -312,7 +312,7 @@ static void _RdbSaveMultipleEdges void RdbSaveEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t edges_to_encode ) { diff --git a/src/serializers/encoder/v14/encode_schema.c b/src/serializers/encoder/v14/encode_schema.c index 400f94714..e41167c66 100644 --- a/src/serializers/encoder/v14/encode_schema.c +++ b/src/serializers/encoder/v14/encode_schema.c @@ -9,7 +9,7 @@ static void _RdbSaveAttributeKeys ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc ) { /* Format: @@ -18,17 +18,17 @@ static void _RdbSaveAttributeKeys */ uint count = GraphContext_AttributeCount(gc); - RedisModule_SaveUnsigned(rdb, count); + SerializerIO_WriteUnsigned(rdb, count); for(uint i = 0; i < count; i ++) { char *key = gc->string_mapping[i]; - RedisModule_SaveStringBuffer(rdb, key, strlen(key) + 1); + SerializerIO_WriteBuffer(rdb, key, strlen(key) + 1); } } // encode index field static void _RdbSaveIndexField ( - RedisModuleIO *rdb, // redis io + SerializerIO rdb, // io const IndexField *f // index field to encode ) { // format: @@ -43,32 +43,32 @@ static void _RdbSaveIndexField ASSERT(f != NULL); // encode field name - RedisModule_SaveStringBuffer(rdb, f->name, strlen(f->name) + 1); + SerializerIO_WriteBuffer(rdb, f->name, strlen(f->name) + 1); // encode field type - RedisModule_SaveUnsigned(rdb, f->type); + SerializerIO_WriteUnsigned(rdb, f->type); //-------------------------------------------------------------------------- // encode field options //-------------------------------------------------------------------------- // encode field weight - RedisModule_SaveDouble(rdb, f->options.weight); + SerializerIO_WriteDouble(rdb, f->options.weight); // encode field nostem - RedisModule_SaveUnsigned(rdb, f->options.nostem); + SerializerIO_WriteUnsigned(rdb, f->options.nostem); // encode field phonetic - RedisModule_SaveStringBuffer(rdb, f->options.phonetic, + SerializerIO_WriteBuffer(rdb, f->options.phonetic, 
strlen(f->options.phonetic) + 1); // encode field dimension - RedisModule_SaveUnsigned(rdb, f->options.dimension); + SerializerIO_WriteUnsigned(rdb, f->options.dimension); } static inline void _RdbSaveIndexData ( - RedisModuleIO *rdb, + SerializerIO rdb, SchemaType type, Index idx ) { @@ -83,22 +83,22 @@ static inline void _RdbSaveIndexData // encode language const char *language = Index_GetLanguage(idx); - RedisModule_SaveStringBuffer(rdb, language, strlen(language) + 1); + SerializerIO_WriteBuffer(rdb, language, strlen(language) + 1); size_t stopwords_count; char **stopwords = Index_GetStopwords(idx, &stopwords_count); // encode stopwords count - RedisModule_SaveUnsigned(rdb, stopwords_count); + SerializerIO_WriteUnsigned(rdb, stopwords_count); for (size_t i = 0; i < stopwords_count; i++) { char *stopword = stopwords[i]; - RedisModule_SaveStringBuffer(rdb, stopword, strlen(stopword) + 1); + SerializerIO_WriteBuffer(rdb, stopword, strlen(stopword) + 1); rm_free(stopword); } rm_free(stopwords); // encode field count uint fields_count = Index_FieldsCount(idx); - RedisModule_SaveUnsigned(rdb, fields_count); + SerializerIO_WriteUnsigned(rdb, fields_count); // encode fields const IndexField *fields = Index_GetFields(idx); @@ -109,7 +109,7 @@ static inline void _RdbSaveIndexData static void _RdbSaveConstraint ( - RedisModuleIO *rdb, + SerializerIO rdb, const Constraint c ) { /* Format: @@ -125,7 +125,7 @@ static void _RdbSaveConstraint //-------------------------------------------------------------------------- ConstraintType t = Constraint_GetType(c); - RedisModule_SaveUnsigned(rdb, t); + SerializerIO_WriteUnsigned(rdb, t); //-------------------------------------------------------------------------- // encode constraint fields count @@ -133,7 +133,7 @@ static void _RdbSaveConstraint const AttributeID *attrs; uint8_t n = Constraint_GetAttributes(c, &attrs, NULL); - RedisModule_SaveUnsigned(rdb, n); + SerializerIO_WriteUnsigned(rdb, n); 
//-------------------------------------------------------------------------- // encode constraint fields @@ -141,13 +141,13 @@ static void _RdbSaveConstraint for(uint8_t i = 0; i < n; i++) { AttributeID attr = attrs[i]; - RedisModule_SaveUnsigned(rdb, attr); + SerializerIO_WriteUnsigned(rdb, attr); } } static void _RdbSaveConstraintsData ( - RedisModuleIO *rdb, + SerializerIO rdb, Constraint *constraints ) { uint n_constraints = array_len(constraints); @@ -163,7 +163,7 @@ static void _RdbSaveConstraintsData // encode number of active constraints uint n_active_constraints = array_len(active_constraints); - RedisModule_SaveUnsigned(rdb, n_active_constraints); + SerializerIO_WriteUnsigned(rdb, n_active_constraints); // encode constraints for (uint i = 0; i < n_active_constraints; i++) { @@ -175,7 +175,7 @@ static void _RdbSaveConstraintsData array_free(active_constraints); } -static void _RdbSaveSchema(RedisModuleIO *rdb, Schema *s) { +static void _RdbSaveSchema(SerializerIO rdb, Schema *s) { /* Format: * id * name @@ -186,13 +186,13 @@ static void _RdbSaveSchema(RedisModuleIO *rdb, Schema *s) { */ // Schema ID. - RedisModule_SaveUnsigned(rdb, s->id); + SerializerIO_WriteUnsigned(rdb, s->id); // Schema name. - RedisModule_SaveStringBuffer(rdb, s->name, strlen(s->name) + 1); + SerializerIO_WriteBuffer(rdb, s->name, strlen(s->name) + 1); // Number of indices. - RedisModule_SaveUnsigned(rdb, Schema_HasIndices(s)); + SerializerIO_WriteUnsigned(rdb, Schema_HasIndices(s)); // index, prefer pending over active Index idx = PENDING_IDX(s) @@ -205,7 +205,7 @@ static void _RdbSaveSchema(RedisModuleIO *rdb, Schema *s) { _RdbSaveConstraintsData(rdb, s->constraints); } -void RdbSaveGraphSchema_v14(RedisModuleIO *rdb, GraphContext *gc) { +void RdbSaveGraphSchema_v14(SerializerIO rdb, GraphContext *gc) { /* Format: * attribute keys (unified schema) * #node schemas @@ -219,7 +219,7 @@ void RdbSaveGraphSchema_v14(RedisModuleIO *rdb, GraphContext *gc) { // #Node schemas. 
unsigned short schema_count = GraphContext_SchemaCount(gc, SCHEMA_NODE); - RedisModule_SaveUnsigned(rdb, schema_count); + SerializerIO_WriteUnsigned(rdb, schema_count); // Name of label X #node schemas. for(int i = 0; i < schema_count; i++) { @@ -229,7 +229,7 @@ void RdbSaveGraphSchema_v14(RedisModuleIO *rdb, GraphContext *gc) { // #Relation schemas. unsigned short relation_count = GraphContext_SchemaCount(gc, SCHEMA_EDGE); - RedisModule_SaveUnsigned(rdb, relation_count); + SerializerIO_WriteUnsigned(rdb, relation_count); // Name of label X #relation schemas. for(unsigned short i = 0; i < relation_count; i++) { diff --git a/src/serializers/encoder/v14/encode_v14.h b/src/serializers/encoder/v14/encode_v14.h index 4b46a1e30..5444a7349 100644 --- a/src/serializers/encoder/v14/encode_v14.h +++ b/src/serializers/encoder/v14/encode_v14.h @@ -8,43 +8,43 @@ #include "../../serializers_include.h" -void RdbSaveGraph_v14 +void RdbSaveGraph_latest ( - RedisModuleIO *rdb, + SerializerIO rdb, void *value ); void RdbSaveNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t nodes_to_encode ); void RdbSaveDeletedNodes_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_nodes_to_encode ); void RdbSaveEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t edges_to_encode ); void RdbSaveDeletedEdges_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc, uint64_t deleted_edges_to_encode ); void RdbSaveGraphSchema_v14 ( - RedisModuleIO *rdb, + SerializerIO rdb, GraphContext *gc ); diff --git a/src/serializers/serializer_io.c b/src/serializers/serializer_io.c new file mode 100644 index 000000000..efdc84741 --- /dev/null +++ b/src/serializers/serializer_io.c @@ -0,0 +1,248 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). 
+ */
+
+#include "RG.h"
+#include "serializer_io.h"
+#include "../util/rmalloc.h"
+
+#include <stdio.h>   // FILE, fread, fwrite
+#include <stdint.h>  // int64_t, uint64_t
+#include <stddef.h>  // size_t
+
+// generic serializer
+// contains a number of function pointers for data serialization
+struct SerializerIO_Opaque {
+	void (*WriteFloat)(void*, float);                 // write float
+	void (*WriteDouble)(void*, double);               // write double
+	void (*WriteSigned)(void*, int64_t);              // write signed int
+	void (*WriteUnsigned)(void*, uint64_t);           // write unsigned int
+	void (*WriteLongDouble)(void*, long double);      // write long double
+	void (*WriteString)(void*, RedisModuleString*);   // write RedisModuleString
+	void (*WriteBuffer)(void*, const char*, size_t);  // write bytes
+
+	float (*ReadFloat)(void*);                // read float
+	double (*ReadDouble)(void*);              // read double
+	int64_t (*ReadSigned)(void*);             // read signed int
+	uint64_t (*ReadUnsigned)(void*);          // read unsigned int
+	char* (*ReadBuffer)(void*, size_t*);      // read bytes
+	long double (*ReadLongDouble)(void*);     // read long double
+	RedisModuleString* (*ReadString)(void*);  // read RedisModuleString
+
+	void *stream;  // underlying handle: RedisModuleIO* or FILE*
+};
+
+//------------------------------------------------------------------------------
+// Serializer Write API
+//------------------------------------------------------------------------------
+
+// macro for creating the serializer write functions (dispatch via io's function pointers)
+#define SERIALIZERIO_WRITE(suffix, t)                         \
+void SerializerIO_Write##suffix(SerializerIO io, t value) {  \
+	ASSERT(io != NULL);                                       \
+	io->Write##suffix(io->stream, value);                     \
+}
+
+SERIALIZERIO_WRITE(Unsigned, uint64_t)
+SERIALIZERIO_WRITE(Signed, int64_t)
+SERIALIZERIO_WRITE(String, RedisModuleString*)
+SERIALIZERIO_WRITE(Double, double)
+SERIALIZERIO_WRITE(Float, float)
+SERIALIZERIO_WRITE(LongDouble, long double)
+
+// write buffer to stream
+void SerializerIO_WriteBuffer
+(
+	SerializerIO io,   // stream to write to
+	const char *buff,  // buffer
+	size_t len         // number of bytes to write
+) {
+	ASSERT(io != NULL);
+	io->WriteBuffer(io->stream, buff, len);
+}
+
+// macro for creating FILE* write functions; NOTE(review): a failed write is only caught by ASSERT - confirm ASSERT is active in release builds
+#define STREAM_WRITE(suffix, t)                         \
+static void Stream_Write##suffix(void *stream, t v) {  \
+	FILE *f = (FILE*)stream;                            \
+	size_t n = fwrite(&v, sizeof(t), 1 , f);            \
+	ASSERT(n == 1);                                     \
+}
+
+// create stream write functions, each writes the raw in-memory bytes of its argument
+STREAM_WRITE(Unsigned, uint64_t)          // Stream_WriteUnsigned
+STREAM_WRITE(Signed, int64_t)             // Stream_WriteSigned
+STREAM_WRITE(Double, double)              // Stream_WriteDouble
+STREAM_WRITE(Float, float)                // Stream_WriteFloat
+STREAM_WRITE(LongDouble, long double)     // Stream_WriteLongDouble
+STREAM_WRITE(String, RedisModuleString*)  // Stream_WriteString, NOTE: writes the pointer value, not the string's bytes
+
+// write buffer to stream, encoded as: size_t length followed by `n` raw bytes
+static void Stream_WriteBuffer
+(
+	void *stream,      // stream to write to
+	const char *buff,  // buffer
+	size_t n           // number of bytes to write
+) {
+	FILE *f = (FILE*)stream;
+
+	// write size
+	size_t written = fwrite(&n, sizeof(size_t), 1, f);
+	ASSERT(written == 1);
+
+	// write data, counted in bytes so a zero-length buffer (fwrite returns 0) is accepted
+	written = fwrite(buff, 1, n, f);
+	ASSERT(written == n);
+}
+
+// macro for creating stream serializer read functions
+#define STREAM_READ(suffix, t)                      \
+	static t Stream_Read##suffix(void *stream) {    \
+		ASSERT(stream != NULL);                     \
+		FILE *f = (FILE*)stream;                    \
+		t v;                                        \
+		size_t n = fread(&v, sizeof(t), 1, f);      \
+		ASSERT(n == 1);                             \
+		return v;                                   \
+	}
+
+// create stream read functions, each reads back the raw bytes written by its Stream_Write counterpart
+STREAM_READ(Unsigned, uint64_t)          // Stream_ReadUnsigned
+STREAM_READ(Signed, int64_t)             // Stream_ReadSigned
+STREAM_READ(Double, double)              // Stream_ReadDouble
+STREAM_READ(Float, float)                // Stream_ReadFloat
+STREAM_READ(LongDouble, long double)     // Stream_ReadLongDouble
+STREAM_READ(String, RedisModuleString*)  // Stream_ReadString, NOTE: reads back a pointer value, not string bytes
+
+// read buffer from stream, returns an rm_malloc'ed copy of the stored bytes
+static char *Stream_ReadBuffer
+(
+	void *stream,  // stream to read from
+	size_t *n      // [optional] number of bytes read
+) {
+	ASSERT(stream != NULL);
+
+	FILE *f = (FILE*)stream;
+
+	// read buffer's size
+	size_t len;
+	size_t read = fread(&len, sizeof(size_t), 1, f);
+	ASSERT(read == 1);
+
+	char *data = rm_malloc(sizeof(char) * len);
+
+	// read data, counted in bytes so a zero-length buffer (fread returns 0) is accepted
+	read = fread(data, 1, len, f);
+	ASSERT(read == len);
+
+	if(n != NULL) *n = len;
+
+	return data;
+}
+
+//------------------------------------------------------------------------------
+// Serializer Read API
+//------------------------------------------------------------------------------
+
+// macro for creating the serializer read functions
+#define SERIALIZERIO_READ(suffix, t)               \
+t SerializerIO_Read##suffix(SerializerIO io) {    \
+	ASSERT(io != NULL);                            \
+	return io->Read##suffix(io->stream);           \
+}
+
+SERIALIZERIO_READ(Unsigned, uint64_t)
+SERIALIZERIO_READ(Signed, int64_t)
+SERIALIZERIO_READ(String, RedisModuleString*)
+SERIALIZERIO_READ(Double, double)
+SERIALIZERIO_READ(Float, float)
+SERIALIZERIO_READ(LongDouble, long double)
+
+// read buffer from stream
+char *SerializerIO_ReadBuffer
+(
+	SerializerIO io,  // stream
+	size_t *lenptr    // [optional] receives the number of bytes read
+) {
+	return io->ReadBuffer(io->stream, lenptr);
+}
+
+//------------------------------------------------------------------------------
+// Serializer Create API
+//------------------------------------------------------------------------------
+
+// create a serializer which uses stream
+SerializerIO SerializerIO_FromStream
+(
+	FILE *f  // stream
+) {
+	ASSERT(f != NULL);
+
+	SerializerIO serializer = rm_calloc(1, sizeof(struct SerializerIO_Opaque));
+
+	serializer->stream = (void*)f;
+
+	// set serializer function pointers
+	serializer->WriteUnsigned   = Stream_WriteUnsigned;
+	serializer->WriteSigned     = Stream_WriteSigned;
+	serializer->WriteString     = Stream_WriteString;
+	serializer->WriteBuffer     = Stream_WriteBuffer;
+	serializer->WriteDouble     = Stream_WriteDouble;
+	serializer->WriteFloat      = Stream_WriteFloat;
+	serializer->WriteLongDouble = Stream_WriteLongDouble;
+
+	serializer->ReadFloat      = Stream_ReadFloat;
+	serializer->ReadDouble     = Stream_ReadDouble;
+	serializer->ReadSigned     = Stream_ReadSigned;
+	serializer->ReadBuffer     = Stream_ReadBuffer;
+	serializer->ReadString     = Stream_ReadString;
+	serializer->ReadUnsigned   = Stream_ReadUnsigned;
+	serializer->ReadLongDouble = Stream_ReadLongDouble;
+
+	return serializer;
+}
+
+// create a serializer which uses RedisIO
+SerializerIO SerializerIO_FromRedisModuleIO
+(
+	RedisModuleIO *io
+) {
+	ASSERT(io != NULL);
+
+	SerializerIO serializer = rm_calloc(1, sizeof(struct SerializerIO_Opaque));
+
+	serializer->stream = io;
+
+	// set serializer function pointers (casts adapt the RedisModuleIO* APIs to the void* stream signature)
+	serializer->WriteUnsigned   = (void (*)(void*, uint64_t))RedisModule_SaveUnsigned;
+	serializer->WriteSigned     = (void (*)(void*, int64_t))RedisModule_SaveSigned;
+	serializer->WriteString     = (void (*)(void*, RedisModuleString*))RedisModule_SaveString;
+	serializer->WriteBuffer     = (void (*)(void*, const char*, size_t))RedisModule_SaveStringBuffer;
+	serializer->WriteDouble     = (void (*)(void*, double))RedisModule_SaveDouble;
+	serializer->WriteFloat      = (void (*)(void*, float))RedisModule_SaveFloat;
+	serializer->WriteLongDouble = (void (*)(void*, long double))RedisModule_SaveLongDouble;
+
+	serializer->ReadFloat      = (float (*)(void*))RedisModule_LoadFloat;
+	serializer->ReadDouble     = (double (*)(void*))RedisModule_LoadDouble;
+	serializer->ReadSigned     = (int64_t (*)(void*))RedisModule_LoadSigned;
+	serializer->ReadBuffer     = (char* (*)(void*, size_t*))RedisModule_LoadStringBuffer;
+	serializer->ReadString     = (RedisModuleString* (*)(void*))RedisModule_LoadString;
+	serializer->ReadUnsigned   = (uint64_t (*)(void*))RedisModule_LoadUnsigned;
+	serializer->ReadLongDouble = (long double (*)(void*))RedisModule_LoadLongDouble;
+
+	return serializer;
+}
+
+// free serializer, the underlying stream is not closed; *io is set to NULL
+void SerializerIO_Free
+(
+	SerializerIO *io  // serializer to free
+) {
+	ASSERT(io != NULL);
+	ASSERT(*io != NULL);
+
+	rm_free(*io);
+	*io = NULL;
+}
+
diff --git a/src/serializers/serializer_io.h b/src/serializers/serializer_io.h
new file mode 100644
index 000000000..f02cdfc23
--- /dev/null
+++ 
b/src/serializers/serializer_io.h @@ -0,0 +1,151 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +#pragma once + +#include "../redismodule.h" + +// SerializerIO +// acts as an abstraction layer for both graph encoding and decoding +// there are two types of serializers: +// 1. RedisIO serializer +// 2. Stream serializer +// +// The graph encoding / decoding logic uses this abstraction without knowing +// which underlying serializer is in use + +typedef struct SerializerIO_Opaque *SerializerIO; + +// create a serializer which uses a stream +SerializerIO SerializerIO_FromStream +( + FILE *stream // stream +); + +// create a serializer which uses RedisIO +SerializerIO SerializerIO_FromRedisModuleIO +( + RedisModuleIO *io +); + +//------------------------------------------------------------------------------ +// Serializer Write API +//------------------------------------------------------------------------------ + +// write unsigned to stream +void SerializerIO_WriteUnsigned +( + SerializerIO io, // stream to write to + uint64_t value // value +); + +// write signed to stream +void SerializerIO_WriteSigned +( + SerializerIO io, // stream to write to + int64_t value // value +); + +// write string to stream +void SerializerIO_WriteString +( + SerializerIO io, // stream to write to + RedisModuleString *s // string +); + +// write buffer to stream +void SerializerIO_WriteBuffer +( + SerializerIO io, // stream to write to + const char *buff, // buffer + size_t len // number of bytes to write +); + +// write double to stream +void SerializerIO_WriteDouble +( + SerializerIO io, // stream + double value // value +); + +// write float to stream +void SerializerIO_WriteFloat +( + SerializerIO io, // stream + float value // value +); + +// write long double to stream +void SerializerIO_WriteLongDouble +( + SerializerIO io, // stream + long double value // value +); +
+//------------------------------------------------------------------------------ +// Serializer Read API +//------------------------------------------------------------------------------ + +// read unsigned from stream +uint64_t SerializerIO_ReadUnsigned +( + SerializerIO io // stream +); + +// read signed from stream +int64_t SerializerIO_ReadSigned +( + SerializerIO io // stream +); + +// read string from stream +RedisModuleString *SerializerIO_ReadString +( + SerializerIO io // stream +); + +// read buffer from stream +char *SerializerIO_ReadBuffer +( + SerializerIO io, // stream + size_t *lenptr // [out] number of bytes read +); + +// read double from stream +double SerializerIO_ReadDouble +( + SerializerIO io // stream +); + +// read float from stream +float SerializerIO_ReadFloat +( + SerializerIO io // stream +); + +// read long double from stream +long double SerializerIO_ReadLongDouble +( + SerializerIO io // stream +); + +#define SerializerIO_Write(io, value,...) \ + _Generic \ + ( \ + (value), \ + uint64_t : SerializerIO_WriteUnsigned , \ + int64_t : SerializerIO_WriteSigned , \ + RedisModuleString* : SerializerIO_WriteString , \ + double : SerializerIO_WriteDouble , \ + float : SerializerIO_WriteFloat , \ + long double : SerializerIO_WriteLongDouble \ + ) \ + (io, value) + +void SerializerIO_Free +( + SerializerIO *io // pointer to the serializer to free +); + diff --git a/src/serializers/serializers_include.h b/src/serializers/serializers_include.h index 9e20e8d85..4219f65c6 100644 --- a/src/serializers/serializers_include.h +++ b/src/serializers/serializers_include.h @@ -11,6 +11,7 @@ #include "GraphBLAS.h" #include "../util/arr.h" #include "../query_ctx.h" +#include "serializer_io.h" #include "../redismodule.h" #include "../util/rmalloc.h" #include "graph_extensions.h" diff --git a/tests/flow/graph_utils.py b/tests/flow/graph_utils.py new file mode 100644 index 000000000..a8a50c696 --- /dev/null +++ b/tests/flow/graph_utils.py @@ -0,0 +1,50 @@ +# returns True if
graphs have the same: +# set of labels +# set of relations +# set of properties +# nodes +# edges +# indices +# constraints + +def graph_eq(A, B): + + queries = [ + # labels + "CALL db.labels() YIELD label RETURN label ORDER BY label", + + # relationships + """CALL db.relationshiptypes() YIELD relationshipType + RETURN relationshipType ORDER BY relationshipType""", + + # properties + """CALL db.propertyKeys() YIELD propertyKey + RETURN propertyKey ORDER BY propertyKey""", + + # nodes + "MATCH (n) RETURN n ORDER BY(n)", + + # edges + "MATCH ()-[e]->() RETURN e ORDER BY(e)", + + # indices + """CALL db.indexes() + YIELD label, properties, types, language, stopwords, entitytype + RETURN label, properties, types, language, stopwords, entitytype + ORDER BY label, properties, types, language, stopwords, entitytype""", + + # constraints + """CALL db.constraints() + YIELD type, label, properties, entitytype, status + RETURN type, label, properties, entitytype, status + ORDER BY type, label, properties, entitytype, status""" + ] + + for q in queries: + A_labels = A.ro_query(q).result_set + B_labels = B.ro_query(q).result_set + if A_labels != B_labels: + return False + + return True + diff --git a/tests/flow/test_effects.py b/tests/flow/test_effects.py index 599e56b09..4981a9cef 100644 --- a/tests/flow/test_effects.py +++ b/tests/flow/test_effects.py @@ -2,6 +2,7 @@ import threading from common import * from index_utils import * +from graph_utils import graph_eq GRAPH_ID = "effects" MONITOR_ATTACHED = False @@ -77,42 +78,7 @@ def query_master_and_wait(self, q): # asserts that master and replica have the same view over the graph def assert_graph_eq(self): - #validate schema: - # labels - q = "CALL db.labels()" - master_labels = self.master_graph.query(q).result_set - replica_labels = self.replica_graph.ro_query(q).result_set - self.env.assertEquals(master_labels, replica_labels) - - # relationship-types - q = "CALL db.relationshiptypes()" - master_relations =
self.master_graph.query(q).result_set - replica_relations = self.replica_graph.ro_query(q).result_set - self.env.assertEquals(master_relations, replica_relations) - - # properties - q = "CALL db.propertyKeys()" - master_props = self.master_graph.query(q).result_set - replica_props = self.replica_graph.ro_query(q).result_set - self.env.assertEquals(master_props, replica_props) - - # validate nodes - q = "MATCH (n) RETURN n ORDER BY(n)" - master_resultset = self.master_graph.query(q).result_set - replica_resultset = self.replica_graph.ro_query(q).result_set - self.env.assertEquals(master_resultset, replica_resultset) - - # validate relationships - q = "MATCH ()-[e]->() RETURN e ORDER BY(e)" - master_resultset = self.master_graph.query(q).result_set - replica_resultset = self.replica_graph.ro_query(q).result_set - self.env.assertEquals(master_resultset, replica_resultset) - - # validate indices - q = "CALL db.indexes() YIELD label, properties, types, language, stopwords, entitytype" - master_resultset = self.master_graph.query(q).result_set - replica_resultset = self.replica_graph.ro_query(q).result_set - self.env.assertEquals(master_resultset, replica_resultset) + self.env.assertTrue(graph_eq(self.master_graph, self.replica_graph)) def __init__(self): self.env, self.db = Env(env='oss', useSlaves=True) diff --git a/tests/flow/test_graph_copy.py b/tests/flow/test_graph_copy.py new file mode 100644 index 000000000..2b5761400 --- /dev/null +++ b/tests/flow/test_graph_copy.py @@ -0,0 +1,278 @@ +from common import Env, FalkorDB, SANITIZER, VALGRIND +from random_graph import create_random_schema, create_random_graph +from graph_utils import graph_eq +import time + +GRAPH_ID = "graph_copy" + +# tests the GRAPH.COPY command +class testGraphCopy(): + def __init__(self): + self.env, self.db = Env(enableDebugCommand=True) + self.conn = self.env.getConnection() + + def graph_copy(self, src, dest): + # invokes the GRAPH.COPY command + self.conn.execute_command("GRAPH.COPY", src, 
dest) + + # compare graphs + def assert_graph_eq(self, A, B): + # tests that the graphs are the same, retrying while redis is still loading + while True: + try: + self.env.assertTrue(graph_eq(A, B)) + break + except Exception as e: + # only the "loading" error is retried, anything else is a real failure + if str(e) != "Redis is loading the dataset in memory": + raise + print("Retry!") + + def test_01_invalid_invocation(self): + # validate invalid invocations of the GRAPH.COPY command + + # missing src graph + src = 'A' + dest = 'Z' + + # wrong number of arguments + try: + self.conn.execute_command("GRAPH.COPY", src) + self.env.assertTrue(False) + except Exception: + pass + + try: + self.conn.execute_command("GRAPH.COPY", src, dest, 3) + self.env.assertTrue(False) + except Exception: + pass + + # src graph doesn't exist + try: + self.graph_copy(src, dest) + self.env.assertTrue(False) + except Exception: + pass + + # src key isn't a graph + self.conn.set(src, 1) + + try: + self.graph_copy(src, dest) + self.env.assertTrue(False) + except Exception: + pass + self.conn.delete(src) + + # create src graph + src_graph = self.db.select_graph(src) + src_graph.query("RETURN 1") + + # dest key exists + # key type doesn't matter + try: + self.graph_copy(src, src) + self.env.assertTrue(False) + except Exception: + pass + + # clean up + self.conn.delete(src, dest) + + def test_02_copy_empty_graph(self): + # src is an empty graph + src = 'a' + dest = 'z' + src_graph = self.db.select_graph(src) + + # create empty src graph + src_graph.query("RETURN 1") + + # make a copy of src graph + self.graph_copy(src, dest) + + # validate that both src and dest graphs exist and are graph keys + self.env.assertTrue(self.conn.type(src) == 'graphdata') + self.env.assertTrue(self.conn.type(dest) == 'graphdata') + + dest_graph = self.db.select_graph(dest) + + # make sure both src and dest graph are empty + src_node_count = src_graph.query("MATCH (n) RETURN count(n)").result_set[0][0] + dest_node_count = dest_graph.query("MATCH (n) RETURN
count(n)").result_set[0][0] + self.env.assertEqual(src_node_count, 0) + self.env.assertEqual(dest_node_count, 0) + + # clean up + src_graph.delete() + dest_graph.delete() + + def test_03_copy_random_graph(self): + # make sure copying of a random graph is working as expected + src = 'a' + dest = 'z' + + src_graph = self.db.select_graph(src) + nodes, edges = create_random_schema() + create_random_graph(src_graph, nodes, edges) + + # copy src graph to dest graph + self.graph_copy(src, dest) + dest_graph = self.db.select_graph(dest) + + # validate src and dest graphs are the same + self.assert_graph_eq(src_graph, dest_graph) + + # clean up + src_graph.delete() + dest_graph.delete() + + def test_04_copy_constraints(self): + # make sure constraints and indexes are copied + + src_id = GRAPH_ID + clone_id = GRAPH_ID + "_copy" + + src_graph = self.db.select_graph(src_id) + clone_graph = self.db.select_graph(clone_id) + + # create graph with both indices and constraints + src_graph.create_node_range_index("Person", "name", "age") + + self.conn.execute_command("GRAPH.CONSTRAINT", "CREATE", src_id, "UNIQUE", + "NODE", "Person", "PROPERTIES", 1, "name") + + # copy graph + self.graph_copy(src_id, clone_id) + + # make sure src and cloned graph are the same + self.assert_graph_eq(src_graph, clone_graph) + + # clean up + src_graph.delete() + clone_graph.delete() + + def test_05_chain_of_copies(self): + # make multiple copies of essentially the same graph + # start with graph A + # copy A to B, copy B to C and so on and so forth up to D + # A == B == C == D + src = 'A' + + # create a random graph + src_graph = self.db.select_graph(src) + nodes, edges = create_random_schema() + create_random_graph(src_graph, nodes, edges) + + # clone graph multiple times + for key in range(ord('B'), ord('D')+1): + dest = chr(key) + self.graph_copy(src, dest) + src = dest + + # validate src and dest graphs are the same + src_graph = self.db.select_graph('A') + dest_graph = self.db.select_graph('D') +
self.assert_graph_eq(src_graph, dest_graph) + + # clean up every graph in the chain, A through D + for key in range(ord('A'), ord('D')+1): + graph_id = chr(key) + graph = self.db.select_graph(graph_id) + graph.delete() + + def test_06_write_to_copy(self): + # make sure copied graph is writeable and loadable + src_graph_id = GRAPH_ID + copy_graph_id = GRAPH_ID + "_copy" + + query = "CREATE (:A {v:1})-[:R {v:2}]->(:B {v:3})" + src_graph = self.db.select_graph(src_graph_id) + src_graph.query(query) + + # create a copy + self.graph_copy(src_graph_id, copy_graph_id) + copy_graph = self.db.select_graph(copy_graph_id) + + query = "MATCH (b:B {v:3}) CREATE (b)-[:R {v:4}]->(:C {v:5})" + src_graph.query(query) + copy_graph.query(query) + + # reload entire keyspace + self.conn.execute_command("DEBUG", "RELOAD") + + # make sure both src and copy exist and are functional + self.assert_graph_eq(src_graph, copy_graph) + + # clean up + src_graph.delete() + copy_graph.delete() + + def test_07_copy_uneffected_by_vkey_size(self): + # set size of virtual key to 1 + # i.e. number of entities per virtual key is 1.
+ vkey_max_entity_count = self.db.config_get("VKEY_MAX_ENTITY_COUNT") + self.db.config_set("VKEY_MAX_ENTITY_COUNT", 1) + + # make sure configuration changed + self.env.assertEqual(self.db.config_get("VKEY_MAX_ENTITY_COUNT"), 1) + + src_graph_id = GRAPH_ID + copy_graph_id = GRAPH_ID + "_copy" + + # create graph + src_graph = self.db.select_graph(src_graph_id) + nodes, edges = create_random_schema() + create_random_graph(src_graph, nodes, edges) + + # make a copy + self.graph_copy(src_graph_id, copy_graph_id) + copy_graph = self.db.select_graph(copy_graph_id) + + # restore original VKEY_MAX_ENTITY_COUNT + self.db.config_set("VKEY_MAX_ENTITY_COUNT", vkey_max_entity_count) + + # validate src_graph and copy_graph are the same + self.assert_graph_eq(src_graph, copy_graph) + + # clean up + src_graph.delete() + + def test_08_replicated_copy(self): + # skip test if we're running under Valgrind or sanitizer + if VALGRIND or SANITIZER != "": + self.env.skip() # valgrind is not working correctly with replication + + # make sure the GRAPH.COPY command is replicated + + # stop old environment + self.env.stop() + + # start a new environment, one which has a master and a replica + self.env, self.db = Env(env='oss', useSlaves=True) + + master_con = self.conn = self.env.getConnection() # graph_copy() issues commands via self.conn + + # create a random graph + src_graph_id = GRAPH_ID + copy_graph_id = "copy_" + GRAPH_ID + + src_graph = self.db.select_graph(src_graph_id) + nodes, edges = create_random_schema() + create_random_graph(src_graph, nodes, edges) + + # copy graph + self.graph_copy(src_graph_id, copy_graph_id) + + # the WAIT command forces master slave sync to complete + master_con.execute_command("WAIT", "1", "0") + + # make sure dest graph was replicated + # assuming replica port is env port+1 + replica_db = FalkorDB("localhost", self.env.port+1) + replica_cloned_graph = replica_db.select_graph(copy_graph_id) + + # make sure src graph on master is the same as cloned graph on replica + self.assert_graph_eq(src_graph,
replica_cloned_graph) + diff --git a/tests/unit/test_serializer.c b/tests/unit/test_serializer.c new file mode 100644 index 000000000..e037641b0 --- /dev/null +++ b/tests/unit/test_serializer.c @@ -0,0 +1,173 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +#include "src/util/rmalloc.h" +#include "src/serializers/serializer_io.h" + +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +void setup() { + Alloc_Reset(); +} + +#define TEST_INIT setup(); +#include "acutest.h" + +void test_serializer(void) { + // create pipe + int pipefd[2]; // read and write ends of a pipe + TEST_ASSERT(pipe(pipefd) != -1); + + // create a FILE* stream for both ends of the pipe + FILE *fs_read_stream = fdopen(pipefd[0], "r"); + FILE *fs_write_stream = fdopen(pipefd[1], "w"); + TEST_ASSERT(fs_read_stream != NULL); + TEST_ASSERT(fs_write_stream != NULL); + + SerializerIO reader = SerializerIO_FromStream(fs_read_stream); + SerializerIO writer = SerializerIO_FromStream(fs_write_stream); + TEST_ASSERT(reader != NULL); + TEST_ASSERT(writer != NULL); + + //-------------------------------------------------------------------------- + // write to stream + //-------------------------------------------------------------------------- + + // write unsigned + uint64_t unsigned_v = 2; + SerializerIO_WriteUnsigned(writer, unsigned_v); + + // write signed + int64_t signed_v = 3; + SerializerIO_WriteSigned(writer, signed_v); + + // write buffer + const char* write_buff = "data"; + size_t write_len = strlen(write_buff); + SerializerIO_WriteBuffer(writer, write_buff, write_len); + + // write double + double double_v = 4.5; + SerializerIO_WriteDouble(writer, double_v); + + // write float + float float_v = 6.7; + SerializerIO_WriteFloat(writer, float_v); + + // write long double + long double longdouble_v = 8.9; + SerializerIO_WriteLongDouble(writer, longdouble_v); + + fclose(fs_write_stream); // flush and close write end + // fclose() also closed pipefd[1]; calling close() on it again would be a double close +
SerializerIO_Free(&writer); // free serializer + + //-------------------------------------------------------------------------- + // read from stream + //-------------------------------------------------------------------------- + + // read unsigned + TEST_ASSERT(SerializerIO_ReadUnsigned(reader) == unsigned_v); + + // read signed + TEST_ASSERT(SerializerIO_ReadSigned(reader) == signed_v); + + // read buffer, it was written without a NUL terminator so compare by length + size_t read_len; + char *read_buff = SerializerIO_ReadBuffer(reader, &read_len); + TEST_ASSERT(read_len == write_len); + TEST_ASSERT(memcmp(read_buff, write_buff, write_len) == 0); + rm_free(read_buff); + + // read double + TEST_ASSERT(SerializerIO_ReadDouble(reader) == double_v); + + // read float + TEST_ASSERT(SerializerIO_ReadFloat(reader) == float_v); + + // read long double + TEST_ASSERT(SerializerIO_ReadLongDouble(reader) == longdouble_v); + + fclose(fs_read_stream); // close read end + // fclose() also closed pipefd[0]; calling close() on it again would be a double close + SerializerIO_Free(&reader); // free serializer +} + +void test_serializer_generic_write(void) { + // create pipe + int pipefd[2]; // read and write ends of a pipe + TEST_ASSERT(pipe(pipefd) != -1); + + // create a FILE* stream for both ends of the pipe + FILE *fs_read_stream = fdopen(pipefd[0], "r"); + FILE *fs_write_stream = fdopen(pipefd[1], "w"); + TEST_ASSERT(fs_read_stream != NULL); + TEST_ASSERT(fs_write_stream != NULL); + + SerializerIO reader = SerializerIO_FromStream(fs_read_stream); + SerializerIO writer = SerializerIO_FromStream(fs_write_stream); + TEST_ASSERT(reader != NULL); + TEST_ASSERT(writer != NULL); + + //-------------------------------------------------------------------------- + // write to stream + //-------------------------------------------------------------------------- + + // write unsigned + uint64_t unsigned_v = 2; + SerializerIO_Write(writer, unsigned_v); + + // write signed + int64_t signed_v = 3; + SerializerIO_Write(writer, signed_v); + + // write double + double double_v = 4.5; + SerializerIO_Write(writer,
double_v); + + // write float + float float_v = 6.7; + SerializerIO_Write(writer, float_v); + + // write long double + long double longdouble_v = 8.9; + SerializerIO_Write(writer, longdouble_v); + + fclose(fs_write_stream); // flush and close write end + // fclose() also closed pipefd[1]; calling close() on it again would be a double close + SerializerIO_Free(&writer); // free serializer + + //-------------------------------------------------------------------------- + // read from stream + //-------------------------------------------------------------------------- + + // read unsigned + TEST_ASSERT(SerializerIO_ReadUnsigned(reader) == unsigned_v); + + // read signed + TEST_ASSERT(SerializerIO_ReadSigned(reader) == signed_v); + + // read double + TEST_ASSERT(SerializerIO_ReadDouble(reader) == double_v); + + // read float + TEST_ASSERT(SerializerIO_ReadFloat(reader) == float_v); + + // read long double + TEST_ASSERT(SerializerIO_ReadLongDouble(reader) == longdouble_v); + + fclose(fs_read_stream); // close read end + // fclose() also closed pipefd[0]; calling close() on it again would be a double close + SerializerIO_Free(&reader); // free serializer +} + +TEST_LIST = { + { "serializer", test_serializer}, + { "serializer_generic_write", test_serializer_generic_write}, + { NULL, NULL } +}; +