Skip to content

Commit

Permalink
GRAPH.COPY (#585)
Browse files Browse the repository at this point in the history
* initial work graph.copy

* pipe serializer

* abstract serializer

* graph encoder switch to abstract serializer

* switch decoder to abstract serializer

* copy command wip

* first copy

* early flow test

* additional tests

* transition to worker thread execution

* switch to worker thread

* retry when fork failed

* skip graph copy replication test when running under sanitizer

* import SANITIZER

* plant cloned key only after decoding is done

* switch to CRON job

* fork writes to file

* add logs

* fix leak

* replicate via RESTORE, switch to FILE*

* add graph.restore

* fix unit-test wrong stream acquisition

* fork requires gil

* rename encoder/decoder v14 to latest
  • Loading branch information
swilly22 committed Mar 5, 2024
1 parent bcb4cdb commit 6ceccaa
Show file tree
Hide file tree
Showing 23 changed files with 1,702 additions and 216 deletions.
501 changes: 501 additions & 0 deletions src/commands/cmd_copy.c

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions src/commands/cmd_restore.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright FalkorDB Ltd. 2023 - present
* Licensed under the Server Side Public License v1 (SSPLv1).
*/

#include "RG.h"
#include "../graph/graphcontext.h"
#include "../serializers/serializer_io.h"
#include "../serializers/decoders/current/v14/decode_v14.h"

extern RedisModuleType *GraphContextRedisModuleType;

// restore a graph from binary representation
// this command is the counter part of GRAPH.COPY
// which replicates the cloned graph via GRAPH.RESTORE
//
// usage:
// GRAPH.RESTORE <graph> <payload>
//
// this function is ment to execute on redis main thread
int Graph_Restore
(
RedisModuleCtx *ctx, // redis module context
RedisModuleString **argv, // command argument
int argc // number of argument
) {
// validations
ASSERT(ctx != NULL);
ASSERT(argv != NULL);

// expecting exactly 3 arguments:
// argv[0] command name
// argv[1] graph key
// argv[2] graph payload
if(argc != 3) {
return RedisModule_WrongArity(ctx);
}

// TODO: reject GRAPH.RESTORE if caller isn't the master

//--------------------------------------------------------------------------
// make sure graph key doesn't exists
//--------------------------------------------------------------------------

RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
int key_type = RedisModule_KeyType(key);
RedisModule_CloseKey(key);

// key exists, fail
if(key_type != REDISMODULE_KEYTYPE_EMPTY) {
RedisModule_ReplyWithError(ctx, "restore graph failed, key already exists");
return REDISMODULE_OK;
}

//--------------------------------------------------------------------------
// decode payload
//--------------------------------------------------------------------------

// create memory stream
size_t len;
const char *payload = RedisModule_StringPtrLen(argv[2], &len);

FILE *stream = fmemopen((void*)payload, len, "r");
ASSERT(stream != NULL);

SerializerIO io = SerializerIO_FromStream(stream);

// decode graph
GraphContext *gc = RdbLoadGraphContext_latest(io, argv[1]);
ASSERT(gc != NULL);

// add graph to keyspace
key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);

// set value in key
RedisModule_ModuleTypeSetValue(key, GraphContextRedisModuleType, gc);

RedisModule_CloseKey(key);

// register graph context for BGSave
GraphContext_RegisterWithModule(gc);

//--------------------------------------------------------------------------
// clean up
//--------------------------------------------------------------------------

SerializerIO_Free(&io);

fclose(stream);

return REDISMODULE_OK;
}

1 change: 1 addition & 0 deletions src/commands/commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
GRAPH_Commands CommandFromString(const char *cmd_name) {
if (!strcasecmp(cmd_name, "graph.INFO")) return CMD_INFO;
if (!strcasecmp(cmd_name, "graph.LIST")) return CMD_LIST;
if (!strcasecmp(cmd_name, "graph.COPY")) return CMD_COPY;
if (!strcasecmp(cmd_name, "graph.QUERY")) return CMD_QUERY;
if (!strcasecmp(cmd_name, "graph.DEBUG")) return CMD_DEBUG;
if (!strcasecmp(cmd_name, "graph.EFFECT")) return CMD_EFFECT;
Expand Down
7 changes: 6 additions & 1 deletion src/commands/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ typedef enum {
CMD_LIST = 9,
CMD_DEBUG = 10,
CMD_INFO = 11,
CMD_EFFECT = 12
CMD_EFFECT = 12,
CMD_COPY = 13,
CMD_RESTORE = 14
} GRAPH_Commands;

//------------------------------------------------------------------------------
Expand All @@ -42,10 +44,13 @@ void Graph_Explain(void *args);

int Graph_List(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Info(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Copy(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Debug(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Delete(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Effect(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Config(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Restore(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Slowlog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int CommandDispatch(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
int Graph_Constraint(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);

1 change: 1 addition & 0 deletions src/errors/error_msgs.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#pragma once

#define EMSG_GRAPH_EXISTS "Graph %s already exists"
#define EMSG_EMPTY_KEY "Encountered an empty key when opened key %s"
#define EMSG_NON_GRAPH_KEY "Encountered a non-graph value type when opened key %s"
#define EMSG_DIFFERENT_VALUE "Encountered different graph value when opened key %s"
Expand Down
10 changes: 10 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ int RedisModule_OnLoad
return REDISMODULE_ERR;
}

if(RedisModule_CreateCommand(ctx, "graph.COPY", Graph_Copy,
"write deny-oom", 1, 2, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if(RedisModule_CreateCommand(ctx, "graph.RESTORE", Graph_Restore,
"write deny-oom", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if(BoltApi_Register(ctx) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
Expand Down
39 changes: 18 additions & 21 deletions src/serializers/decoders/current/v14/decode_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ static void _InitGraphDataStructure

static GraphContext *_DecodeHeader
(
RedisModuleIO *rdb
SerializerIO rdb
) {
// Header format:
// Graph name
Expand All @@ -66,24 +66,24 @@ static GraphContext *_DecodeHeader
// Schema

// graph name
char *graph_name = RedisModule_LoadStringBuffer(rdb, NULL);
char *graph_name = SerializerIO_ReadBuffer(rdb, NULL);

// each key header contains the following:
// #nodes, #edges, #deleted nodes, #deleted edges, #labels matrices, #relation matrices
uint64_t node_count = RedisModule_LoadUnsigned(rdb);
uint64_t edge_count = RedisModule_LoadUnsigned(rdb);
uint64_t deleted_node_count = RedisModule_LoadUnsigned(rdb);
uint64_t deleted_edge_count = RedisModule_LoadUnsigned(rdb);
uint64_t label_count = RedisModule_LoadUnsigned(rdb);
uint64_t relation_count = RedisModule_LoadUnsigned(rdb);
uint64_t multi_edge[relation_count];
uint64_t node_count = SerializerIO_ReadUnsigned(rdb);
uint64_t edge_count = SerializerIO_ReadUnsigned(rdb);
uint64_t deleted_node_count = SerializerIO_ReadUnsigned(rdb);
uint64_t deleted_edge_count = SerializerIO_ReadUnsigned(rdb);
uint64_t label_count = SerializerIO_ReadUnsigned(rdb);
uint64_t relation_count = SerializerIO_ReadUnsigned(rdb);
uint64_t multi_edge[relation_count];

for(uint i = 0; i < relation_count; i++) {
multi_edge[i] = RedisModule_LoadUnsigned(rdb);
multi_edge[i] = SerializerIO_ReadUnsigned(rdb);
}

// total keys representing the graph
uint64_t key_number = RedisModule_LoadUnsigned(rdb);
uint64_t key_number = SerializerIO_ReadUnsigned(rdb);

GraphContext *gc = _GetOrCreateGraphContext(graph_name);
Graph *g = gc->g;
Expand Down Expand Up @@ -116,31 +116,32 @@ static GraphContext *_DecodeHeader

static PayloadInfo *_RdbLoadKeySchema
(
RedisModuleIO *rdb
SerializerIO rdb
) {
// Format:
// #Number of payloads info - N
// N * Payload info:
// Encode state
// Number of entities encoded in this state.

uint64_t payloads_count = RedisModule_LoadUnsigned(rdb);
uint64_t payloads_count = SerializerIO_ReadUnsigned(rdb);
PayloadInfo *payloads = array_new(PayloadInfo, payloads_count);

for(uint i = 0; i < payloads_count; i++) {
// for each payload
// load its type and the number of entities it contains
PayloadInfo payload_info;
payload_info.state = RedisModule_LoadUnsigned(rdb);
payload_info.entities_count = RedisModule_LoadUnsigned(rdb);
payload_info.state = SerializerIO_ReadUnsigned(rdb);
payload_info.entities_count = SerializerIO_ReadUnsigned(rdb);
array_append(payloads, payload_info);
}
return payloads;
}

GraphContext *RdbLoadGraphContext_v14
GraphContext *RdbLoadGraphContext_latest
(
RedisModuleIO *rdb
SerializerIO rdb,
const RedisModuleString *rm_key_name
) {

// Key format:
Expand Down Expand Up @@ -197,7 +198,6 @@ GraphContext *RdbLoadGraphContext_v14
GraphDecodeContext_IncreaseProcessedKeyCount(gc->decoding_context);

// before finalizing keep encountered meta keys names, for future deletion
const RedisModuleString *rm_key_name = RedisModule_GetKeyNameFromIO(rdb);
const char *key_name = RedisModule_StringPtrLen(rm_key_name, NULL);

// the virtual key name is not equal the graph name
Expand Down Expand Up @@ -251,9 +251,6 @@ GraphContext *RdbLoadGraphContext_v14
ASSERT(Graph_Pending(g) == false);

GraphDecodeContext_Reset(gc->decoding_context);

RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
RedisModule_Log(ctx, "notice", "Done decoding graph %s", gc->graph_name);
}

return gc;
Expand Down

0 comments on commit 6ceccaa

Please sign in to comment.