Skip to content

Commit

Permalink
Ensure Execution of Shared Scan Writer On Squelch [#149182449]
Browse files Browse the repository at this point in the history
SharedInputScan (a.k.a. "Shared Scan" in EXPLAIN) is the operator
through which Greenplum implements Common Table Expression execution. It
executes in two modes: writer (a.k.a. producer) and reader (a.k.a.
consumer). Writers will execute the common table expression definition
and materialize the output, and readers can read the materialized output
(potentially in parallel).

Because of the parallel nature of Greenplum execution, slices containing
Shared Scans need to synchronize among themselves to ensure that readers
don't start until writers are finished writing. Specifically, a slice
with readers depending on writers on a different slice will block during
`ExecutorRun`, before even pulling the first tuple from the executor
tree.

Greenplum's Hash Join implementation will skip executing its outer
("probe side") subtree if it detects an empty inner ("hash side"), and
declare all motions in the skipped subtree as "stopped" (we call this
"squelching"). That means we can potentially squelch a subtree that
contains a shared scan writer, leaving cross-slice readers waiting
forever.

For example, with ORCA enabled, the following query:

```SQL
CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO bar  VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

SET statement_timeout = '15s';

SELECT * FROM
        (
        WITH cte AS (SELECT * FROM foo)
        SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
        AS X
        JOIN bar ON b = c
        ) AS XY
        JOIN jazz ON c = e AND b = f;
```
leads to a plan that will expose this problem:

```
                                                 QUERY PLAN
------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)  (cost=0.00..2155.00 rows=1 width=24)
   ->  Hash Join  (cost=0.00..2155.00 rows=1 width=24)
         Hash Cond: bar.c = jazz.e AND share0_ref2.b = jazz.f AND share0_ref2.b = jazz.e AND bar.c = jazz.f
         ->  Sequence  (cost=0.00..1724.00 rows=1 width=16)
               ->  Shared Scan (share slice:id 2:0)  (cost=0.00..431.00 rows=1 width=1)
                     ->  Materialize  (cost=0.00..431.00 rows=1 width=1)
                           ->  Table Scan on foo  (cost=0.00..431.00 rows=1 width=8)
               ->  Hash Join  (cost=0.00..1293.00 rows=1 width=16)
                     Hash Cond: share0_ref2.b = bar.c
                     ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=8)
                           Hash Key: share0_ref2.b
                           ->  Append  (cost=0.00..862.00 rows=1 width=8)
                                 ->  Shared Scan (share slice:id 1:0)  (cost=0.00..431.00 rows=1 width=8)
                                 ->  Shared Scan (share slice:id 1:0)  (cost=0.00..431.00 rows=1 width=8)
                     ->  Hash  (cost=431.00..431.00 rows=1 width=8)
                           ->  Table Scan on bar  (cost=0.00..431.00 rows=1 width=8)
         ->  Hash  (cost=431.00..431.00 rows=1 width=8)
               ->  Table Scan on jazz  (cost=0.00..431.00 rows=1 width=8)
                     Filter: e = f
 Optimizer status: PQO version 2.39.1
(20 rows)
```
where processes executing slice1 on the segments that have an empty
`jazz` will hang.

We fix this by ensuring we execute the Shared Scan writer even if it's
in the subtree that we're squelching.

Signed-off-by: Melanie Plageman <mplageman@pivotal.io>

Signed-off-by: Sambitesh Dash <sdash@pivotal.io>
  • Loading branch information
d authored and melanieplageman committed Jul 27, 2017
1 parent d50f429 commit 9fbd2da
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 4 deletions.
27 changes: 24 additions & 3 deletions src/backend/executor/execProcnode.c
Expand Up @@ -1484,15 +1484,36 @@ squelchNodeWalker(PlanState *node,
{
ShareInputScanState* sisc_state = (ShareInputScanState *)node;
ShareType share_type = ((ShareInputScan *)sisc_state->ss.ps.plan)->share_type;
bool isWriter = outerPlanState(sisc_state) != NULL;
bool tuplestoreInitialized = sisc_state->ts_state != NULL;

/*
* If there is a SharedInputScan that is shared within the same slice
* then its subtree may still need to be executed and the motions in the
* subtree cannot yet be stopped. Thus, we short-circuit
* squelchNodeWalker in this case.
*/
if (share_type == SHARE_MATERIAL || share_type == SHARE_SORT)
*
* In squelching a cross-slice SharedInputScan writer, we need to
* ensure we don't block any reader on other slices as a result of
* not materializing the shared plan.
*
* Note that we emphatically can't "fake" an empty tuple store
* and just go ahead waking up the readers because that can
* lead to wrong results. c.f. nodeShareInputScan.c
*/
switch (share_type)
{
return CdbVisit_Skip;
case SHARE_MATERIAL:
case SHARE_SORT:
return CdbVisit_Skip;

case SHARE_MATERIAL_XSLICE:
case SHARE_SORT_XSLICE:
if (isWriter && !tuplestoreInitialized)
ExecProcNode(node);
break;
case SHARE_NOTSHARED:
break;
}
}
else if (IsA(node, MotionState))
Expand Down
32 changes: 32 additions & 0 deletions src/test/regress/expected/shared_scan.out
@@ -0,0 +1,32 @@
--
-- Queries that lead to hanging (not dead lock) when we don't handle synchronization properly in shared scan
--
CREATE SCHEMA shared_scan;
SET search_path = shared_scan;
CREATE TABLE foo (a int, b int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE bar (c int, d int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE jazz(e int, f int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'e' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
INSERT INTO bar VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO jazz VALUES (2, 2), (3, 3);
ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;
SET statement_timeout = '15s';
SELECT * FROM
(
WITH cte AS (SELECT * FROM foo)
SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
AS X
JOIN bar ON b = c
) AS XY
JOIN jazz on c = e AND b = f;
a | b | c | d | e | f
---+---+---+---+---+---
(0 rows)

2 changes: 1 addition & 1 deletion src/test/regress/greenplum_schedule
Expand Up @@ -15,7 +15,7 @@
# hitting max_connections limit on segments.
#

test: gp_metadata variadic_parameters default_parameters function_extensions spi gp_xml pgoptions
test: gp_metadata variadic_parameters default_parameters function_extensions spi gp_xml pgoptions shared_scan

test: leastsquares opr_sanity_gp decode_expr bitmapscan bitmapscan_ao case_gp limit_gp notin percentile join_gp union_gp gpcopy gp_create_table
test: filter gpctas gpdist matrix toast sublink table_functions olap_setup complex opclass_ddl information_schema guc_env_var gp_explain
Expand Down
29 changes: 29 additions & 0 deletions src/test/regress/sql/shared_scan.sql
@@ -0,0 +1,29 @@
--
-- Queries that lead to hanging (not dead lock) when we don't handle synchronization properly in shared scan
--

CREATE SCHEMA shared_scan;

SET search_path = shared_scan;

-- foo is deliberately left empty: the cross-slice Shared Scan writer then
-- materializes an empty CTE result, exercising the squelch path under test.
CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO bar VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

-- If the bug regresses, the query below hangs rather than returning; the
-- timeout turns that hang into a visible error instead of a stuck test run.
SET statement_timeout = '15s';

-- The CTE is referenced twice (UNION ALL), so (with ORCA) its result is
-- materialized once and shared via a cross-slice Shared Scan. On segments
-- where jazz is empty the Hash Join squelches the probe-side subtree that
-- contains the Shared Scan writer; the fix must still run the writer there
-- so readers on other slices are not left waiting. Expected result: 0 rows.
SELECT * FROM
(
WITH cte AS (SELECT * FROM foo)
SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
AS X
JOIN bar ON b = c
) AS XY
JOIN jazz on c = e AND b = f;

0 comments on commit 9fbd2da

Please sign in to comment.