Skip to content

Commit

Permalink
added clone for create, merge, merge_create, update (RedisGraph#969)
Browse files Browse the repository at this point in the history
* added clone for create, merge, merge_create, update

* Update ast_shared.c

* Update ast_shared.c

Co-authored-by: Roi Lipman <swilly22@users.noreply.github.com>
  • Loading branch information
DvirDukhan and swilly22 authored Feb 27, 2020
1 parent b4d5a1d commit ced050d
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 4 deletions.
40 changes: 40 additions & 0 deletions src/ast/ast_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ PropertyMap *PropertyMap_New(GraphContext *gc, const cypher_astnode_t *props) {
return map;
}

static PropertyMap *_PropertyMap_Clone(PropertyMap *map) {
PropertyMap *clone = rm_malloc(sizeof(PropertyMap));
uint prop_count = map->property_count;
clone->keys = rm_malloc(prop_count * sizeof(Attribute_ID));
clone->values = rm_malloc(prop_count * sizeof(AR_ExpNode *));
clone->property_count = prop_count;
memcpy(clone->keys, map->keys, prop_count * sizeof(Attribute_ID));
for(uint i = 0; i < prop_count; i++) clone->values[i] = AR_EXP_Clone(map->values[i]);

return clone;
}

void PropertyMap_Free(PropertyMap *map) {
if(map == NULL) return;

Expand All @@ -90,3 +102,31 @@ void PropertyMap_Free(PropertyMap *map) {
rm_free(map->values);
rm_free(map);
}

EntityUpdateEvalCtx EntityUpdateEvalCtx_Clone(EntityUpdateEvalCtx ctx) {
EntityUpdateEvalCtx clone;
clone.alias = ctx.alias;
clone.attribute = ctx.attribute;
clone.attribute_idx = ctx.attribute_idx;
clone.exp = AR_EXP_Clone(ctx.exp);
clone.record_idx = ctx.record_idx;
return clone;
}

NodeCreateCtx NodeCreateCtx_Clone(NodeCreateCtx ctx) {
NodeCreateCtx clone;
clone.node = ctx.node;
clone.node_idx = ctx.node_idx;
clone.properties = _PropertyMap_Clone(ctx.properties);
return clone;
}

EdgeCreateCtx EdgeCreateCtx_Clone(EdgeCreateCtx ctx) {
EdgeCreateCtx clone;
clone.edge = ctx.edge;
clone.src_idx = ctx.src_idx;
clone.dest_idx = ctx.dest_idx;
clone.edge_idx = ctx.edge_idx;
clone.properties = _PropertyMap_Clone(ctx.properties);
return clone;
}
9 changes: 9 additions & 0 deletions src/ast/ast_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,14 @@ AST_Operator AST_ConvertOperatorNode(const cypher_operator_t *op);
// Convert a map of properties from the AST into a set of attribute ID keys and AR_ExpNode values.
PropertyMap *PropertyMap_New(GraphContext *gc, const cypher_astnode_t *props);

// Clone EntityUpdateEvalCtx.
EntityUpdateEvalCtx EntityUpdateEvalCtx_Clone(EntityUpdateEvalCtx ctx);

// Clone NodeCreateCtx.
NodeCreateCtx NodeCreateCtx_Clone(NodeCreateCtx ctx);

// Clone EdgeCreateCtx.
EdgeCreateCtx EdgeCreateCtx_Clone(EdgeCreateCtx ctx);

void PropertyMap_Free(PropertyMap *map);

13 changes: 12 additions & 1 deletion src/execution_plan/ops/op_create.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

/* Forward declarations. */
static Record CreateConsume(OpBase *opBase);
static OpBase *CreateClone(const ExecutionPlan *plan, const OpBase *opBase);
static void CreateFree(OpBase *opBase);

OpBase *NewCreateOp(const ExecutionPlan *plan, NodeCreateCtx *nodes, EdgeCreateCtx *edges) {
Expand All @@ -21,7 +22,7 @@ OpBase *NewCreateOp(const ExecutionPlan *plan, NodeCreateCtx *nodes, EdgeCreateC
op->pending = NewPendingCreationsContainer(nodes, edges); // Prepare all creation variables.
// Set our Op operations
OpBase_Init((OpBase *)op, OPType_CREATE, "Create", NULL, CreateConsume,
NULL, NULL, NULL, CreateFree, true, plan);
NULL, NULL, CreateClone, CreateFree, true, plan);

uint node_blueprint_count = array_len(nodes);
uint edge_blueprint_count = array_len(edges);
Expand Down Expand Up @@ -147,6 +148,16 @@ static Record CreateConsume(OpBase *opBase) {
return _handoff(op);
}

static OpBase *CreateClone(const ExecutionPlan *plan, const OpBase *opBase) {
assert(opBase->type == OPType_CREATE);
OpCreate *op = (OpCreate *)opBase;
NodeCreateCtx *nodes;
EdgeCreateCtx *edges;
array_clone_with_cb(nodes, op->pending.nodes_to_create, NodeCreateCtx_Clone);
array_clone_with_cb(edges, op->pending.edges_to_create, EdgeCreateCtx_Clone);
return NewCreateOp(plan, nodes, edges);
}

static void CreateFree(OpBase *ctx) {
OpCreate *op = (OpCreate *)ctx;

Expand Down
13 changes: 12 additions & 1 deletion src/execution_plan/ops/op_merge.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
/* Forward declarations. */
static OpResult MergeInit(OpBase *opBase);
static Record MergeConsume(OpBase *opBase);
static OpBase *MergeClone(const ExecutionPlan *plan, const OpBase *opBase);
static void MergeFree(OpBase *opBase);

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -94,7 +95,7 @@ OpBase *NewMergeOp(const ExecutionPlan *plan, EntityUpdateEvalCtx *on_match,
op->on_match = on_match;
op->on_create = on_create;
// Set our Op operations
OpBase_Init((OpBase *)op, OPType_MERGE, "Merge", MergeInit, MergeConsume, NULL, NULL, NULL,
OpBase_Init((OpBase *)op, OPType_MERGE, "Merge", MergeInit, MergeConsume, NULL, NULL, MergeClone,
MergeFree, true, plan);

if(op->on_match) {
Expand Down Expand Up @@ -307,6 +308,16 @@ static Record MergeConsume(OpBase *opBase) {
return _handoff(op);
}

static OpBase *MergeClone(const ExecutionPlan *plan, const OpBase *opBase) {
assert(opBase->type == OPType_MERGE);
OpMerge *op = (OpMerge *)opBase;
EntityUpdateEvalCtx *on_match;
EntityUpdateEvalCtx *on_create;
array_clone_with_cb(on_match, op->on_match, EntityUpdateEvalCtx_Clone);
array_clone_with_cb(on_create, op->on_create, EntityUpdateEvalCtx_Clone);
return NewMergeOp(plan, on_match, on_create);
}

static void MergeFree(OpBase *opBase) {
OpMerge *op = (OpMerge *)opBase;
if(op->input_records) {
Expand Down
13 changes: 12 additions & 1 deletion src/execution_plan/ops/op_merge_create.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

/* Forward declarations. */
static Record MergeCreateConsume(OpBase *opBase);
static OpBase *MergeCreateClone(const ExecutionPlan *plan, const OpBase *opBase);
static void MergeCreateFree(OpBase *opBase);

// Convert a graph entity's components into an identifying hash code.
Expand Down Expand Up @@ -62,7 +63,7 @@ OpBase *NewMergeCreateOp(const ExecutionPlan *plan, NodeCreateCtx *nodes, EdgeCr

// Set our Op operations
OpBase_Init((OpBase *)op, OPType_MERGE_CREATE, "MergeCreate", NULL, MergeCreateConsume,
NULL, NULL, NULL, MergeCreateFree, true, plan);
NULL, NULL, MergeCreateClone, MergeCreateFree, true, plan);

uint node_blueprint_count = array_len(nodes);
uint edge_blueprint_count = array_len(edges);
Expand Down Expand Up @@ -223,6 +224,16 @@ void MergeCreate_Commit(OpBase *opBase) {
CommitNewEntities(opBase, &op->pending);
}

static OpBase *MergeCreateClone(const ExecutionPlan *plan, const OpBase *opBase) {
assert(opBase->type == OPType_MERGE_CREATE);
OpMergeCreate *op = (OpMergeCreate *)opBase;
NodeCreateCtx *nodes;
EdgeCreateCtx *edges;
array_clone_with_cb(nodes, op->pending.nodes_to_create, NodeCreateCtx_Clone);
array_clone_with_cb(edges, op->pending.edges_to_create, EdgeCreateCtx_Clone);
return NewMergeCreateOp(plan, nodes, edges);
}

static void MergeCreateFree(OpBase *ctx) {
OpMergeCreate *op = (OpMergeCreate *)ctx;

Expand Down
11 changes: 10 additions & 1 deletion src/execution_plan/ops/op_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
static OpResult UpdateInit(OpBase *opBase);
static Record UpdateConsume(OpBase *opBase);
static OpResult UpdateReset(OpBase *opBase);
static OpBase *UpdateClone(const ExecutionPlan *plan, const OpBase *opBase);
static void UpdateFree(OpBase *opBase);

/* Delay updates until all entities are processed,
Expand Down Expand Up @@ -152,7 +153,7 @@ OpBase *NewUpdateOp(const ExecutionPlan *plan, EntityUpdateEvalCtx *update_exps)

// Set our Op operations
OpBase_Init((OpBase *)op, OPType_UPDATE, "Update", UpdateInit, UpdateConsume,
UpdateReset, NULL, NULL, UpdateFree, true, plan);
UpdateReset, NULL, UpdateClone, UpdateFree, true, plan);

for(uint i = 0; i < op->update_expressions_count; i ++) {
op->update_expressions[i].record_idx = OpBase_Modifies((OpBase *)op, update_exps[i].alias);
Expand Down Expand Up @@ -213,6 +214,14 @@ static Record UpdateConsume(OpBase *opBase) {
return _handoff(op);
}

static OpBase *UpdateClone(const ExecutionPlan *plan, const OpBase *opBase) {
assert(opBase->type == OPType_UPDATE);
OpUpdate *op = (OpUpdate *)opBase;
EntityUpdateEvalCtx *update_exps;
array_clone_with_cb(update_exps, op->update_expressions, EntityUpdateEvalCtx_Clone);
return NewUpdateOp(plan, update_exps);
}

static OpResult UpdateReset(OpBase *ctx) {
OpUpdate *op = (OpUpdate *)ctx;
// Reset all pending updates.
Expand Down

0 comments on commit ced050d

Please sign in to comment.