Skip to content

Commit

Permalink
fix merge init (#609)
Browse files Browse the repository at this point in the history
* fix merge init

* remove unused function, simplified branch assignment logic

* clean

---------

Co-authored-by: Roi Lipman <roilipman@gmail.com>
  • Loading branch information
AviAvni and swilly22 authored Apr 4, 2024
1 parent 55355a7 commit 3ea7a3b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 79 deletions.
93 changes: 15 additions & 78 deletions src/execution_plan/ops/op_merge.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,32 +142,6 @@ OpBase *NewMergeOp
return (OpBase *)op;
}

// modification of ExecutionPlan_LocateOp that only follows LHS child
// Otherwise, the assumptions of Merge_SetStreams fail in MERGE..MERGE queries
// Match and Create streams are always guaranteed to not branch
// (have any ops with multiple children)
static OpBase *_LocateOp
(
OpBase *root,
OPType type
) {
OpBase *ret;

if(!root) {
ret = NULL;
}
else if(root->type == type) {
ret = root;
}
else if(root->childCount > 0) {
ret = _LocateOp(root->children[0], type);
} else {
ret = NULL;
}

return ret;
}

static OpResult MergeInit
(
OpBase *opBase
Expand All @@ -181,75 +155,38 @@ static OpResult MergeInit
ASSERT(opBase->childCount == 2 || opBase->childCount == 3);
OpMerge *op = (OpMerge *)opBase;
if(opBase->childCount == 2) {
// if we only have 2 streams
// we simply need to determine which has a MergeCreate op
if(_LocateOp(opBase->children[0], OPType_MERGE_CREATE)) {
// if the Create op is in the first stream, swap the children
// otherwise, the order is already correct
OpBase *tmp = opBase->children[0];
opBase->children[0] = opBase->children[1];
opBase->children[1] = tmp;
}

op->match_stream = opBase->children[0];
// if we only have 2 streams, the first one is the bound variable stream
// and the second is the match stream
op->match_stream = opBase->children[0];
op->create_stream = opBase->children[1];

ASSERT(OpBase_Type(op->create_stream) == OPType_MERGE_CREATE);
return OP_OK;
}

// handling the three-stream case
for(int i = 0; i < opBase->childCount; i ++) {
OpBase *child = opBase->children[i];

bool child_has_merge = _LocateOp(child, OPType_MERGE);
// neither Match stream and Create stream have a Merge op
// the bound variable stream will have a Merge op in-case of a merge merge query
// MERGE (a:A) MERGE (b:B)
// In which case the first Merge has yet to order its streams!
if(!op->bound_variable_stream && child_has_merge) {
op->bound_variable_stream = child;
continue;
}

bool child_has_argument = _LocateOp(child, OPType_ARGUMENT);
// The bound variable stream is the only stream not populated by an Argument op.
if(!op->bound_variable_stream && !child_has_argument) {
op->bound_variable_stream = child;
continue;
}

// The Create stream is the only stream with a MergeCreate op and Argument op.
if(!op->create_stream && _LocateOp(child, OPType_MERGE_CREATE) && child_has_argument) {
op->create_stream = child;
continue;
}

// The Match stream has an unknown set of operations, but is the only other stream
// populated by an Argument op.
if(!op->match_stream && child_has_argument) {
op->match_stream = child;
continue;
}
}
// if we have 3 streams, the first is the bound variable stream
// the second is the match stream, and the third is the create stream
op->bound_variable_stream = opBase->children[0];
op->match_stream = opBase->children[1];
op->create_stream = opBase->children[2];

ASSERT(OpBase_Type(op->create_stream) == OPType_MERGE_CREATE);
ASSERT(op->bound_variable_stream != NULL &&
op->match_stream != NULL &&
op->create_stream != NULL);

// migrate the children so that EXPLAIN calls print properly
opBase->children[0] = op->bound_variable_stream;
opBase->children[1] = op->match_stream;
opBase->children[2] = op->create_stream;
op->match_stream != NULL &&
op->create_stream != NULL);

// find and store references to the:
// Argument taps for the Match and Create streams
// the Match stream is populated by an Argument tap
// store a reference to it
op->match_argument_tap =
(Argument *)ExecutionPlan_LocateOp(op->match_stream, OPType_ARGUMENT);
ASSERT(op->match_argument_tap != NULL);

// if the create stream is populated by an Argument tap, store a reference to it.
op->create_argument_tap =
(Argument *)ExecutionPlan_LocateOp(op->create_stream, OPType_ARGUMENT);
ASSERT(op->create_argument_tap != NULL);

// set up an array to store records produced by the bound variable stream
op->input_records = array_new(Record, 1);
Expand Down
16 changes: 15 additions & 1 deletion tests/flow/test_call_subquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2104,7 +2104,7 @@ def test31_following_scans(self):
scan = locate_operation(plan.structured_plan, "Conditional Traverse")
self.env.assertEquals(str(scan), "Conditional Traverse | (n:N)->(n:N)")

def test32_rewrite_callsubquery(self):
def test32_rewrite_call_subquery(self):
self.graph.delete()

# create the node (:N {v: 1})
Expand All @@ -2120,3 +2120,17 @@ def test32_rewrite_callsubquery(self):
}
RETURN 0""")
self.env.assertEquals(res.result_set, [[0]])

def test33_merge_after_call_subquery(self):
self.graph.delete()

# create the node (:N {v: 1})
self.graph.query("CREATE (:N {v: 1})")
res = self.graph.query("""
CALL {
RETURN 1 AS x
}
MERGE (n:N {v: 1})
RETURN n.v
""")
self.env.assertEquals(res.result_set, [[1]])

0 comments on commit 3ea7a3b

Please sign in to comment.