Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement delta matrix in rust and move to use incidence matrix #595

Open
wants to merge 54 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
615fc27
wip delta matrix
AviAvni Mar 25, 2024
b1ef4a1
Merge branch 'master' into delta-matrix
AviAvni Mar 25, 2024
fa17331
fix tabs and comments
AviAvni Mar 25, 2024
cbbb241
fix tests
AviAvni Mar 26, 2024
89d6c98
update
AviAvni Mar 26, 2024
33940a5
fix build
AviAvni Mar 28, 2024
abdfff5
update
AviAvni Mar 28, 2024
f3e5f01
update
AviAvni Mar 28, 2024
ace5c2b
update
AviAvni Apr 2, 2024
5c9f4d8
Merge branch 'master' into delta-matrix
AviAvni Apr 2, 2024
81f5a8d
Merge branch 'master' into delta-matrix
AviAvni Apr 7, 2024
d8b72a7
Merge branch 'master' into delta-matrix
AviAvni Apr 9, 2024
4937899
address review
AviAvni Apr 16, 2024
f5ac937
Merge branch 'master' into delta-matrix
AviAvni Apr 16, 2024
4cf3f0d
rename RG to Delta
AviAvni Apr 16, 2024
7111fc4
address review
AviAvni May 5, 2024
70e02f0
address review
AviAvni May 5, 2024
a25da1c
Merge branch 'master' into delta-matrix
AviAvni May 5, 2024
b53a776
fix merge
AviAvni May 5, 2024
eeb387b
address review
AviAvni May 5, 2024
d6332b1
address review
AviAvni May 5, 2024
5ab1b5a
implement new incident matrix approach
AviAvni May 8, 2024
f1b57ab
Merge branch 'master' into delta-matrix
AviAvni May 15, 2024
2ae6ed8
address review
AviAvni May 15, 2024
def1a16
address review
AviAvni May 16, 2024
ebfe445
address review
AviAvni May 16, 2024
09d1b7a
fix
AviAvni May 16, 2024
903c00a
address review
AviAvni May 19, 2024
4f340b4
Merge branch 'master' into delta-matrix
AviAvni May 19, 2024
a558020
address review
AviAvni May 19, 2024
7ebafeb
address review
AviAvni May 20, 2024
b5d14f7
address review
AviAvni May 20, 2024
a58a624
address review
AviAvni May 20, 2024
f8b3713
Merge branch 'master' into delta-matrix
AviAvni May 20, 2024
41ec2d2
address review
AviAvni May 20, 2024
a05199b
Merge branch 'delta-matrix' of https://github.com/FalkorDB/FalkorDB i…
AviAvni May 20, 2024
2aa805f
iterators mutch more efficient
AviAvni May 21, 2024
fec36c1
run rust unit tests
AviAvni May 21, 2024
980eb5a
rewrite test
AviAvni May 21, 2024
3bcf342
rewrite tests
AviAvni May 22, 2024
b7b5dda
address review
AviAvni May 22, 2024
93088e6
rewrite tests
AviAvni May 26, 2024
e7deba3
fix make file
AviAvni May 26, 2024
96508e8
rewrite tests
AviAvni May 28, 2024
144d546
update
AviAvni May 29, 2024
d04f36e
address review
AviAvni May 29, 2024
81e21f6
address review
AviAvni May 29, 2024
7e1dbab
fix
AviAvni May 29, 2024
fdcf95e
Merge branch 'master' into delta-matrix
AviAvni May 30, 2024
23d5056
Merge branch 'master' into delta-matrix
AviAvni Jun 6, 2024
eb46032
address review
AviAvni Jun 16, 2024
929bf84
improve perf
AviAvni Jun 17, 2024
43308cf
fix
AviAvni Jun 17, 2024
7be62f0
fix cond
AviAvni Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ set(FALKORDB_OBJECTS $<TARGET_OBJECTS:falkordb>)
find_package(OpenSSL)

lists_from_env(GRAPHBLAS LIBXXHASH RAX LIBCYPHER_PARSER REDISEARCH_LIBS UTF8PROC ONIGURUMA FalkorDBRS)
set(FALKORDB_LIBS ${GRAPHBLAS} ${LIBXXHASH} ${RAX} ${LIBCYPHER_PARSER} ${REDISEARCH_LIBS} ${UTF8PROC} ${ONIGURUMA} ${FalkorDBRS} OpenSSL::SSL)
set(FALKORDB_LIBS ${FalkorDBRS} ${GRAPHBLAS} ${LIBXXHASH} ${RAX} ${LIBCYPHER_PARSER} ${REDISEARCH_LIBS} ${UTF8PROC} ${ONIGURUMA} OpenSSL::SSL)

target_link_options(falkordb PRIVATE ${CMAKE_LD_FLAGS_LIST} ${CMAKE_SO_LD_FLAGS_LIST})
target_link_libraries(falkordb PRIVATE ${FALKORDB_LIBS} ${CMAKE_LD_LIBS})
Expand Down
2 changes: 1 addition & 1 deletion deps/FalkorDB-rs
2 changes: 1 addition & 1 deletion src/algorithms/all_neighbors.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ EntityID AllNeighborsCtx_NextNeighbor
RG_MatrixTupleIter *it = &ctx->levels[ctx->current_level];

GrB_Index dest_id;
GrB_Info info = RG_MatrixTupleIter_next_UINT64(it, NULL, &dest_id, NULL);
GrB_Info info = RG_MatrixTupleIter_next_BOOL(it, NULL, &dest_id, NULL);

if(info == GxB_EXHAUSTED) {
// backtrack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ RG_Matrix _Eval_Add
// `res` is in use, create an additional matrix
RG_Matrix_nrows(&nrows, res);
RG_Matrix_ncols(&ncols, res);
info = RG_Matrix_new(&inter, GrB_BOOL, nrows, ncols);
info = RG_Matrix_new(&inter, GrB_BOOL, nrows, ncols, false);
ASSERT(info == GrB_SUCCESS);
B = AlgebraicExpression_Eval(right, inter);
} else {
Expand Down Expand Up @@ -78,7 +78,7 @@ RG_Matrix _Eval_Add
// can't use `res`, use an intermidate matrix
RG_Matrix_nrows(&nrows, res);
RG_Matrix_ncols(&ncols, res);
info = RG_Matrix_new(&inter, GrB_BOOL, nrows, ncols);
info = RG_Matrix_new(&inter, GrB_BOOL, nrows, ncols, false);
ASSERT(info == GrB_SUCCESS);
}
AlgebraicExpression_Eval(right, inter);
Expand Down
144 changes: 73 additions & 71 deletions src/bulk_insert/bulk_insert.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ static int* _BulkInsert_ReadHeaderLabels
ASSERT(data != NULL);
ASSERT(data_idx != NULL);

// first sequence is entity label(s)
const char* labels = data + *data_idx;
int labels_len = strlen(labels);
*data_idx += labels_len + 1;
// first sequence is entity label(s)
const char* labels = data + *data_idx;
int labels_len = strlen(labels);
*data_idx += labels_len + 1;

// array of all label IDs
int* label_ids = array_new(int, 1);
// stack variable to contain a single label
char label[labels_len + 1];
// array of all label IDs
int* label_ids = array_new(int, 1);
// stack variable to contain a single label
char label[labels_len + 1];

while (true) {
// look for a colon delimiting another label
Expand Down Expand Up @@ -82,7 +82,7 @@ static int* _BulkInsert_ReadHeaderLabels
if (!found) break;
}

return label_ids;
return label_ids;
}

// read the property keys from a header
Expand All @@ -99,15 +99,15 @@ static AttributeID* _BulkInsert_ReadHeaderProperties
ASSERT(data_idx != NULL);
ASSERT(prop_count != NULL);

// next 4 bytes are property count
*prop_count = *(uint*)&data[*data_idx];
*data_idx += sizeof(unsigned int);
// next 4 bytes are property count
*prop_count = *(uint*)&data[*data_idx];
*data_idx += sizeof(unsigned int);

if (*prop_count == 0) return NULL;
if (*prop_count == 0) return NULL;

AttributeID* prop_indices = rm_malloc(*prop_count * sizeof(AttributeID));
AttributeID* prop_indices = rm_malloc(*prop_count * sizeof(AttributeID));

// the rest of the line is [char *prop_key] * prop_count
// the rest of the line is [char *prop_key] * prop_count
for (uint j = 0; j < *prop_count; j++) {
char* prop_key = (char*)data + *data_idx;
*data_idx += strlen(prop_key) + 1;
Expand All @@ -116,7 +116,7 @@ static AttributeID* _BulkInsert_ReadHeaderProperties
prop_indices[j] = GraphContext_FindOrAddAttribute(gc, prop_key, NULL);
}

return prop_indices;
return prop_indices;
}

// read an SIValue from the data stream and update the index appropriately
Expand All @@ -125,7 +125,7 @@ static SIValue _BulkInsert_ReadProperty
const char* data,
size_t* data_idx
) {
// binary property format:
// binary property format:
// - property type : 1-byte integer corresponding to TYPE enum
// - Nothing if type is NULL
// - 1-byte true/false if type is boolean
Expand All @@ -134,16 +134,16 @@ static SIValue _BulkInsert_ReadProperty
// - Null-terminated C string if type is string
// - 8-byte array length followed by N values if type is array

// possible property values
bool b;
double d;
int64_t i;
int64_t len;
const char* s;
// possible property values
bool b;
double d;
int64_t i;
int64_t len;
const char* s;

SIValue v = SI_NullVal();
TYPE t = data[*data_idx];
*data_idx += 1;
SIValue v = SI_NullVal();
TYPE t = data[*data_idx];
*data_idx += 1;

switch (t) {
case BI_NULL:
Expand Down Expand Up @@ -191,7 +191,7 @@ static SIValue _BulkInsert_ReadProperty
break;
}

return v;
return v;
}

static int _BulkInsert_ProcessNodeFile
Expand All @@ -200,30 +200,30 @@ static int _BulkInsert_ProcessNodeFile
const char* data,
size_t data_len
) {
uint prop_count;
size_t data_idx = 0;

// read the CSV file header labels and update all schemas
int* label_ids = _BulkInsert_ReadHeaderLabels(gc, SCHEMA_NODE, data, &data_idx);
uint label_count = array_len(label_ids);
// read the CSV header properties and collect their indices
AttributeID* prop_indices = _BulkInsert_ReadHeaderProperties(gc, SCHEMA_NODE, data,
uint prop_count;
size_t data_idx = 0;

// read the CSV file header labels and update all schemas
int* label_ids = _BulkInsert_ReadHeaderLabels(gc, SCHEMA_NODE, data, &data_idx);
uint label_count = array_len(label_ids);
// read the CSV header properties and collect their indices
AttributeID* prop_indices = _BulkInsert_ReadHeaderProperties(gc, SCHEMA_NODE, data,
&data_idx, &prop_count);

// sync each matrix once
ASSERT(Graph_GetMatrixPolicy(gc->g) == SYNC_POLICY_RESIZE);
// sync each matrix once
ASSERT(Graph_GetMatrixPolicy(gc->g) == SYNC_POLICY_RESIZE);

for (uint i = 0; i < label_count; i++) {
Graph_GetLabelMatrix(gc->g, label_ids[i]);
}

// sync node-label matrix
Graph_GetNodeLabelMatrix(gc->g);
Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_NOP);
// sync node-label matrix
Graph_GetNodeLabelMatrix(gc->g);
Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_NOP);

//--------------------------------------------------------------------------
// load nodes
//--------------------------------------------------------------------------
//--------------------------------------------------------------------------
// load nodes
//--------------------------------------------------------------------------

while (data_idx < data_len) {
Node n = GE_NEW_NODE();
Expand All @@ -240,11 +240,11 @@ static int _BulkInsert_ProcessNodeFile
}
}

Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_RESIZE);
if (prop_indices) rm_free(prop_indices);
array_free(label_ids);
Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_RESIZE);
if (prop_indices) rm_free(prop_indices);
array_free(label_ids);

return BULK_OK;
return BULK_OK;
}

static int _BulkInsert_ProcessEdgeFile
Expand All @@ -253,31 +253,33 @@ static int _BulkInsert_ProcessEdgeFile
const char* data,
size_t data_len
) {
int relation_id;
uint prop_count;
size_t data_idx = 0;
int relation_id;
uint prop_count;
size_t data_idx = 0;

// read the CSV file header
// and commit all labels and properties it introduces
int* type_ids = _BulkInsert_ReadHeaderLabels(gc, SCHEMA_EDGE, data, &data_idx);
uint type_count = array_len(type_ids);
// read the CSV file header
// and commit all labels and properties it introduces
int* type_ids = _BulkInsert_ReadHeaderLabels(gc, SCHEMA_EDGE, data, &data_idx);
uint type_count = array_len(type_ids);

// edges can only have one type
ASSERT(type_count == 1);
// edges can only have one type
ASSERT(type_count == 1);

int type_id = type_ids[0];
AttributeID* prop_indices = _BulkInsert_ReadHeaderProperties(gc, SCHEMA_EDGE,
int type_id = type_ids[0];
AttributeID* prop_indices = _BulkInsert_ReadHeaderProperties(gc, SCHEMA_EDGE,
data, &data_idx, &prop_count);

// sync matrix once
ASSERT(Graph_GetMatrixPolicy(gc->g) == SYNC_POLICY_RESIZE);
Graph_GetRelationMatrix(gc->g, type_id, false);
Graph_GetAdjacencyMatrix(gc->g, false);
Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_NOP);
// sync matrix once
ASSERT(Graph_GetMatrixPolicy(gc->g) == SYNC_POLICY_RESIZE);
Graph_GetRelationMatrix(gc->g, type_id, false);
Graph_GetSourceRelationMatrix(gc->g, type_id, false);
Graph_GetTargetRelationMatrix(gc->g, type_id, false);
Graph_GetAdjacencyMatrix(gc->g, false);
Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_NOP);

//--------------------------------------------------------------------------
// load edges
//--------------------------------------------------------------------------
//--------------------------------------------------------------------------
// load edges
//--------------------------------------------------------------------------

while (data_idx < data_len) {
Edge e;
Expand Down Expand Up @@ -305,11 +307,11 @@ static int _BulkInsert_ProcessEdgeFile
}
}

array_free(type_ids);
if (prop_indices) rm_free(prop_indices);
Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_RESIZE);
array_free(type_ids);
if (prop_indices) rm_free(prop_indices);
Graph_SetMatrixPolicy(gc->g, SYNC_POLICY_RESIZE);

return BULK_OK;
return BULK_OK;
}

static int _BulkInsert_ProcessTokens
Expand All @@ -330,7 +332,7 @@ static int _BulkInsert_ProcessTokens
ASSERT(rc == BULK_OK);
}

return BULK_OK;
return BULK_OK;
}

int BulkInsert
Expand Down
39 changes: 12 additions & 27 deletions src/constraint/constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ void Constraint_EnforceEdges
EntityID dest_id = 0; // current processed column idx
EntityID edge_id = 0; // current processed edge id
EntityID prev_src_id = 0; // last processed row idx
EntityID prev_dest_id = 0; // last processed column idx
EntityID prev_edge_id = 0; // last processed column idx
int enforced = 0; // # entities enforced in batch
int schema_id = c->schema_id; // edge relationship type ID
int batch_size = 1000; // max number of entities to enforce
Expand All @@ -523,11 +523,11 @@ void Constraint_EnforceEdges
// reset number of enforced edges in batch
enforced = 0;
prev_src_id = src_id;
prev_dest_id = dest_id;
prev_edge_id = edge_id;

// fetch relation matrix
ASSERT(Graph_GetMatrixPolicy(g) == SYNC_POLICY_FLUSH_RESIZE);
const RG_Matrix m = Graph_GetRelationMatrix(g, schema_id, false);
const RG_Matrix m = Graph_GetSourceRelationMatrix(g, schema_id, false);
ASSERT(m != NULL);

//----------------------------------------------------------------------
Expand All @@ -538,10 +538,10 @@ void Constraint_EnforceEdges
ASSERT(info == GrB_SUCCESS);

// skip previously enforced edges
while((info = RG_MatrixTupleIter_next_UINT64(&it, &src_id, &dest_id,
&edge_id)) == GrB_SUCCESS &&
while((info = RG_MatrixTupleIter_next_BOOL(&it, &src_id, &edge_id,
NULL)) == GrB_SUCCESS &&
src_id == prev_src_id &&
dest_id < prev_dest_id);
edge_id != prev_edge_id);

// process only if iterator is on an active entry
if(info != GrB_SUCCESS) {
Expand All @@ -558,30 +558,15 @@ void Constraint_EnforceEdges
e.dest_id = dest_id;
e.relationID = schema_id;

if(SINGLE_EDGE(edge_id)) {
bool res = Graph_GetEdge(g, edge_id, &e);
assert(res == true);
if(!c->enforce(c, (GraphEntity*)&e, NULL)) {
holds = false;
break;
}
} else {
EdgeID *edgeIds = (EdgeID *)(CLEAR_MSB(edge_id));
uint edgeCount = array_len(edgeIds);

for(uint i = 0; i < edgeCount; i++) {
edge_id = edgeIds[i];
bool res = Graph_GetEdge(g, edge_id, &e);
assert(res == true);
if(!c->enforce(c, (GraphEntity*)&e, NULL)) {
holds = false;
break;
}
}
bool res = Graph_GetEdge(g, edge_id, &e);
assert(res == true);
if(!c->enforce(c, (GraphEntity*)&e, NULL)) {
holds = false;
break;
}
enforced++; // single/multi edge are counted similarly
} while(enforced < batch_size &&
RG_MatrixTupleIter_next_UINT64(&it, &src_id, &dest_id, &edge_id)
RG_MatrixTupleIter_next_BOOL(&it, &src_id, &edge_id, NULL)
== GrB_SUCCESS && holds);

//----------------------------------------------------------------------
Expand Down
8 changes: 4 additions & 4 deletions src/execution_plan/ops/op_conditional_traverse.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ static void CondTraverseToString(const OpBase *ctx, sds *buf) {
}

static void _populate_filter_matrix(OpCondTraverse *op) {
GrB_Matrix FM = RG_MATRIX_M(op->F);
GrB_Matrix FM = RG_Matrix_M(op->F);

// clear filter matrix
GrB_Matrix_clear(FM);
Expand All @@ -51,8 +51,8 @@ void _traverse(OpCondTraverse *op) {
if(op->F == NULL) {
// create both filter and result matrices
size_t required_dim = Graph_RequiredMatrixDim(op->graph);
RG_Matrix_new(&op->M, GrB_BOOL, op->record_cap, required_dim);
RG_Matrix_new(&op->F, GrB_BOOL, op->record_cap, required_dim);
RG_Matrix_new(&op->M, GrB_BOOL, op->record_cap, required_dim, false);
RG_Matrix_new(&op->F, GrB_BOOL, op->record_cap, required_dim, false);

// prepend filter matrix to algebraic expression as the leftmost operand
AlgebraicExpression_MultiplyToTheLeft(&op->ae, op->F);
Expand Down Expand Up @@ -142,7 +142,7 @@ static Record CondTraverseConsume(OpBase *opBase) {
NodeID dest_id = INVALID_ENTITY_ID;

while(true) {
GrB_Info info = RG_MatrixTupleIter_next_UINT64(&op->iter, &src_id, &dest_id, NULL);
GrB_Info info = RG_MatrixTupleIter_next_BOOL(&op->iter, &src_id, &dest_id, NULL);

// Managed to get a tuple, break.
if(info == GrB_SUCCESS) break;
Expand Down