Skip to content
Permalink
Browse files
Fix bug in openCypher CREATE AGE2-345
This fixes a bug in the openCypher execution logic for the CREATE
command. See Jira ticket AGE2-337 and Github issue #88 for more
details.

This bug causes block transactions to not see the previous creates.
This is due to the currentCommandId not actually being incremented.
This patch corrects that by using the following functions to
manipulate it -

     GetCurrentCommandId(true); and
     CommandCounterIncrement();

There is a bit of a rewrite of the function exec_cypher_create in
order to accomodate this change.

While working on this I noticed that we probably should revisit the
logic for the openCypher commands set and delete as they also use
the previous method for incrementing the currentCommandId. But, these
look to be too involved for adding it here.

Added regression tests.

jrg
  • Loading branch information
jrgemignani committed Aug 19, 2021
1 parent 7c990fa commit a6fba6daf2354e5aeedda338d687d3b278964f70
Showing 3 changed files with 111 additions and 43 deletions.
@@ -591,13 +591,61 @@ SELECT count(*) FROM simple_path;
1
(1 row)

--
-- check the cypher CREATE clause inside of a BEGIN/END/COMMIT block
--
BEGIN;
SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '670'}) $$) as (a agtype);
a
---
(0 rows)

SELECT * FROM cypher('cypher_create', $$ MATCH (a:Part) RETURN a $$) as (a agtype);
a
--------------------------------------------------------------------------------------
{"id": 3659174697238529, "label": "Part", "properties": {"part_num": "670"}}::vertex
(1 row)

SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '671'}) $$) as (a agtype);
a
---
(0 rows)

SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '672'}) $$) as (a agtype);
a
---
(0 rows)

SELECT * FROM cypher('cypher_create', $$ MATCH (a:Part) RETURN a $$) as (a agtype);
a
--------------------------------------------------------------------------------------
{"id": 3659174697238529, "label": "Part", "properties": {"part_num": "670"}}::vertex
{"id": 3659174697238530, "label": "Part", "properties": {"part_num": "671"}}::vertex
{"id": 3659174697238531, "label": "Part", "properties": {"part_num": "672"}}::vertex
(3 rows)

SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '673'}) $$) as (a agtype);
a
---
(0 rows)

SELECT * FROM cypher('cypher_create', $$ MATCH (a:Part) RETURN a $$) as (a agtype);
a
--------------------------------------------------------------------------------------
{"id": 3659174697238529, "label": "Part", "properties": {"part_num": "670"}}::vertex
{"id": 3659174697238530, "label": "Part", "properties": {"part_num": "671"}}::vertex
{"id": 3659174697238531, "label": "Part", "properties": {"part_num": "672"}}::vertex
{"id": 3659174697238532, "label": "Part", "properties": {"part_num": "673"}}::vertex
(4 rows)

END;
--
-- Clean up
--
DROP TABLE simple_path;
DROP FUNCTION create_test;
SELECT drop_graph('cypher_create', true);
NOTICE: drop cascades to 12 other objects
NOTICE: drop cascades to 13 other objects
DETAIL: drop cascades to table cypher_create._ag_label_vertex
drop cascades to table cypher_create._ag_label_edge
drop cascades to table cypher_create.v
@@ -610,6 +658,7 @@ drop cascades to table cypher_create.new_vertex
drop cascades to table cypher_create.existing_vlabel
drop cascades to table cypher_create.existing_elabel
drop cascades to table cypher_create.knows
drop cascades to table cypher_create."Part"
NOTICE: graph "cypher_create" has been dropped
drop_graph
------------
@@ -283,6 +283,22 @@ INSERT INTO simple_path(SELECT * FROM cypher('cypher_create',
$$) AS (u agtype, e agtype, v agtype));

SELECT count(*) FROM simple_path;

--
-- check the cypher CREATE clause inside of a BEGIN/END/COMMIT block
--
BEGIN;
SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '670'}) $$) as (a agtype);
SELECT * FROM cypher('cypher_create', $$ MATCH (a:Part) RETURN a $$) as (a agtype);

SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '671'}) $$) as (a agtype);
SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '672'}) $$) as (a agtype);
SELECT * FROM cypher('cypher_create', $$ MATCH (a:Part) RETURN a $$) as (a agtype);

SELECT * FROM cypher('cypher_create', $$ CREATE (a:Part {part_num: '673'}) $$) as (a agtype);
SELECT * FROM cypher('cypher_create', $$ MATCH (a:Part) RETURN a $$) as (a agtype);
END;

--
-- Clean up
--
@@ -143,7 +143,6 @@ static void begin_cypher_create(CustomScanState *node, EState *estate,
if (estate->es_output_cid == 0)
estate->es_output_cid = estate->es_snapshot->curcid;

CommandCounterIncrement();
Increment_Estate_CommandId(estate);
}

@@ -197,55 +196,59 @@ static TupleTableSlot *exec_cypher_create(CustomScanState *node)
EState *estate = css->css.ss.ps.state;
ExprContext *econtext = css->css.ss.ps.ps_ExprContext;
TupleTableSlot *slot;
bool terminal = CYPHER_CLAUSE_IS_TERMINAL(css->flags);
bool used = false;

if (CYPHER_CLAUSE_IS_TERMINAL(css->flags))
{
/*
* If the CREATE clause was the final cypher clause written
* then we aren't returning anything from this result node.
* So the exec_cypher_create function will only be called once.
* Therefore we will process all tuples from the subtree at once.
*/
while(true)
{
//Process the subtree first
Decrement_Estate_CommandId(estate)
slot = ExecProcNode(node->ss.ps.lefttree);
Increment_Estate_CommandId(estate)

if (TupIsNull(slot))
break;

// setup the scantuple that the process_pattern needs
econtext->ecxt_scantuple =
node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple;

process_pattern(css);
}

return NULL;
}
else
/*
* If the CREATE clause was the final cypher clause written then we aren't
* returning anything from this result node. So the exec_cypher_create
* function will only be called once. Therefore we will process all tuples
* from the subtree at once.
*/
do
{
//Process the subtree first
/*Process the subtree first */
Decrement_Estate_CommandId(estate)
slot = ExecProcNode(node->ss.ps.lefttree);
Increment_Estate_CommandId(estate)

/* break when there are no tuples */
if (TupIsNull(slot))
return NULL;

// setup the scantuple that the process_delete_list needs
{
break;
}
/* setup the scantuple that the process_pattern needs */
econtext->ecxt_scantuple =
node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple;

process_pattern(css);

econtext->ecxt_scantuple =
ExecProject(node->ss.ps.lefttree->ps_ProjInfo);

return ExecProject(node->ss.ps.ps_ProjInfo);
/*
* This may not be necessary. If we have an empty pattern, nothing was
* inserted and the current command Id was not used. So, only flag it
* if there is a non empty pattern.
*/
if (list_length(css->pattern) > 0)
{
/* the current command Id has been used */
used = true;
}
} while (terminal);
/*
* If the current command Id wasn't used, nothing was inserted and we're
* done.
*/
if (!used)
{
return NULL;
}
/* update the current command Id */
CommandCounterIncrement();
/* if this was a terminal CREATE just return NULL */
if (terminal)
{
return NULL;
}

econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo);
return ExecProject(node->ss.ps.ps_ProjInfo);
}

static void end_cypher_create(CustomScanState *node)
@@ -651,8 +654,8 @@ static HeapTuple insert_entity_tuple(ResultRelInfo *resultRelInfo,
ExecConstraints(resultRelInfo, elemTupleSlot, estate);

// Insert the tuple normally
heap_insert(resultRelInfo->ri_RelationDesc, tuple, estate->es_output_cid,
0, NULL);
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
GetCurrentCommandId(true), 0, NULL);

// Insert index entries for the tuple
if (resultRelInfo->ri_NumIndices > 0)

0 comments on commit a6fba6d

Please sign in to comment.