digoal
2023-01-25
PostgreSQL , PolarDB , string_agg , array_agg , 并行
string_agg 和 array_agg 新增了 combine、serial 和 deserial 支持函数, 使这两个聚合函数可以参与部分聚合(partial aggregation): 既可以使用并行聚合, 也可以在分区表上使用"先对各分区做部分聚合、再合并结果"的 partition-wise 聚合计划(非分区表和分区表的场景均受益).
这两个函数常用于分析场景.
Allow parallel aggregate on string_agg and array_agg
author David Rowley <drowley@postgresql.org>
Mon, 23 Jan 2023 04:35:01 +0000 (17:35 +1300)
committer David Rowley <drowley@postgresql.org>
Mon, 23 Jan 2023 04:35:01 +0000 (17:35 +1300)
commit 16fd03e956540d1b47b743f6a84f37c54ac93dd4
tree 5d4e04184fcc5e119b92d48529b60bc160f99633
parent 5a3a95385bd5a8f1a4fd50545b7efe9338581899
Allow parallel aggregate on string_agg and array_agg
This adds combine, serial and deserial functions for the array_agg() and
string_agg() aggregate functions, thus allowing these aggregates to
partake in partial aggregations. This allows both parallel aggregation to
take place when these aggregates are present and also allows additional
partition-wise aggregation plan shapes to include plans that require
additional aggregation once the partially aggregated results from the
partitions have been combined.
Author: David Rowley
Reviewed-by: Andres Freund, Tomas Vondra, Stephen Frost, Tom Lane
Discussion: https://postgr.es/m/CAKJS1f9sx_6GTcvd6TMuZnNtCh0VhBzhX6FZqw17TgVFH-ga_A@mail.gmail.com
+-- Test parallel string_agg and array_agg
+-- Fixture: 5000 rows; y = x % 10 gives ten groups of 500 rows each.
+-- Every x with x % 4 = 1 is stored as NULL; such x are odd, so only the
+-- odd-y groups get NULL inputs (half of their rows).
+create table pagg_test (x int, y int);
+insert into pagg_test
+select (case x % 4 when 1 then null else x end), x % 10
+from generate_series(1,5000) x;
+-- Make a parallel plan attractive even for this small table by zeroing
+-- the parallel startup/tuple-transfer costs and the minimum table size
+-- for a parallel scan.  Keeping the leader out of the parallel part
+-- presumably ensures the partial aggregates are built only by workers
+-- and then combined above the Gather.
+set parallel_setup_cost TO 0;
+set parallel_tuple_cost TO 0;
+set parallel_leader_participation TO 0;
+set min_parallel_table_scan_size = 0;
+-- Render bytea in escape format (the default is hex) so that b::text in
+-- the view below yields comma-separated digits that can be split on ','.
+set bytea_output = 'escape';
+-- create a view as we otherwise have to repeat this query a few times.
+-- Inner query (a1): the aggregates under test, one row per y.  They have
+-- no ORDER BY, so element order may differ between parallel and serial
+-- plans; the outer query therefore unnests each result back into its
+-- elements and summarises them with order-independent min/max and
+-- count(distinct), all of which skip NULL elements.
+-- t is cast to int (numeric ordering); b stays text, so bmin/bmax are
+-- lexicographic.
+create view v_pagg_test AS
+select
+ y,
+ min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
+ min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
+ min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
+ min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
+from (
+ select
+ y,
+ unnest(regexp_split_to_array(a1.t, ','))::int AS t,
+ unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
+ unnest(a1.a) AS a,
+ unnest(a1.aa) AS aa
+ from (
+ select
+ y,
+ string_agg(x::text, ',') AS t,
+ string_agg(x::text::bytea, ',') AS b,
+ array_agg(x) AS a,
+ array_agg(ARRAY[x]) AS aa
+ from pagg_test
+ group by y
+ ) a1
+) a2
+group by y;
+-- Ensure results are correct.
+-- Expected: even-y groups have 500 distinct values, odd-y groups 250
+-- (half of their rows were set to NULL).  The text-typed b column sorts
+-- lexicographically, hence e.g. bmin = 1011 and bmax = 991 for y = 1.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500
+ 1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250
+ 2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500
+ 3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250
+ 4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500
+ 5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250
+ 6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500
+ 7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250
+ 8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500
+ 9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250
+(10 rows)
+
+-- Ensure parallel aggregation is actually being used.
+-- The expected plan has a Partial HashAggregate below the Gather and a
+-- Finalize HashAggregate above it: workers aggregate their share of
+-- pagg_test and the partial states are then combined, which is what the
+-- new combine/serial/deserial functions for these aggregates make possible.
+explain (costs off) select * from v_pagg_test order by y;
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------------
+ GroupAggregate
+ Group Key: pagg_test.y
+ -> Sort
+ Sort Key: pagg_test.y, (((unnest(regexp_split_to_array((string_agg((pagg_test.x)::text, ','::text)), ','::text))))::integer)
+ -> Result
+ -> ProjectSet
+ -> Finalize HashAggregate
+ Group Key: pagg_test.y
+ -> Gather
+ Workers Planned: 2
+ -> Partial HashAggregate
+ Group Key: pagg_test.y
+ -> Parallel Seq Scan on pagg_test
+(13 rows)
+
+-- Allow no workers per Gather so the same view is evaluated with a
+-- non-parallel plan, giving a baseline to compare against the output above.
+set max_parallel_workers_per_gather = 0;
+-- Ensure results are the same without parallel aggregation.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 | 10 | 5000 | 500 | 10 | 990 | 500 | 10 | 5000 | 500 | 10 | 5000 | 500
+ 1 | 11 | 4991 | 250 | 1011 | 991 | 250 | 11 | 4991 | 250 | 11 | 4991 | 250
+ 2 | 2 | 4992 | 500 | 1002 | 992 | 500 | 2 | 4992 | 500 | 2 | 4992 | 500
+ 3 | 3 | 4983 | 250 | 1003 | 983 | 250 | 3 | 4983 | 250 | 3 | 4983 | 250
+ 4 | 4 | 4994 | 500 | 1004 | 994 | 500 | 4 | 4994 | 500 | 4 | 4994 | 500
+ 5 | 15 | 4995 | 250 | 1015 | 995 | 250 | 15 | 4995 | 250 | 15 | 4995 | 250
+ 6 | 6 | 4996 | 500 | 1006 | 996 | 500 | 6 | 4996 | 500 | 6 | 4996 | 500
+ 7 | 7 | 4987 | 250 | 1007 | 987 | 250 | 7 | 4987 | 250 | 7 | 4987 | 250
+ 8 | 8 | 4998 | 500 | 1008 | 998 | 500 | 8 | 4998 | 500 | 8 | 4998 | 500
+ 9 | 19 | 4999 | 250 | 1019 | 999 | 250 | 19 | 4999 | 250 | 19 | 4999 | 250
+(10 rows)
+
+-- Clean up
+-- Reset every setting changed earlier in this test, then drop the view
+-- before the table it selects from.
+reset max_parallel_workers_per_gather;
+reset bytea_output;
+reset min_parallel_table_scan_size;
+reset parallel_leader_participation;
+reset parallel_tuple_cost;
+reset parallel_setup_cost;
+drop view v_pagg_test;
+drop table pagg_test;