Skip to content

Commit

Permalink
Fix writeable CTE assign reader gang to writer slice.
Browse files Browse the repository at this point in the history
For the gang allocation, our current implementation required the first
allocated gang must be the writer gang.
This has several reasons:
- For lock holding, Because of our MPP structure, we assign a LockHolder
  for each segment when executing a query. lockHolder is the gang member that
  should hold and manage locks for this transaction. On the QEs, it should
  normally be the Writer gang member. More details please refer to
  lockHolderProcPtr in lock.c.
- For SharedSnapshot among session's gang processes on a particular segment.
  During initPostgres(), reader QE will try to lookup the shared slot written
  by writer QE. More details please reger to sharedsnapshot.c.

Normally, the writer slice will be assign writer gang first when iterate the
slice table. But this is not true for writable CTE (with only one writer gang).
For below statement:

explain (slicetable) WITH updated AS (update tbl set rank = 6 where id = 5 returning
rank) select * from tbl where rank in (select rank from updated);
                        QUERY PLAN
--------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Seq Scan on tbl
         Filter: (hashed SubPlan 1)
         SubPlan 1
           ->  Broadcast Motion 1:3  (slice2; segments: 1)
                 ->  Update on tbl
                       ->  Seq Scan on tbl
                             Filter: (id = 5)
 Slice 0: Dispatcher; root 0; parent -1; gang size 0
 Slice 1: Reader; root 0; parent 0; gang size 3
 Slice 2: Primary Writer; root 0; parent 1; gang size 1

If we sill assign writer gang to Slice 1 here, the writer process will execute
on reader gang. So, find the writer slice and assign writer gang first.
  • Loading branch information
JunfengYang committed Nov 25, 2020
1 parent 91eaeac commit f124d98
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 4 deletions.
68 changes: 67 additions & 1 deletion src/backend/executor/execUtils.c
Expand Up @@ -1429,6 +1429,7 @@ sliceRunsOnQE(ExecSlice *slice)
}

/* Forward declarations */
static bool AssignWriterGangFirst(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceIndex);
static void InventorySliceTree(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceIndex);

/*
Expand Down Expand Up @@ -1463,9 +1464,74 @@ AssignGangs(CdbDispatcherState *ds, QueryDesc *queryDesc)
for (int i = 0; i < sliceTable->numSlices; i++)
sliceTable->slices[i].processesMap = NULL;

AssignWriterGangFirst(ds, sliceTable, rootIdx);
InventorySliceTree(ds, sliceTable, rootIdx);
}

/*
* AssignWriterGangFirst() - Try to assign writer gang first.
*
* For the gang allocation, our current implementation required the first
* allocated gang must be the writer gang.
* This has several reasons:
* - For lock holding, Because of our MPP structure, we assign a LockHolder
* for each segment when executing a query. lockHolder is the gang member that
* should hold and manage locks for this transaction. On the QEs, it should
* normally be the Writer gang member. More details please refer to
* lockHolderProcPtr in lock.c.
* - For SharedSnapshot among session's gang processes on a particular segment.
* During initPostgres(), reader QE will try to lookup the shared slot written
* by writer QE. More details please reger to sharedsnapshot.c.
*
* Normally, the writer slice will be assign writer gang first when iterate the
* slice table. But this is not true for writable CTE (with only one writer gang).
* For below statement:
*
* WITH updated AS (update tbl set rank = 6 where id = 5 returning rank)
* select * from tbl where rank in (select rank from updated);
* QUERY PLAN
* ----------------------------------------------------------------------------------------------
* Gather Motion 3:1 (slice1; segments: 3)
* -> Seq Scan on tbl
* Filter: (hashed SubPlan 1)
* SubPlan 1
* -> Broadcast Motion 1:3 (slice2; segments: 1)
* -> Update on tbl
* -> Seq Scan on tbl
* Filter: (id = 5)
* Slice 0: Dispatcher; root 0; parent -1; gang size 0
* Slice 1: Reader; root 0; parent 0; gang size 3
* Slice 2: Primary Writer; root 0; parent 1; gang size 1
*
* If we sill assign writer gang to Slice 1 here, the writer process will execute
* on reader gang. So, find the writer slice and assign writer gang first.
*/
static bool
AssignWriterGangFirst(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceIndex)
{
ExecSlice *slice = &sliceTable->slices[sliceIndex];

if (slice->gangType == GANGTYPE_PRIMARY_WRITER)
{
Assert(slice->primaryGang == NULL);
Assert(slice->segments != NIL);
slice->primaryGang = AllocateGang(ds, slice->gangType, slice->segments);
setupCdbProcessList(slice);
return true;
}
else
{
ListCell *cell;
foreach(cell, slice->children)
{
int childIndex = lfirst_int(cell);
if (AssignWriterGangFirst(ds, sliceTable, childIndex))
return true;
}
}
return false;
}

/*
* Helper for AssignGangs takes a simple inventory of the gangs required
* by a slice tree. Recursive. Closely coupled with AssignGangs. Not
Expand All @@ -1482,7 +1548,7 @@ InventorySliceTree(CdbDispatcherState *ds, SliceTable *sliceTable, int sliceInde
slice->primaryGang = NULL;
slice->primaryProcesses = getCdbProcessesForQD(true);
}
else
else if (!slice->primaryGang)
{
Assert(slice->segments != NIL);
slice->primaryGang = AllocateGang(ds, slice->gangType, slice->segments);
Expand Down
11 changes: 9 additions & 2 deletions src/test/regress/expected/insert.out
Expand Up @@ -576,11 +576,18 @@ alter table mlparted4 drop a;
alter table mlparted4 add a int not null;
alter table mlparted4 set distributed by (a);
alter table mlparted attach partition mlparted4 for values from (1, 30) to (1, 40);
-- this upstream query doesn't work on GPDB at the moment.
with ins (a, b, c) as
(insert into mlparted (b, a) select s.a, 1 from generate_series(2, 39) s(a) returning tableoid::regclass, *)
select a, b, min(c), max(c) from ins group by a, b order by 1;
ERROR: INSERT/UPDATE/DELETE must be executed by a writer segworker group
a | b | min | max
------------+---+-----+-----
mlparted11 | 1 | 2 | 4
mlparted12 | 1 | 5 | 9
mlparted2 | 1 | 10 | 19
mlparted3 | 1 | 20 | 29
mlparted4 | 1 | 30 | 39
(5 rows)

alter table mlparted add c text;
create table mlparted5 (c text, a int not null, b int not null) partition by list (c);
alter table mlparted5 set distributed by (a);
Expand Down
1 change: 0 additions & 1 deletion src/test/regress/sql/insert.sql
Expand Up @@ -373,7 +373,6 @@ alter table mlparted4 drop a;
alter table mlparted4 add a int not null;
alter table mlparted4 set distributed by (a);
alter table mlparted attach partition mlparted4 for values from (1, 30) to (1, 40);
-- this upstream query doesn't work on GPDB at the moment.
with ins (a, b, c) as
(insert into mlparted (b, a) select s.a, 1 from generate_series(2, 39) s(a) returning tableoid::regclass, *)
select a, b, min(c), max(c) from ins group by a, b order by 1;
Expand Down

0 comments on commit f124d98

Please sign in to comment.