Fix wrong allocation of writer slice in case of modifying shared CTE (#…660)

Initially, when the CTE was first encountered, the create_ctescan_plan()
function constructed a subplan, and its slice was assigned the
GANGTYPE_PRIMARY_WRITER gang type. However, for subsequent occurrences of the
CTE, the subplan construction was bypassed. If these occurrences were located in
different slices and the CTE involved DML, their slices would sometimes be
assigned the GANGTYPE_PRIMARY_WRITER gang type. This led to problems when
apply_shareinput_dag_to_tree(), which uses a specific tree traversal order,
assigned producer and consumer roles to the ShareInputScans: in some cases the
consumer ended up in a GANGTYPE_PRIMARY_WRITER slice while the producer was
placed in a GANGTYPE_PRIMARY_READER slice, resulting in an execution error when
the reader gang attempted to execute a ModifyTable node.

To resolve this issue, the patch checks the gang type of root->curSlice when
the shared plan is created for the first time. If planning the subplan switched
the gang type to GANGTYPE_PRIMARY_WRITER, the ShareInputScan is marked as a
writer slice maker by setting its rootSliceIsWriter flag to true. The same flag
is applied to all subsequent occurrences of the CTE. After consumer and
producer roles have been assigned to each ShareInputScan,
shareinput_mutator_xslice_2() sets the correct gang type:
GANGTYPE_PRIMARY_WRITER for the producer's slice and GANGTYPE_PRIMARY_READER
for each consumer that is in a different slice from the producer. This prevents
the execution error encountered in the original scenario.
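
The detection step and the fix-up step can be modeled in isolation. The
following is a minimal sketch in C, not the planner code: the Toy* types and
toy_* functions are hypothetical stand-ins, has_lefttree replaces the
plan->lefttree check, and slices are indexed directly by this_slice_id (an
assumption; the patch indexes by the current motion id).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the planner's types. */
typedef enum
{
	TOY_GANG_UNALLOCATED,
	TOY_GANG_PRIMARY_READER,
	TOY_GANG_PRIMARY_WRITER
} ToyGangType;

typedef struct
{
	ToyGangType gangType;
} ToySlice;

typedef struct
{
	bool has_lefttree;      /* true only for the producer, which owns the subplan */
	int  this_slice_id;     /* slice this scan runs in */
	int  producer_slice_id; /* slice the producer runs in */
	bool rootSliceIsWriter; /* shared plan modifies data with no motion above */
} ToyShareInputScan;

/*
 * Step 1 (models the check in create_ctescan_plan): the flag is set only
 * if planning the shared subplan switched the current slice to a writer.
 */
bool
toy_root_slice_became_writer(ToyGangType before, ToyGangType after)
{
	return after != before && after == TOY_GANG_PRIMARY_WRITER;
}

/*
 * Step 2 (models the fix-up in shareinput_mutator_xslice_2): once roles
 * are known, force the producer's slice to writer and any consumer in a
 * different slice to reader. Scans without the flag are left untouched.
 */
void
toy_fix_gang_type(ToySlice *slices, size_t nslices,
				  const ToyShareInputScan *sisc)
{
	assert(sisc->this_slice_id >= 0 &&
		   (size_t) sisc->this_slice_id < nslices);

	ToySlice *cur = &slices[sisc->this_slice_id];

	if (!sisc->rootSliceIsWriter)
		return;

	if (sisc->has_lefttree)
		cur->gangType = TOY_GANG_PRIMARY_WRITER;
	else if (sisc->this_slice_id != sisc->producer_slice_id)
		cur->gangType = TOY_GANG_PRIMARY_READER;
}
```

For a scenario like the one in the regression test, a consumer in slice 2 that
was initially marked as a writer and a producer in slice 3 that was initially
marked as a reader come out as reader and writer respectively once
toy_fix_gang_type() has been applied to both scans.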

Co-authored-by: Alexander Kondakov <a.kondakov@arenadata.io>
trexxet2 and bimboterminator1 committed Jun 6, 2024
1 parent 0f4ebf6 commit 908a851
Showing 11 changed files with 176 additions and 1 deletion.
19 changes: 18 additions & 1 deletion src/backend/cdb/cdbmutate.c
@@ -981,6 +981,7 @@ shareinput_mutator_xslice_2(Node *node, PlannerInfo *root, bool fPop)
ShareInputScan *sisc = (ShareInputScan *) plan;
int motId = shareinput_peekmot(ctxt);
ApplyShareInputContextPerShare *pershare;
PlanSlice *currentSlice = &ctxt->slices[motId];

pershare = &ctxt->shared_inputs[sisc->share_id];

@@ -993,12 +994,28 @@ shareinput_mutator_xslice_2(Node *node, PlannerInfo *root, bool fPop)
sisc->producer_slice_id = pershare->producer_slice_id;
sisc->nconsumers = bms_num_members(pershare->participant_slices) - 1;

/*
* The SISC's plan contains a modifying operation, which
* creates a writer gang. Because of the tree traversal order
* in apply_shareinput_dag_to_tree, the producer could end up
* in a reader slice while the consumer ends up in the writer
* slice (gangType is chosen before shareinput fixing).
* To prevent this, restore the correct gangType here.
*/
if (sisc->rootSliceIsWriter)
{
if (plan->lefttree)
currentSlice->gangType = GANGTYPE_PRIMARY_WRITER;
else if (sisc->this_slice_id != sisc->producer_slice_id)
currentSlice->gangType = GANGTYPE_PRIMARY_READER;
}

/*
* If this share needs to run in the QD, mark the slice accordingly.
*/
if (bms_is_member(sisc->share_id, ctxt->qdShares))
{
PlanSlice *currentSlice = &ctxt->slices[motId];

switch (currentSlice->gangType)
{
1 change: 1 addition & 0 deletions src/backend/nodes/copyfuncs.c
@@ -1221,6 +1221,7 @@ _copyShareInputScan(const ShareInputScan *from)
COPY_SCALAR_FIELD(producer_slice_id);
COPY_SCALAR_FIELD(this_slice_id);
COPY_SCALAR_FIELD(nconsumers);
COPY_SCALAR_FIELD(rootSliceIsWriter);

return newnode;
}
1 change: 1 addition & 0 deletions src/backend/nodes/outfuncs.c
@@ -1165,6 +1165,7 @@ _outShareInputScan(StringInfo str, const ShareInputScan *node)
WRITE_INT_FIELD(producer_slice_id);
WRITE_INT_FIELD(this_slice_id);
WRITE_INT_FIELD(nconsumers);
WRITE_BOOL_FIELD(rootSliceIsWriter);

_outPlanInfo(str, (Plan *) node);
}
1 change: 1 addition & 0 deletions src/backend/nodes/readfast.c
@@ -1024,6 +1024,7 @@ _readShareInputScan(void)
READ_INT_FIELD(producer_slice_id);
READ_INT_FIELD(this_slice_id);
READ_INT_FIELD(nconsumers);
READ_BOOL_FIELD(rootSliceIsWriter);

ReadCommonPlan(&local_node->scan.plan);

17 changes: 17 additions & 0 deletions src/backend/optimizer/plan/createplan.c
@@ -4410,13 +4410,30 @@ create_ctescan_plan(PlannerInfo *root, Path *best_path,
if (!cteplaninfo->shared_plan)
{
RelOptInfo *sub_final_rel;
GangType saved_gangType = root->curSlice->gangType;

sub_final_rel = fetch_upper_rel(best_path->parent->subroot, UPPERREL_FINAL, NULL);
subplan = create_plan(best_path->parent->subroot, sub_final_rel->cheapest_total_path, root->curSlice);
cteplaninfo->shared_plan = prepare_plan_for_sharing(cteroot, subplan);

/*
* If gangType has switched, the CTE's plan contains a modifying
* operation with no motion above it (otherwise the gangType
* wouldn't have switched). In this case, mark each ShareInputScan
* as a writer slice creator to prevent the consumer from landing
* in the writer gang and the producer in the reader gang (which
* depends on the tree traversal order inside
* apply_shareinput_dag_to_tree).
*/
if (root->curSlice->gangType != saved_gangType &&
root->curSlice->gangType == GANGTYPE_PRIMARY_WRITER)
cteplaninfo->rootSliceIsWriter = true;
}
/* Wrap the common Plan tree in a ShareInputScan node */
subplan = share_prepared_plan(cteroot, cteplaninfo->shared_plan);

if (cteplaninfo->rootSliceIsWriter)
((ShareInputScan *) subplan)->rootSliceIsWriter = true;
}

scan_plan = (Plan *) make_subqueryscan(tlist,
1 change: 1 addition & 0 deletions src/backend/optimizer/plan/planshare.c
@@ -40,6 +40,7 @@ make_shareinputscan(PlannerInfo *root, Plan *inputplan)
sisc->this_slice_id = -1;
sisc->nconsumers = 0;
sisc->discard_output = false;
sisc->rootSliceIsWriter = false;

sisc->scan.plan.qual = NIL;
sisc->scan.plan.righttree = NULL;
9 changes: 9 additions & 0 deletions src/include/nodes/pathnodes.h
@@ -490,6 +490,15 @@ typedef struct CtePlanInfo
* The subroot corresponding to the subplan.
*/
PlannerInfo *subroot;

/*
* For shared plans only. Indicates that CTE's root slice is writing.
* I.e. in CTE's plan there is a modifying operation without a motion
* above. The flag can't be used before create_ctescan_plan processing of
* the shared plan. This flag is needed for correct gangType allocation
* after consumer's plan is cut.
*/
bool rootSliceIsWriter;
} CtePlanInfo;

/*
6 changes: 6 additions & 0 deletions src/include/nodes/plannodes.h
@@ -1199,6 +1199,12 @@ typedef struct ShareInputScan

/* Discard the scan output? True for ORCA CTE producer, false otherwise. */
bool discard_output;

/*
* Indicates that producer's root slice is writing. I.e. in the shared plan
* there is a modifying operation without a motion above.
*/
bool rootSliceIsWriter;
} ShareInputScan;

/* ----------------
49 changes: 49 additions & 0 deletions src/test/regress/expected/with.out
@@ -2490,3 +2490,52 @@ SELECT * FROM cte UNION ALL SELECT * FROM cte;
(2 rows)

DROP TABLE with_test;
-- Test that planner correctly assigns writer and reader slices in case of
-- shared modifying CTE.
CREATE TABLE with_test (i int, j int) DISTRIBUTED RANDOMLY;
-- start_matchsubs
-- m/segment \d$/
-- s/segment \d$/segment N/
-- end_matchsubs
EXPLAIN (SLICETABLE, COSTS OFF)
WITH cte AS (
INSERT INTO with_test VALUES (1, 2) RETURNING *
)
SELECT i FROM cte a
JOIN cte b USING (i);
QUERY PLAN
------------------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Hash Join
Hash Cond: (a.i = b.i)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: a.i
-> Subquery Scan on a
-> Shared Scan (share slice:id 2:0)
-> Hash
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: b.i
-> Subquery Scan on b
-> Shared Scan (share slice:id 3:0)
-> Insert on with_test
-> Redistribute Motion 1:3 (slice4; segments: 1)
-> Result
Optimizer: Postgres-based planner
Slice 0: Dispatcher; root 0; parent -1; gang size 0
Slice 1: Reader; root 0; parent 0; gang size 3
Slice 2: Reader; root 0; parent 1; gang size 3
Slice 3: Primary Writer; root 0; parent 1; gang size 3
Slice 4: Singleton Reader; root 0; parent 3; gang size 1; segment N
(21 rows)

WITH cte AS (
INSERT INTO with_test VALUES (1, 2) RETURNING *
)
SELECT i FROM cte a
JOIN cte b USING (i);
i
---
1
(1 row)

DROP TABLE with_test;
49 changes: 49 additions & 0 deletions src/test/regress/expected/with_optimizer.out
@@ -2506,3 +2506,52 @@ SELECT * FROM cte UNION ALL SELECT * FROM cte;
(2 rows)

DROP TABLE with_test;
-- Test that planner correctly assigns writer and reader slices in case of
-- shared modifying CTE.
CREATE TABLE with_test (i int, j int) DISTRIBUTED RANDOMLY;
-- start_matchsubs
-- m/segment \d$/
-- s/segment \d$/segment N/
-- end_matchsubs
EXPLAIN (SLICETABLE, COSTS OFF)
WITH cte AS (
INSERT INTO with_test VALUES (1, 2) RETURNING *
)
SELECT i FROM cte a
JOIN cte b USING (i);
QUERY PLAN
------------------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Hash Join
Hash Cond: (a.i = b.i)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: a.i
-> Subquery Scan on a
-> Shared Scan (share slice:id 2:0)
-> Hash
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: b.i
-> Subquery Scan on b
-> Shared Scan (share slice:id 3:0)
-> Insert on with_test
-> Redistribute Motion 1:3 (slice4; segments: 1)
-> Result
Optimizer: Postgres-based planner
Slice 0: Dispatcher; root 0; parent -1; gang size 0
Slice 1: Reader; root 0; parent 0; gang size 3
Slice 2: Reader; root 0; parent 1; gang size 3
Slice 3: Primary Writer; root 0; parent 1; gang size 3
Slice 4: Singleton Reader; root 0; parent 3; gang size 1; segment N
(21 rows)

WITH cte AS (
INSERT INTO with_test VALUES (1, 2) RETURNING *
)
SELECT i FROM cte a
JOIN cte b USING (i);
i
---
1
(1 row)

DROP TABLE with_test;
24 changes: 24 additions & 0 deletions src/test/regress/sql/with.sql
@@ -1203,3 +1203,27 @@ WITH cte AS (
SELECT * FROM cte UNION ALL SELECT * FROM cte;

DROP TABLE with_test;

-- Test that planner correctly assigns writer and reader slices in case of
-- shared modifying CTE.
CREATE TABLE with_test (i int, j int) DISTRIBUTED RANDOMLY;

-- start_matchsubs
-- m/segment \d$/
-- s/segment \d$/segment N/
-- end_matchsubs

EXPLAIN (SLICETABLE, COSTS OFF)
WITH cte AS (
INSERT INTO with_test VALUES (1, 2) RETURNING *
)
SELECT i FROM cte a
JOIN cte b USING (i);

WITH cte AS (
INSERT INTO with_test VALUES (1, 2) RETURNING *
)
SELECT i FROM cte a
JOIN cte b USING (i);

DROP TABLE with_test;