-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GRAPH.COPY #585
GRAPH.COPY #585
Conversation
WalkthroughThe recent updates introduce significant changes to support graph copying and restoration functionalities in a Redis module. These modifications include the introduction of the Changes
Assessment against linked issues
Related issues
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 9
Configuration used: CodeRabbit UI
Files selected for processing (20)
- src/commands/cmd_copy.c (1 hunks)
- src/commands/commands.c (1 hunks)
- src/commands/commands.h (2 hunks)
- src/errors/error_msgs.h (1 hunks)
- src/module.c (1 hunks)
- src/serializers/decoders/current/v14/decode_graph.c (5 hunks)
- src/serializers/decoders/current/v14/decode_graph_entities.c (10 hunks)
- src/serializers/decoders/current/v14/decode_graph_schema.c (13 hunks)
- src/serializers/decoders/current/v14/decode_v14.h (1 hunks)
- src/serializers/decoders/decode_graph.c (1 hunks)
- src/serializers/encoder/encode_graph.c (1 hunks)
- src/serializers/encoder/v14/encode_graph.c (6 hunks)
- src/serializers/encoder/v14/encode_graph_entities.c (14 hunks)
- src/serializers/encoder/v14/encode_schema.c (12 hunks)
- src/serializers/encoder/v14/encode_v14.h (1 hunks)
- src/serializers/serializer_io.c (1 hunks)
- src/serializers/serializer_io.h (1 hunks)
- src/serializers/serializers_include.h (1 hunks)
- tests/flow/test_graph_copy.py (1 hunks)
- tests/unit/test_serializer.c (1 hunks)
Files skipped from review due to trivial changes (1)
- src/serializers/serializers_include.h
Additional comments: 39
src/serializers/encoder/encode_graph.c (1)
- 9-9: Including
serializer_io.h
introduces the necessary abstraction layer for serialization operations, aligning with the broader changes across the serialization mechanism.src/serializers/decoders/decode_graph.c (1)
- 11-15: The introduction of
SerializerIO
and the retrieval of the key name usingRedisModule_GetKeyNameFromIO
before decoding are correctly implemented. This ensures that the decoding process is abstracted and can utilize additional context such as the key name.src/serializers/encoder/v14/encode_v14.h (1)
- 13-47: Updating function signatures in
encode_v14.h
to useSerializerIO
instead ofRedisModuleIO
is consistent with the abstraction strategy. This change enhances modularity and flexibility in handling serialization operations.src/serializers/decoders/current/v14/decode_v14.h (1)
- 13-47: The modifications to function signatures in
decode_v14.h
, including the use ofSerializerIO
and the addition ofconst RedisModuleString *rm_key_name
toRdbLoadGraphContext_v14
, are correctly implemented. These changes support the abstraction layer and provide additional context for decoding operations.src/commands/commands.c (1)
- 13-13: Correctly adding support for the "graph.COPY" command in
CommandFromString
. This change is essential for enabling the new functionality introduced by the PR.src/commands/commands.h (2)
- 30-31: Adding
CMD_COPY
to theGRAPH_Commands
enum is correctly implemented, ensuring the new command is recognized and handled appropriately within the system.- 46-46: The declaration of
Graph_Copy
incommands.h
correctly aligns with the introduction of theCMD_COPY
command, ensuring the system has a defined entry point for handling this command.src/serializers/serializer_io.h (1)
- 1-142: The introduction of
SerializerIO
inserializer_io.h
is well-designed, offering a comprehensive abstraction layer for serialization operations. This includes support for creating serializers from pipes and RedisModuleIO, as well as a complete set of APIs for reading and writing various data types. Ensure that the implementation of these functions correctly handles edge cases and errors to maintain robustness.tests/unit/test_serializer.c (1)
- 1-160: The unit tests in
test_serializer.c
are well-structured and cover a comprehensive range of scenarios for testing the SerializerIO functionality, including specific type serialization and generic write operations. Ensure that these tests are executed as part of the continuous integration process to maintain the reliability of the serialization abstraction layer.src/serializers/decoders/current/v14/decode_graph_entities.c (1)
- 90-123: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [10-226]
The updates in
decode_graph_entities.c
to useSerializerIO
for loading various data types are correctly implemented, aligning with the abstraction strategy for decoding operations. This change enhances modularity and flexibility in handling decoding processes. Ensure that all data types are correctly handled and that there are no regressions in decoding functionality.src/serializers/encoder/v14/encode_schema.c (1)
- 43-74: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [12-232]
The modifications in
encode_schema.c
to useSerializerIO
for serialization operations are correctly implemented, enhancing the abstraction layer for encoding processes. This change supports modularity and flexibility in handling serialization. Ensure that all schema-related data types are correctly serialized and that there are no regressions in encoding functionality.src/module.c (1)
- 230-233: The registration of the
GRAPH.COPY
command looks correct. Ensure that the command flags"write deny-oom"
and arity(1, 2, 1)
align with the intended functionality and usage of the command. Specifically, verify that the command should indeed be marked as a write operation and that it properly handles out-of-memory conditions.src/serializers/decoders/current/v14/decode_graph_schema.c (5)
- 27-57: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [12-49]
The transition from
RedisModuleIO
toSerializerIO
for decoding index fields appears to be correctly implemented. Ensure thorough testing of these changes to validate data integrity and compatibility with existing serialized data.
- 64-82: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [54-127]
The modifications to use
SerializerIO
for loading indices and constraints are correctly implemented. It's crucial to perform comprehensive testing, especially for edge cases and previously serialized data, to ensure the new mechanism works as expected without data loss or corruption.
- 187-193: The changes to use
SerializerIO
for loading constraints are correctly applied. As with other serialization changes, recommend extensive testing to ensure that constraints are accurately decoded and applied to the graph schema.- 214-221: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [202-218]
The updates to use
SerializerIO
for loading schema details, including handling of node and relation schemas, are properly implemented. Emphasize the importance of testing these changes thoroughly to ensure schemas are correctly decoded and integrated into the graph context.
- 249-253: The transition to
SerializerIO
for loading attribute keys is correctly done. Recommend validating this functionality with tests, particularly focusing on the correct association of attributes to nodes and edges in the graph.src/serializers/decoders/current/v14/decode_graph.c (3)
- 66-89: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [54-86]
The updates to use
SerializerIO
for decoding the graph header and the addition of handling for multiple edges and key numbers are correctly implemented. Ensure that the handling of the newrm_key_name
parameter in related functions is thoroughly tested, particularly its impact on graph context creation and key management.
- 119-135: The changes to use
SerializerIO
for loading the key schema are correctly applied. Testing should focus on verifying that the payload information for each key is accurately decoded and that it aligns with the expected graph structure.- 143-144: The addition of the
const RedisModuleString *rm_key_name
parameter toRdbLoadGraphContext_v14
is noted. Ensure that this parameter is effectively used within the function for enhanced key handling during graph decoding. Testing should include scenarios where the key name impacts the decoding process or the graph context's state.src/serializers/encoder/v14/encode_graph.c (2)
- 34-67: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [17-64]
The transition from
RedisModuleIO
toSerializerIO
for encoding the graph header, including handling of graph name, counts, and matrix details, is correctly implemented. While approving these changes, consider the implications of removing logging code on the ability to debug or monitor the encoding process. Ensure that sufficient mechanisms are in place to track encoding operations, especially in production environments.
- 155-175: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [114-164]
The updates to use
SerializerIO
for saving the key schema and encoding various graph components are correctly applied. As with other serialization changes, emphasize the importance of thorough testing, particularly to ensure that the encoded graph data accurately reflects the graph's structure and content.src/commands/cmd_copy.c (1)
- 186-192: Replicating the command using
RedisModule_ReplicateVerbatim
is essential for ensuring that the graph copy operation is replicated in a master-slave setup. However, the comment mentions potential issues with command replication, such asRedisModule_Fork
failing on the replica. It's important to ensure that the replication logic accounts for the asynchronous nature of theGRAPH.COPY
operation and handles errors gracefully on replicas.Consider adding additional checks or mechanisms to ensure that replicas can handle the
GRAPH.COPY
command reliably, especially in scenarios where system limitations might cause the command to fail.tests/flow/test_graph_copy.py (5)
- 47-98: The
test_01_invalid_invocation
method thoroughly tests various invalid invocations of theGRAPH.COPY
command. It's well-structured and covers missing source graphs, wrong argument counts, non-existent source graphs, and existing destination keys. No changes are needed here.- 99-125: The
test_02_copy_empty_graph
method correctly tests copying an empty graph. It verifies that both the source and destination graphs exist and are empty after the copy operation. This test ensures that theGRAPH.COPY
command behaves as expected even when dealing with empty graphs.- 127-145: The
test_03_copy_random_graph
method effectively tests the copying of a graph with randomly generated schema and data. It validates that the source and destination graphs are identical after the copy operation, ensuring the integrity of the copied data.- 203-232: The
test_06_copy_uneffected_by_vkey_size
method tests theGRAPH.COPY
command under a specific configuration (VKEY_MAX_ENTITY_COUNT
set to 1). It's important to restore the original configuration after the test to avoid side effects on subsequent tests.- 233-267: The
test_07_replicated_copy
method ensures that theGRAPH.COPY
command is replicated correctly in a master-slave setup. This test is crucial for verifying the behavior of the command in a distributed environment. The use of theWAIT
command to synchronize the master and slave is a good practice.src/errors/error_msgs.h (1)
- 9-9: The addition of the
EMSG_GRAPH_EXISTS
error message constant is clear and follows the existing naming conventions. Ensure that this new error message is utilized appropriately in the codebase wherever the relevant error condition occurs.src/serializers/encoder/v14/encode_graph_entities.c (10)
- 13-13: The replacement of
RedisModuleIO
withSerializerIO
in the_RdbSaveSIValue
function parameter is consistent with the PR's objective to abstract the serialization process. Ensure that theSerializerIO
interface is correctly implemented.- 10-22: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [19-31]
The changes in
_RdbSaveSIArray
to useSerializerIO
for writing unsigned integers are appropriate and align with the goal of abstracting the serialization process. Ensure comprehensive testing to verify correct serialization behavior.
- 49-95: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [40-60]
The modifications in
_RdbSaveSIVector
to useSerializerIO
for writing unsigned integers and floats are correctly implemented. It's crucial to test these changes thoroughly to confirm the serialization integrity of vectors.
- 66-92: The comprehensive changes in
_RdbSaveSIValue
to utilizeSerializerIO
for writing various data types (unsigned integers, signed integers, doubles, and buffers) are well-executed. Recommend extensive testing to ensure that all data types are serialized correctly without data loss or corruption.- 113-131: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [106-121]
The update in
_RdbSaveEntity
to useSerializerIO
for writing unsigned integers and serializing values is in line with the abstraction goal. Validate through testing that entity serialization remains accurate and efficient.
- 138-161: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [128-150]
The changes in
_RdbSaveEdge
to useSerializerIO
for edge serialization, including writing unsigned integers for various edge attributes, are correctly implemented. Ensure that edge serialization tests cover all edge cases to prevent regressions.
- 168-182: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [158-179]
The modifications in
_RdbSaveNode_v14
to useSerializerIO
for node serialization, including writing unsigned integers for node IDs and labels, are well done. Testing should include various node types to ensure serialization accuracy.
- 185-191: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [188-198]
The implementation of
_RdbSaveDeletedEntities_v14
usingSerializerIO
to write unsigned integers for deleted entity IDs is appropriate. Verify through testing that deleted entities are correctly serialized and tracked.
- 195-207: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [204-219]
The changes in
RdbSaveDeletedNodes_v14
andRdbSaveDeletedEdges_v14
to useSerializerIO
for serialization of deleted nodes and edges are consistent with the abstraction efforts. Ensure that the deletion process is accurately reflected in the serialized data.
- 232-238: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [235-280]
The updates in
RdbSaveNodes_v14
,RdbSaveEdges_v14
, and auxiliary functions to useSerializerIO
for serialization tasks are correctly aligned with the goal of improving serialization flexibility. It's essential to conduct thorough testing, especially for edge cases, to ensure the integrity of the serialized graph data.
SerializerIO io = SerializerIO_FromRedisModuleIO(rdb); | ||
RdbSaveGraph_v14(io, value); | ||
SerializerIO_Free(&io); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tip
Codebase Verification
The review comment is substantiated by the findings. The SerializerIO_Free
function's implementation, combined with the explicit mention of a TODO to close the pipe, indicates that the cleanup process may not fully address the closing of pipe file descriptors. This oversight could potentially lead to resource leaks, aligning with the concerns raised in the review comment about ensuring proper cleanup to prevent memory leaks. Therefore, the review comment is valid and highlights an important area for improvement in the codebase.
Analysis chain
The transition to using SerializerIO
for graph serialization operations is correctly implemented. However, ensure that SerializerIO_Free
properly handles all cleanup tasks to prevent memory leaks.
Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verify SerializerIO_Free implementation for resource cleanup
grep -A 10 "void SerializerIO_Free" src/serializers/serializer_io.c
Length of output: 235
Script:
#!/bin/bash
# Search for references to the "pipe" within SerializerIO structure and its usage
grep -R "pipe" src/serializers/
Length of output: 2052
src/commands/cmd_copy.c
Outdated
// encode graph into pipe | ||
// this function should run on a child process, giving us the guarantee that | ||
// 1. the cloned graph wouldn't change | ||
// 2. due to memory seperation we do not need to hold any locks | ||
// 3. we're allowed to make modification to the graph e.g. rename | ||
static void _encode_graph |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of a loop for retrying the GRAPH.COPY
command in case of a fork failure is a good resilience mechanism. However, consider implementing a maximum retry limit to prevent infinite loops in scenarios where forking consistently fails due to system constraints.
+ int retry_limit = 5;
while (retry_limit > 0) {
try {
self.conn.execute_command("GRAPH.COPY", src, dest);
break;
} catch (Exception e) {
if (str(e) == "Graph copy failed, could not fork, please retry") {
time.sleep(0.1);
+ retry_limit--;
continue;
} else {
throw e;
}
}
}
+ if (retry_limit == 0) {
+ throw new Exception("Graph copy failed after multiple retries");
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
// encode graph into pipe | |
// this function should run on a child process, giving us the guarantee that | |
// 1. the cloned graph wouldn't change | |
// 2. due to memory seperation we do not need to hold any locks | |
// 3. we're allowed to make modification to the graph e.g. rename | |
static void _encode_graph | |
// encode graph into pipe | |
// this function should run on a child process, giving us the guarantee that | |
// 1. the cloned graph wouldn't change | |
// 2. due to memory seperation we do not need to hold any locks | |
// 3. we're allowed to make modification to the graph e.g. rename | |
static void _encode_graph | |
int retry_limit = 5; | |
while (retry_limit > 0) { | |
try { | |
self.conn.execute_command("GRAPH.COPY", src, dest); | |
break; | |
} catch (Exception e) { | |
if (str(e) == "Graph copy failed, could not fork, please retry") { | |
time.sleep(0.1); | |
retry_limit--; | |
continue; | |
} else { | |
throw e; | |
} | |
} | |
} | |
if (retry_limit == 0) { | |
throw new Exception("Graph copy failed after multiple retries"); | |
} |
src/commands/cmd_copy.c
Outdated
// when the graph is decoded it is already holds the target name | ||
GraphContext_Rename(ctx, src_graph, dest_graph_name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming the source graph during the encoding process might introduce side effects, especially if the operation fails or is interrupted. Consider creating a copy of the graph's metadata with the new name instead of renaming the original graph. This approach would avoid potential inconsistencies in the event of a failure.
- GraphContext_Rename(ctx, src_graph, dest_graph_name);
+ // Instead of renaming, create a copy of the graph's metadata with the new name.
+ GraphContext *cloned_graph_metadata = GraphContext_CloneMetadata(ctx, src_graph, dest_graph_name);
// Proceed with encoding using cloned_graph_metadata instead of src_graph.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
// when the graph is decoded it is already holds the target name | |
GraphContext_Rename(ctx, src_graph, dest_graph_name); | |
// when the graph is decoded it is already holds the target name | |
// Instead of renaming, create a copy of the graph's metadata with the new name. | |
GraphContext *cloned_graph_metadata = GraphContext_CloneMetadata(ctx, src_graph, dest_graph_name); | |
// Proceed with encoding using cloned_graph_metadata instead of src_graph. |
src/commands/cmd_copy.c
Outdated
if(pipe(pipefd) == -1) { | ||
// failed to create pipe, abort | ||
error = true; | ||
RedisModule_ReplyWithError(ctx, | ||
"Graph copy failed, could not create pipe"); | ||
goto cleanup; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a pipe for inter-process communication during the graph copy operation is a solid choice for ensuring data integrity and isolation. However, error handling for the pipe
system call is missing. It's crucial to handle potential errors from write
and read
operations on the pipe to prevent data corruption or incomplete transfers.
ssize_t n = write(pipefd, &v, sizeof(t));
+ if (n == -1) {
+ // Handle write error, possibly by logging and aborting the operation
+ }
ASSERT(n == sizeof(t));
And similarly for read
operations.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
if(pipe(pipefd) == -1) { | |
// failed to create pipe, abort | |
error = true; | |
RedisModule_ReplyWithError(ctx, | |
"Graph copy failed, could not create pipe"); | |
goto cleanup; | |
if(pipe(pipefd) == -1) { | |
// failed to create pipe, abort | |
error = true; | |
RedisModule_ReplyWithError(ctx, | |
"Graph copy failed, could not create pipe"); | |
goto cleanup; |
src/commands/cmd_copy.c
Outdated
int pid = RedisModule_Fork(NULL, NULL); | ||
if(pid < 0) { | ||
error = true; | ||
// failed to fork! | ||
RedisModule_ReplyWithError(ctx, | ||
"Graph copy failed, could not fork, please retry"); | ||
goto cleanup; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of RedisModule_Fork
for creating a child process to handle the graph encoding is a critical operation. Proper error handling is implemented for the fork failure scenario. However, consider adding more detailed logging or metrics around fork failures to aid in diagnosing system resource limitations or other issues that might cause forking to fail.
if(pid < 0) {
error = true;
// failed to fork!
+ RedisModule_Log(ctx, "warning", "Graph copy failed, could not fork due to system limitations or resource constraints.");
RedisModule_ReplyWithError(ctx,
"Graph copy failed, could not fork, please retry");
goto cleanup;
}
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
int pid = RedisModule_Fork(NULL, NULL); | |
if(pid < 0) { | |
error = true; | |
// failed to fork! | |
RedisModule_ReplyWithError(ctx, | |
"Graph copy failed, could not fork, please retry"); | |
goto cleanup; | |
int pid = RedisModule_Fork(NULL, NULL); | |
if(pid < 0) { | |
error = true; | |
// failed to fork! | |
RedisModule_Log(ctx, "warning", "Graph copy failed, could not fork due to system limitations or resource constraints."); | |
RedisModule_ReplyWithError(ctx, | |
"Graph copy failed, could not fork, please retry"); | |
goto cleanup; |
src/serializers/serializer_io.c
Outdated
// macro for creating the pipe serializer write functions | ||
#define SERIALIZERIO_WRITE(suffix, t) \ | ||
void SerializerIO_Write##suffix(SerializerIO io, t value) { \ | ||
ASSERT(io != NULL); \ | ||
io->Write##suffix(io->stream, value); \ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The macro SERIALIZERIO_WRITE
and its usage to define write functions for various data types is a clean and DRY approach. However, ensure that all write operations check the return value of the underlying write
system call to handle partial writes or errors gracefully.
ssize_t n = write(pipefd, &v, sizeof(t));
+ if (n != sizeof(t)) {
+ // Handle partial write or error
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
// macro for creating the pipe serializer write functions | |
#define SERIALIZERIO_WRITE(suffix, t) \ | |
void SerializerIO_Write##suffix(SerializerIO io, t value) { \ | |
ASSERT(io != NULL); \ | |
io->Write##suffix(io->stream, value); \ | |
} | |
// macro for creating the pipe serializer write functions | |
#define SERIALIZERIO_WRITE(suffix, t) \ | |
void SerializerIO_Write##suffix(SerializerIO io, t value) { \ | |
ASSERT(io != NULL); \ | |
io->Write##suffix(io->stream, value); \ | |
} |
src/serializers/serializer_io.c
Outdated
// macro for creating pipe serializer read functions | ||
#define PIPE_READ(suffix, t) \ | ||
static t Pipe_Read##suffix(void *pipe) { \ | ||
ASSERT(pipe != NULL); \ | ||
int pipefd = (intptr_t)pipe; \ | ||
t v; \ | ||
ssize_t n = read(pipefd, &v, sizeof(t)); \ | ||
ASSERT(n == sizeof(t)); \ | ||
return v; \ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the write operations, the read operations defined by the PIPE_READ
macro should also check for errors or partial reads from the read
system call. This is crucial for ensuring data integrity during deserialization.
ssize_t n = read(pipefd, &v, sizeof(t));
+ if (n != sizeof(t)) {
+ // Handle partial read or error
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
// macro for creating pipe serializer read functions | |
#define PIPE_READ(suffix, t) \ | |
static t Pipe_Read##suffix(void *pipe) { \ | |
ASSERT(pipe != NULL); \ | |
int pipefd = (intptr_t)pipe; \ | |
t v; \ | |
ssize_t n = read(pipefd, &v, sizeof(t)); \ | |
ASSERT(n == sizeof(t)); \ | |
return v; \ | |
} | |
// macro for creating pipe serializer read functions | |
#define PIPE_READ(suffix, t) \ | |
static t Pipe_Read##suffix(void *pipe) { \ | |
ASSERT(pipe != NULL); \ | |
int pipefd = (intptr_t)pipe; \ | |
t v; \ | |
ssize_t n = read(pipefd, &v, sizeof(t)); \ | |
if (n != sizeof(t)) { \ | |
// Handle partial read or error \ | |
} \ | |
ASSERT(n == sizeof(t)); \ | |
return v; \ | |
} |
src/serializers/serializer_io.c
Outdated
// create a serializer which uses pipe | ||
SerializerIO SerializerIO_FromPipe | ||
( | ||
int pipefd // either the read or write end of a pipe | ||
) { | ||
SerializerIO serializer = rm_calloc(1, sizeof(struct SerializerIO_Opaque)); | ||
|
||
serializer->stream = (void*)(intptr_t)pipefd; | ||
|
||
// set serializer function pointers | ||
serializer->WriteUnsigned = Pipe_WriteUnsigned; | ||
serializer->WriteSigned = Pipe_WriteSigned; | ||
serializer->WriteString = Pipe_WriteString; | ||
serializer->WriteBuffer = Pipe_WriteBuffer; | ||
serializer->WriteDouble = Pipe_WriteDouble; | ||
serializer->WriteFloat = Pipe_WriteFloat; | ||
serializer->WriteLongDouble = Pipe_WriteLongDouble; | ||
|
||
serializer->ReadFloat = Pipe_ReadFloat; | ||
serializer->ReadDouble = Pipe_ReadDouble; | ||
serializer->ReadSigned = Pipe_ReadSigned; | ||
serializer->ReadBuffer = Pipe_ReadBuffer; | ||
serializer->ReadString = Pipe_ReadString; | ||
serializer->ReadUnsigned = Pipe_ReadUnsigned; | ||
serializer->ReadLongDouble = Pipe_ReadLongDouble; | ||
|
||
return serializer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function SerializerIO_FromPipe
correctly sets up a serializer for pipe-based communication. However, it's important to ensure that the pipe file descriptor is closed properly when the serializer is freed to avoid resource leaks.
void SerializerIO_Free(SerializerIO *io) {
ASSERT(io != NULL);
ASSERT(*io != NULL);
+ close((intptr_t)(*io)->stream); // Ensure the pipe is closed
rm_free(*io);
*io = NULL;
}
Note: This suggestion assumes that the serializer is the sole owner of the pipe file descriptor. If the file descriptor is shared or managed elsewhere, adjust the cleanup logic accordingly.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
// create a serializer which uses pipe | |
SerializerIO SerializerIO_FromPipe | |
( | |
int pipefd // either the read or write end of a pipe | |
) { | |
SerializerIO serializer = rm_calloc(1, sizeof(struct SerializerIO_Opaque)); | |
serializer->stream = (void*)(intptr_t)pipefd; | |
// set serializer function pointers | |
serializer->WriteUnsigned = Pipe_WriteUnsigned; | |
serializer->WriteSigned = Pipe_WriteSigned; | |
serializer->WriteString = Pipe_WriteString; | |
serializer->WriteBuffer = Pipe_WriteBuffer; | |
serializer->WriteDouble = Pipe_WriteDouble; | |
serializer->WriteFloat = Pipe_WriteFloat; | |
serializer->WriteLongDouble = Pipe_WriteLongDouble; | |
serializer->ReadFloat = Pipe_ReadFloat; | |
serializer->ReadDouble = Pipe_ReadDouble; | |
serializer->ReadSigned = Pipe_ReadSigned; | |
serializer->ReadBuffer = Pipe_ReadBuffer; | |
serializer->ReadString = Pipe_ReadString; | |
serializer->ReadUnsigned = Pipe_ReadUnsigned; | |
serializer->ReadLongDouble = Pipe_ReadLongDouble; | |
return serializer; | |
void SerializerIO_Free(SerializerIO *io) { | |
ASSERT(io != NULL); | |
ASSERT(*io != NULL); | |
close((intptr_t)(*io)->stream); // Ensure the pipe is closed | |
rm_free(*io); | |
*io = NULL; | |
} |
tests/flow/test_graph_copy.py
Outdated
def graph_copy(self, src, dest): | ||
# invokes the GRAPH.COPY command | ||
# handels exception when the command failed due to failure in creating | ||
# a fork, in which case the command is retried | ||
while True: | ||
try: | ||
# it is possible for GRAPH.COPY to fail in case FalkorDB was unable | ||
# to create a fork (Redis restricts us to a single fork at a time) | ||
self.conn.execute_command("GRAPH.COPY", src, dest) | ||
break | ||
except Exception as e: | ||
# retry if FalkorDB failed to fork | ||
# otherwise raise an exception | ||
if str(e) == "Graph copy failed, could not fork, please retry": | ||
time.sleep(0.1) | ||
continue | ||
else: | ||
raise e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The retry mechanism in the graph_copy
method is a practical approach to handling temporary failures due to fork limitations. However, as mentioned earlier, consider implementing a maximum retry limit to prevent infinite loops.
+ max_retries = 5
while max_retries > 0:
try:
self.conn.execute_command("GRAPH.COPY", src, dest)
break
except Exception as e:
if str(e) == "Graph copy failed, could not fork, please retry":
time.sleep(0.1)
+ max_retries -= 1
continue
else:
raise e
+ if max_retries == 0:
+ raise Exception("Graph copy failed after multiple retries")
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
def graph_copy(self, src, dest): | |
# invokes the GRAPH.COPY command | |
# handels exception when the command failed due to failure in creating | |
# a fork, in which case the command is retried | |
while True: | |
try: | |
# it is possible for GRAPH.COPY to fail in case FalkorDB was unable | |
# to create a fork (Redis restricts us to a single fork at a time) | |
self.conn.execute_command("GRAPH.COPY", src, dest) | |
break | |
except Exception as e: | |
# retry if FalkorDB failed to fork | |
# otherwise raise an exception | |
if str(e) == "Graph copy failed, could not fork, please retry": | |
time.sleep(0.1) | |
continue | |
else: | |
raise e | |
def graph_copy(self, src, dest): | |
# invokes the GRAPH.COPY command | |
# handels exception when the command failed due to failure in creating | |
# a fork, in which case the command is retried | |
max_retries = 5 | |
while max_retries > 0: | |
try: | |
# it is possible for GRAPH.COPY to fail in case FalkorDB was unable | |
# to create a fork (Redis restricts us to a single fork at a time) | |
self.conn.execute_command("GRAPH.COPY", src, dest) | |
break | |
except Exception as e: | |
# retry if FalkorDB failed to fork | |
# otherwise raise an exception | |
if str(e) == "Graph copy failed, could not fork, please retry": | |
time.sleep(0.1) | |
max_retries -= 1 | |
continue | |
else: | |
raise e | |
if max_retries == 0: | |
raise Exception("Graph copy failed after multiple retries") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (2)
- src/commands/cmd_copy.c (1 hunks)
- tests/flow/test_graph_copy.py (1 hunks)
Files skipped from review as they are similar to previous changes (2)
- src/commands/cmd_copy.c
- tests/flow/test_graph_copy.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- tests/flow/test_graph_copy.py (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- tests/flow/test_graph_copy.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- tests/flow/test_graph_copy.py (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- tests/flow/test_graph_copy.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (6)
- src/commands/cmd_copy.c (1 hunks)
- src/serializers/serializer_io.c (1 hunks)
- src/serializers/serializer_io.h (1 hunks)
- tests/flow/graph_utils.py (1 hunks)
- tests/flow/test_effects.py (2 hunks)
- tests/flow/test_graph_copy.py (1 hunks)
Files skipped from review as they are similar to previous changes (4)
- src/commands/cmd_copy.c
- src/serializers/serializer_io.c
- src/serializers/serializer_io.h
- tests/flow/test_graph_copy.py
Additional comments: 2
tests/flow/graph_utils.py (1)
- 10-49: The
graph_eq
function introduces a comprehensive approach to comparing two graphs by executing a series of Cypher queries to fetch and compare various graph characteristics. While the logic and implementation appear correct and maintainable, consider the following points for potential improvement:
- Performance: Executing multiple Cypher queries in a loop could impact database performance, especially for large graphs. It might be beneficial to explore ways to reduce the number of database calls, possibly by combining some queries or leveraging more efficient database features if available.
- Error Handling: Ensure that the database queries' execution is wrapped in appropriate error handling to gracefully manage any potential issues during the comparison process.
- Documentation: Adding a bit more documentation about the function's purpose, parameters, and return value could enhance its readability and maintainability for future contributors.
Overall, the function is a valuable addition to the testing utilities, facilitating thorough graph comparisons.
tests/flow/test_effects.py (1)
- 81-81: Refactoring the
assert_graph_eq
method to use thegraph_eq
function fromgraph_utils
is a commendable improvement. This change simplifies the graph comparison logic within the test suite, promoting code reuse and potentially enhancing test reliability. A few considerations to ensure the effectiveness of this refactoring include:
- Test Coverage: Verify that the
graph_eq
function has comprehensive test coverage to ensure its reliability for use in test assertions.- Behavioral Consistency: Ensure that the refactoring does not change the intended behavior of the tests or reduce their ability to detect discrepancies between graphs.
Overall, this refactoring is a positive change that contributes to the maintainability and clarity of the test suite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- src/commands/cmd_copy.c (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/commands/cmd_copy.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (3)
- src/commands/cmd_copy.c (1 hunks)
- src/serializers/decoders/current/v14/decode_graph.c (5 hunks)
- tests/flow/test_graph_copy.py (1 hunks)
Files skipped from review as they are similar to previous changes (3)
- src/commands/cmd_copy.c
- src/serializers/decoders/current/v14/decode_graph.c
- tests/flow/test_graph_copy.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- src/commands/cmd_copy.c (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/commands/cmd_copy.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- tests/unit/test_serializer.c (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- tests/unit/test_serializer.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- tests/unit/test_serializer.c (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- tests/unit/test_serializer.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (4)
- src/serializers/decoders/current/v14/decode_graph_entities.c (10 hunks)
- src/serializers/decoders/current/v14/decode_graph_schema.c (13 hunks)
- src/serializers/encoder/v14/encode_graph_entities.c (14 hunks)
- src/serializers/encoder/v14/encode_schema.c (12 hunks)
Files skipped from review as they are similar to previous changes (4)
- src/serializers/decoders/current/v14/decode_graph_entities.c
- src/serializers/decoders/current/v14/decode_graph_schema.c
- src/serializers/encoder/v14/encode_graph_entities.c
- src/serializers/encoder/v14/encode_schema.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 6
Configuration used: CodeRabbit UI
Files selected for processing (4)
- src/commands/cmd_copy.c (1 hunks)
- src/serializers/serializer_io.c (1 hunks)
- src/serializers/serializer_io.h (1 hunks)
- tests/unit/test_serializer.c (1 hunks)
Files skipped from review as they are similar to previous changes (3)
- src/serializers/serializer_io.c
- src/serializers/serializer_io.h
- tests/unit/test_serializer.c
// return a full path to a temporary dump file | ||
// e.g. /tmp/<UUID>.dump | ||
static char *_temp_file(void) { | ||
char *uuid = UUID_New(); | ||
char *path; | ||
asprintf(&path, "/tmp/%s.dump", uuid); | ||
rm_free(uuid); | ||
|
||
return path; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _temp_file
function generates a temporary file path using a UUID. While this approach is generally safe, it's important to consider the security implications of creating temporary files in a predictable location like /tmp
. An attacker could potentially exploit this by guessing the file name and creating a symlink to a critical file, leading to a symlink attack.
Consider using a more secure method to create temporary files, such as mkstemp
, which creates and opens a unique temporary file in a safe manner, minimizing the risk of symlink attacks.
// create a new graph copy context | ||
static GraphCopyContext *GraphCopyContext_New | ||
( | ||
RedisModuleBlockedClient *bc, // blocked client | ||
RedisModuleString *src, // src graph key name | ||
RedisModuleString *dest // destination graph key name | ||
) { | ||
ASSERT(bc != NULL); | ||
ASSERT(src != NULL); | ||
ASSERT(dest != NULL); | ||
|
||
GraphCopyContext *ctx = rm_malloc(sizeof(GraphCopyContext)); | ||
|
||
ctx->bc = bc; | ||
ctx->path = _temp_file(); | ||
ctx->rm_src = src; | ||
ctx->rm_dest = dest; | ||
ctx->src = RedisModule_StringPtrLen(src, NULL); | ||
ctx->dest = RedisModule_StringPtrLen(dest, NULL); | ||
|
||
return ctx; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GraphCopyContext_New
function initializes a new GraphCopyContext
structure. It correctly asserts the non-nullity of its arguments and allocates memory for the context. However, there's no error handling for the memory allocation with rm_malloc
. If rm_malloc
fails and returns NULL
, the subsequent operations on ctx
would lead to undefined behavior.
Add error handling after the rm_malloc
call to check if ctx
is NULL
. If it is, handle the error gracefully, possibly by logging an error message and returning NULL
from the function.
src/commands/cmd_copy.c
Outdated
// encode graph to disk | ||
// this function should run on a child process, giving us the guarantees: | ||
// 1. the cloned graph wouldn't change | ||
// 2. due to memory seperation we do not need to hold any locks | ||
// 3. we're allowed to make modification to the graph e.g. rename | ||
static int encode_graph | ||
( | ||
RedisModuleCtx *ctx, // redis module context | ||
GraphContext *gc, // graph to clone | ||
GraphCopyContext *copy_ctx // graph copy context | ||
) { | ||
// validations | ||
ASSERT(gc != NULL); | ||
ASSERT(ctx != NULL); | ||
ASSERT(copy_ctx != NULL); | ||
|
||
int res = 0; // 0 indicates success | ||
|
||
// rename graph, needed by the decoding procedure | ||
// when the graph is decoded it is already holds the target name | ||
GraphContext_Rename(ctx, gc, copy_ctx->dest); | ||
|
||
//-------------------------------------------------------------------------- | ||
// serialize graph to file | ||
//-------------------------------------------------------------------------- | ||
|
||
// open dump file | ||
// write only, create if missing, truncate if exists | ||
// grant READ access to group (0644) | ||
FILE *f = fopen(copy_ctx->path, "wb"); | ||
if(f == NULL) { | ||
// failed to open file | ||
res = 1; // indicate error | ||
goto cleanup; | ||
} | ||
|
||
// create serializer | ||
SerializerIO io = SerializerIO_FromStream(f); | ||
ASSERT(io != NULL); | ||
|
||
// encode graph to disk | ||
RedisModule_Log(NULL, REDISMODULE_LOGLEVEL_NOTICE, "dump graph: %s to: %s", | ||
copy_ctx->src, copy_ctx->path); | ||
RdbSaveGraph_v14(io, gc); | ||
|
||
cleanup: | ||
|
||
// free serializer | ||
if(io != NULL) SerializerIO_Free(&io); | ||
|
||
// close file | ||
if(f != NULL) fclose(f); | ||
|
||
// all done, no errors | ||
return res; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The encode_graph
function is responsible for encoding a graph to disk. It correctly handles file opening and error checking. However, there are a couple of issues:
- The file permissions are mentioned in the comments as "grant READ access to group (0644)", but the actual
fopen
call with"wb"
mode does not explicitly set these permissions. The comment might be misleading or incorrect. - The function uses
ASSERT
to check for non-null arguments but does not validate the success of theRdbSaveGraph_v14
call. This could lead to silent failures in graph serialization.
- Clarify the comment regarding file permissions or adjust the code to explicitly set the intended permissions using
open
withO_CREAT
andO_WRONLY
flags, followed byfdopen
to convert the file descriptor to aFILE*
. - Add error handling for the
RdbSaveGraph_v14
call to ensure that serialization failures are detected and handled appropriately.
src/commands/cmd_copy.c
Outdated
|
||
// seek to the end of the file | ||
fseek(f, 0, SEEK_END); | ||
|
||
// get current position, which is the size of the file | ||
long fileLength = ftell(f); | ||
|
||
// seek to the beginning of the file | ||
rewind(f); | ||
|
||
// allocate buffer to hold entire dumped graph | ||
buffer = rm_malloc(sizeof(char) * fileLength + 1); | ||
|
||
// read file content into buffer | ||
fread(buffer, 1, fileLength, f); | ||
|
||
buffer[fileLength] = '\0'; // Null-terminate the buffer | ||
|
||
fclose(f); // close file | ||
|
||
//-------------------------------------------------------------------------- | ||
// create memory stream | ||
//-------------------------------------------------------------------------- | ||
|
||
stream = fmemopen(buffer, fileLength, "r"); | ||
if(stream == NULL) { | ||
RedisModule_ReplyWithError(ctx, "copy failed"); | ||
goto cleanup; | ||
} | ||
|
||
// create serializer ontop of file descriptor | ||
io = SerializerIO_FromStream(stream); | ||
ASSERT(io != NULL); | ||
|
||
// decode graph from io | ||
RedisModule_Log(NULL, REDISMODULE_LOGLEVEL_NOTICE, | ||
"Decoding graph: %s from: %s", copy_ctx->dest, copy_ctx->path); | ||
|
||
GraphContext *gc = RdbLoadGraphContext_v14(io, copy_ctx->rm_dest); | ||
ASSERT(gc != NULL); | ||
|
||
// TODO: should we decrase gc ref count? | ||
// TODO: validate by performing graph deletions | ||
|
||
//-------------------------------------------------------------------------- | ||
// add cloned graph to keyspace | ||
//-------------------------------------------------------------------------- | ||
|
||
RedisModule_ThreadSafeContextLock(ctx); // lock GIL | ||
|
||
// make sure dest key does not exists | ||
RedisModuleKey *key = | ||
RedisModule_OpenKey(ctx, copy_ctx->rm_dest, REDISMODULE_READ); | ||
int key_type = RedisModule_KeyType(key); | ||
|
||
RedisModule_CloseKey(key); | ||
|
||
if(key_type != REDISMODULE_KEYTYPE_EMPTY) { | ||
// error! | ||
RedisModule_ThreadSafeContextUnlock(ctx); // release GIL | ||
|
||
// free graph | ||
GraphContext_DecreaseRefCount(gc); | ||
|
||
RedisModule_ReplyWithError(ctx, "copy failed"); | ||
} else { | ||
// create key | ||
key = RedisModule_OpenKey(ctx, copy_ctx->rm_dest, REDISMODULE_WRITE); | ||
|
||
// set value in key | ||
RedisModule_ModuleTypeSetValue(key, GraphContextRedisModuleType, gc); | ||
|
||
RedisModule_CloseKey(key); | ||
|
||
// replicate graph | ||
RedisModule_Replicate(ctx, "RESTORE", "c", buffer); | ||
|
||
RedisModule_ThreadSafeContextUnlock(ctx); // release GIL | ||
|
||
// register graph context for BGSave | ||
GraphContext_RegisterWithModule(gc); | ||
|
||
RedisModule_ReplyWithCString(ctx, "OK"); | ||
} | ||
|
||
cleanup: | ||
|
||
// free serializer | ||
if(io != NULL) SerializerIO_Free(&io); | ||
|
||
// close file descriptor | ||
if(stream != NULL) fclose(stream); | ||
|
||
// free buffer | ||
if(buffer != NULL) rm_free(buffer); | ||
|
||
// free copy context | ||
GraphCopyContext_Free(copy_ctx); | ||
|
||
RedisModule_FreeThreadSafeContext(ctx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The LoadGraphFromFile
function is responsible for loading a graph from a file and decoding it. There are several points to address:
- The function reads the entire file into memory before decoding. This approach might not be scalable for very large graphs, potentially leading to memory exhaustion.
- The use of
fmemopen
is a good approach for creating a stream from memory, but there's no error handling for thefread
call. Iffread
fails, it could lead to undefined behavior during decoding. - The replication command
RedisModule_Replicate(ctx, "RESTORE", "c", buffer);
is used incorrectly. TheRESTORE
command expects a serialized Redis dump payload, not the raw contents of a graph dump file. This misuse could lead to replication issues or data corruption.
- Consider streaming the file during decoding instead of loading it entirely into memory to handle large graphs more efficiently.
- Add error handling for the
fread
call to ensure that the file is read successfully before proceeding with decoding. - Correct the use of the replication command to ensure that it's compatible with the expected payload format. If the intention is to replicate the graph copy operation, a custom replication command or a different approach might be necessary.
src/commands/cmd_copy.c
Outdated
int Graph_Copy | ||
( | ||
RedisModuleCtx *ctx, // redis module context | ||
RedisModuleString **argv, // command argument | ||
int argc // number of argument | ||
) { | ||
// validations | ||
ASSERT(ctx != NULL); | ||
ASSERT(argv != NULL); | ||
|
||
// expecting exactly 3 arguments: | ||
// argv[0] command name | ||
// argv[1] src_graph_id | ||
// argv[2] dest_graph_id | ||
if(argc != 3) { | ||
return RedisModule_WrongArity(ctx); | ||
} | ||
|
||
// block the client | ||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, | ||
NULL, 0); | ||
|
||
// retain arguments | ||
RedisModule_RetainString(ctx, argv[1]); | ||
RedisModule_RetainString(ctx, argv[2]); | ||
|
||
// create command context | ||
GraphCopyContext *context = GraphCopyContext_New(bc, argv[1], argv[2]); | ||
|
||
// add GRAPH.COPY as a cron task to run as soon as possible | ||
Cron_AddTask(0, _Graph_Copy, NULL, context); | ||
|
||
return REDISMODULE_OK; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Graph_Copy
function implements the main logic for the GRAPH.COPY
command. It correctly checks the number of arguments and blocks the client during the operation. However, there are a couple of points to consider:
- The function blocks the client but does not specify any timeout or reply callbacks. This could lead to clients being blocked indefinitely if the copy operation encounters an error or takes a long time.
- The function uses
Cron_AddTask
to schedule the copy operation but does not provide any mechanism for error handling or feedback if the cron task fails or cannot be scheduled.
- Consider specifying timeout and reply callbacks when blocking the client to ensure that clients are not left waiting indefinitely.
- Implement error handling and feedback mechanisms for the cron task scheduling to inform the client if the copy operation cannot be initiated or encounters errors during execution.
src/commands/cmd_copy.c
Outdated
// error alreay omitted by 'GraphContext_Retrieve' | ||
goto cleanup; | ||
} | ||
|
||
//-------------------------------------------------------------------------- | ||
// fork process | ||
//-------------------------------------------------------------------------- | ||
|
||
// child process will encode src graph to a file | ||
// parent process will decode cloned graph from file | ||
|
||
int pid = -1; | ||
while(pid == -1) { | ||
// acquire READ lock on gc | ||
// we do not want to fork while the graph is modified | ||
Graph_AcquireReadLock(gc->g); | ||
|
||
// try to fork | ||
pid = RedisModule_Fork(ForkDoneHandler, copy_ctx); | ||
if(pid < 0) { | ||
// failed to fork! retry in a bit | ||
|
||
// release graph READ lock | ||
Graph_ReleaseLock(gc->g); | ||
|
||
// go to sleep for 1.0ms | ||
struct timespec sleep_time; | ||
sleep_time.tv_sec = 0; | ||
sleep_time.tv_nsec = 1000000; | ||
nanosleep(&sleep_time, NULL); | ||
} else if(pid == 0) { | ||
// managed to fork, in child process | ||
// encode graph to disk | ||
int res = encode_graph(ctx, gc, copy_ctx); | ||
// all done, Redis require us to call 'RedisModule_ExitFromChild' | ||
RedisModule_ExitFromChild(res); | ||
return; | ||
} | ||
} | ||
|
||
// release READ lock | ||
Graph_ReleaseLock(gc->g); | ||
|
||
// clean up | ||
cleanup: | ||
|
||
// decrease src graph ref-count | ||
if(gc != NULL) { | ||
GraphContext_DecreaseRefCount(gc); | ||
} | ||
|
||
if(error) { | ||
// free command context only in the case of an error | ||
// otherwise the fork callback is responsible for freeing this context | ||
GraphCopyContext_Free(copy_ctx); | ||
} | ||
|
||
RedisModule_FreeThreadSafeContext(ctx); | ||
} | ||
|
||
// clone a graph | ||
// this function executes on Redis main thread | ||
// | ||
// usage: | ||
// GRAPH.COPY <src_graph> <dest_graph> | ||
int Graph_Copy | ||
( | ||
RedisModuleCtx *ctx, // redis module context | ||
RedisModuleString **argv, // command argument | ||
int argc // number of argument | ||
) { | ||
// validations | ||
ASSERT(ctx != NULL); | ||
ASSERT(argv != NULL); | ||
|
||
// expecting exactly 3 arguments: | ||
// argv[0] command name | ||
// argv[1] src_graph_id | ||
// argv[2] dest_graph_id | ||
if(argc != 3) { | ||
return RedisModule_WrongArity(ctx); | ||
} | ||
|
||
// block the client | ||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, | ||
NULL, 0); | ||
|
||
// retain arguments | ||
RedisModule_RetainString(ctx, argv[1]); | ||
RedisModule_RetainString(ctx, argv[2]); | ||
|
||
// create command context | ||
GraphCopyContext *context = GraphCopyContext_New(bc, argv[1], argv[2]); | ||
|
||
// add GRAPH.COPY as a cron task to run as soon as possible | ||
Cron_AddTask(0, _Graph_Copy, NULL, context); | ||
|
||
return REDISMODULE_OK; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, the implementation of the GRAPH.COPY
command in cmd_copy.c
introduces a comprehensive approach to graph duplication in RedisGraph. While the logic and flow of the command are well thought out, there are several areas where improvements can be made for security, error handling, and performance. Addressing these issues will enhance the robustness, scalability, and reliability of the graph copy functionality.
- Enhance security measures to prevent potential symlink attacks when creating temporary files.
- Improve error handling throughout the code to ensure graceful handling of exceptional conditions and memory allocation failures.
- Optimize performance by considering alternatives to loading entire graphs into memory for deserialization.
- Clarify and correct the use of replication commands to ensure data integrity and consistency across Redis instances.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 5
Configuration used: CodeRabbit UI
Files selected for processing (4)
- src/commands/cmd_copy.c (1 hunks)
- src/commands/cmd_restore.c (1 hunks)
- src/commands/commands.h (2 hunks)
- src/module.c (1 hunks)
Files skipped from review as they are similar to previous changes (3)
- src/commands/cmd_copy.c
- src/commands/commands.h
- src/module.c
Additional comments: 2
src/commands/cmd_restore.c (2)
- 20-20: The comment mentions that this function is meant to execute on the Redis main thread. It's crucial to ensure that long-running operations do not block the main thread, potentially affecting the performance of the Redis server. Consider verifying if the operation's duration is acceptable for main thread execution or if it should be offloaded to a background thread.
- 72-82: Registering the graph context for background saving after restoration is a crucial step for persistence. However, ensure that error handling is in place for scenarios where registration might fail, for example, due to memory constraints or issues within the
GraphContext_RegisterWithModule
function. It's important to communicate any failures back to the caller to prevent data loss or inconsistencies.
return RedisModule_WrongArity(ctx); | ||
} | ||
|
||
// TODO: reject GRAPH.RESTORE if caller isn't the master |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a TODO comment about rejecting GRAPH.RESTORE
if the caller isn't the master. This is an important security measure to prevent unauthorized graph restoration. It's recommended to implement this check before proceeding with the restoration process to ensure that the command is executed in a secure context.
Would you like assistance in implementing this check?
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); | ||
int key_type = RedisModule_KeyType(key); | ||
RedisModule_CloseKey(key); | ||
|
||
// key exists, fail | ||
if(key_type != REDISMODULE_KEYTYPE_EMPTY) { | ||
RedisModule_ReplyWithError(ctx, "restore graph failed, key already exists"); | ||
return REDISMODULE_OK; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking if the graph key already exists before proceeding with the restore operation is a good practice to avoid unintentional data loss. However, it's important to ensure that this check is performed atomically with the restoration operation to prevent race conditions where the key might be created by another operation after the check but before the restore. Consider using Redis transactions or scripting to ensure atomicity.
FILE *stream = fmemopen((void*)payload, len, "r"); | ||
ASSERT(stream != NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using fmemopen
to create a memory stream from the payload is a clever approach for decoding. However, ensure that fmemopen
is available on all target platforms, as its availability might vary across different environments. Additionally, it's crucial to handle the case where fmemopen
returns NULL
, indicating an error, to prevent subsequent operations from failing unexpectedly.
- ASSERT(stream != NULL);
+ if (stream == NULL) {
+ RedisModule_ReplyWithError(ctx, "Failed to open memory stream for payload");
+ return REDISMODULE_ERR;
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
FILE *stream = fmemopen((void*)payload, len, "r"); | |
ASSERT(stream != NULL); | |
FILE *stream = fmemopen((void*)payload, len, "r"); | |
if (stream == NULL) { | |
RedisModule_ReplyWithError(ctx, "Failed to open memory stream for payload"); | |
return REDISMODULE_ERR; | |
} |
src/commands/cmd_restore.c
Outdated
GraphContext *gc = RdbLoadGraphContext_v14(io, argv[1]); | ||
ASSERT(gc != NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decoding the graph using RdbLoadGraphContext_v14
directly ties this implementation to a specific version of the decoding logic. While this might be necessary for compatibility reasons, consider abstracting the version-specific logic to allow for easier updates and maintenance in the future. This could involve a factory or strategy pattern to select the appropriate decoding function based on the payload's version.
SerializerIO_Free(&io); | ||
|
||
fclose(stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup process correctly frees the serializer IO and closes the file stream. This is good practice for resource management. However, ensure that these cleanup operations are also performed in error handling paths to prevent resource leaks. Consider using a goto
statement or similar construct to centralize cleanup logic and ensure it's executed regardless of where the function exits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (2)
- src/serializers/serializer_io.c (1 hunks)
- tests/unit/test_serializer.c (1 hunks)
Files skipped from review as they are similar to previous changes (2)
- src/serializers/serializer_io.c
- tests/unit/test_serializer.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (1)
- src/commands/cmd_copy.c (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/commands/cmd_copy.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Configuration used: CodeRabbit UI
Files selected for processing (8)
- src/commands/cmd_copy.c (1 hunks)
- src/commands/cmd_restore.c (1 hunks)
- src/serializers/decoders/current/v14/decode_graph.c (5 hunks)
- src/serializers/decoders/current/v14/decode_v14.h (1 hunks)
- src/serializers/decoders/decode_graph.c (1 hunks)
- src/serializers/encoder/encode_graph.c (1 hunks)
- src/serializers/encoder/v14/encode_graph.c (6 hunks)
- src/serializers/encoder/v14/encode_v14.h (1 hunks)
Files skipped from review as they are similar to previous changes (8)
- src/commands/cmd_copy.c
- src/commands/cmd_restore.c
- src/serializers/decoders/current/v14/decode_graph.c
- src/serializers/decoders/current/v14/decode_v14.h
- src/serializers/decoders/decode_graph.c
- src/serializers/encoder/encode_graph.c
- src/serializers/encoder/v14/encode_graph.c
- src/serializers/encoder/v14/encode_v14.h
* Add self hosted (#584) * add self hosted runner * change self hosted label * Revert "change self hosted label" This reverts commit 9c13255. * re-add arm64 for flow tests * remove runtime tests, remove extra prints (#575) * remove runtime tests, remove extra prints * update tests requirements * remove pathos requierment, switch to python asyncio * switch to asyncio.run * install RLTest directly from github repo * update dockerfile compiler * install redis * remove sudu * remove sudo * fix setup redis * fix install redis * use venv * set venv in path * fix sanitizer * fix sanitizer build * fix sanitier build * fix sanitizer build * fix sanitizer build * add clang to sanitizer * add libc6-dbg * . * add libomp * fix sanitizer * fix redis install * fix redis * reduce flow test parallelism * create connection pool inside async func and make sure to close it * close async connection * remove flushall * specify port for connection pool * address PR comments * Update build.yml * Update test_cache.py * Update test_concurrent_query.py * Update test_concurrent_query.py * Update test_edge_index_scans.py * Update test_encode_decode.py * Update test_graph_deletion.py * Update test_graph_deletion.py * Update test_path_filter.py * address PR comments --------- Co-authored-by: Avi Avni <avi.avni@gmail.com> * update deps (#587) * integrate falkordbrs (#544) * integrate falkordbrs * fix submodule * fix * fix * separate build for debug * build rust with sanitizer flags when needed * add workspace and update * fix leak * update * address review * use alpine image in sanitizer * move to ubuntu * enable cargo * fix * fix * fix * fix * update * use target dir * addres review * use nightly rust in sanitizer * address review * fix * fix * update build * fix * address review * build * update * update * fix build * update * fix codeql and address review * address review * add alpine * update for alpine * update * fix build * remove debian * update * update * use current headers instead of generated one * clean * fix for mac * document alloc funtion * move to ubuntu image * change docker tag * update to latest * update format * revert * fix leak * always compile rust * fix makefile * review * address review * address review * GRAPH.COPY (#585) * initial work graph.copy * pipe serializer * abstract serializer * graph encoder switch to abstract serializer * switch decoder to abstract serializer * copy command wip * first copy * early flow test * additional tests * transition to worker thread execution * switch to worker thread * retry when fork failed * skip graph copy replication test when running under sanitizer * import SANITIZER * plant cloned key only after decoding is done * switch to CRON job * fork writes to file * add logs * fix leak * replicate via RESTORE, switch to FILE* * add graph.restore * fix unit-test wrong stream acquisition * fork requires gil * rename encoder/decoder v14 to latest * version bump --------- Co-authored-by: Dudi <16744955+dudizimber@users.noreply.github.com> Co-authored-by: Avi Avni <avi.avni@gmail.com>
Add support for
GRAPH.COPY
commandUsage:
GRAPH.COPY <SRC> <DEST>
Resolves: #58, #138, #276
Summary by CodeRabbit
New Features
GRAPH.COPY
command for copying graphs, including non-blocking behavior for efficient operation.GRAPH.RESTORE
command for restoring graphs from binary representations directly in the Redis main thread.Refactor
SerializerIO
for better abstraction and compatibility with new serialization mechanisms.