Remove parallel safe from partialize_agg
A previous PR, timescale#4307, marked `partialize_agg` and `finalize_agg` as
parallel safe, but that change leads to incorrect results in some cases.

Those functions are supposed to work in parallel, but that does not seem to
be the case. Since the root cause is not yet evident, nor is it clear how to
use them properly in parallel queries, we decided to revert the change and
return correct results to users.

Fixes timescale#4922
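
After this change, the new labels can be verified directly from the system
catalogs (a quick sketch, not part of the commit; provolatile: i = immutable,
s = stable, v = volatile; proparallel: s = safe, r = restricted, u = unsafe):

    SELECT p.proname, p.provolatile, p.proparallel
    FROM pg_proc p
    JOIN pg_namespace n ON n.oid = p.pronamespace
    WHERE n.nspname = '_timescaledb_internal'
      AND p.proname IN ('partialize_agg', 'finalize_agg_sfunc', 'finalize_agg_ffunc');

All three functions are expected to report proparallel = 'u' once the script
below is applied.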
fabriziomello committed Jan 12, 2023
1 parent f36db10 commit 7544b0a
Showing 3 changed files with 367 additions and 10 deletions.
14 changes: 4 additions & 10 deletions sql/partialize_finalize.sql
@@ -2,30 +2,24 @@
 -- Please see the included NOTICE for copyright information and
 -- LICENSE-APACHE for a copy of the license.
 
--- These wrapper functions are used to push down aggregation to data nodes.
--- They can be marked parallel-safe, and the parallel plan will be chosen
--- depending on whether the underlying aggregate function itself is
--- parallel-safe.
-
 CREATE OR REPLACE FUNCTION _timescaledb_internal.partialize_agg(arg ANYELEMENT)
-RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C STABLE PARALLEL SAFE;
+RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C VOLATILE;
 
 CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_sfunc(
 tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
 RETURNS internal
 AS '@MODULE_PATHNAME@', 'ts_finalize_agg_sfunc'
-LANGUAGE C IMMUTABLE PARALLEL SAFE;
+LANGUAGE C IMMUTABLE;
 
 CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_ffunc(
 tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
 RETURNS anyelement
 AS '@MODULE_PATHNAME@', 'ts_finalize_agg_ffunc'
-LANGUAGE C IMMUTABLE PARALLEL SAFE;
+LANGUAGE C IMMUTABLE;
 
 CREATE OR REPLACE AGGREGATE _timescaledb_internal.finalize_agg(agg_name TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val anyelement) (
 SFUNC = _timescaledb_internal.finalize_agg_sfunc,
 STYPE = internal,
 FINALFUNC = _timescaledb_internal.finalize_agg_ffunc,
-FINALFUNC_EXTRA,
-PARALLEL = SAFE
+FINALFUNC_EXTRA
 );
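
Note that dropping the PARALLEL SAFE clause is sufficient: in PostgreSQL's
CREATE FUNCTION syntax, omitting the PARALLEL label means PARALLEL UNSAFE.
A minimal illustration (hypothetical function, not part of this commit):

    -- The two definitions below are equivalent, since UNSAFE is the default.
    CREATE FUNCTION add_one(x int) RETURNS int
        AS $$ SELECT x + 1 $$ LANGUAGE SQL VOLATILE;
    CREATE FUNCTION add_one_explicit(x int) RETURNS int
        AS $$ SELECT x + 1 $$ LANGUAGE SQL VOLATILE PARALLEL UNSAFE;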
293 changes: 293 additions & 0 deletions tsl/test/expected/partialize_finalize.out
@@ -392,3 +392,296 @@ WARNING: type bigint
t
(1 row)

-- Issue 4922
CREATE TABLE issue4922 (
time TIMESTAMPTZ NOT NULL,
value INTEGER
);
SELECT create_hypertable('issue4922', 'time');
create_hypertable
------------------------
(2,public,issue4922,t)
(1 row)

-- helper function: integer -> pseudorandom integer [0..100].
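-- hashint4(x) yields an int4 in [-2^31, 2^31 - 1]; dividing by 2^31 - 1 maps
-- it to roughly [-1, 1], then (+ 1) / 2 rescales to [0, 1] and * 100 to [0, 100].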
CREATE OR REPLACE FUNCTION mix(x INTEGER) RETURNS INTEGER AS $$ SELECT (((hashint4(x) / (pow(2, 31) - 1) + 1) / 2) * 100)::INTEGER $$ LANGUAGE SQL;
INSERT INTO issue4922 (time, value)
SELECT '2022-01-01 00:00:00-03'::timestamptz + interval '1 year' * mix(x), mix(x)
FROM generate_series(1, 100000) x(x);
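-- Encourage parallel plans: force_parallel_mode wraps parallel-safe plans in
-- a Gather, and parallel_setup_cost = 0 removes the startup-cost penalty.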
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
SELECT
sum(value),
avg(value),
min(value),
max(value),
count(*)
FROM issue4922;
sum | avg | min | max | count
---------+---------------------+-----+-----+--------
5001129 | 50.0112900000000000 | 0 | 100 | 100000
(1 row)

-- The results should be EQUAL TO those of the previous query
SELECT
_timescaledb_internal.finalize_agg('pg_catalog.sum(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::int4) AS sum,
_timescaledb_internal.finalize_agg('pg_catalog.avg(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
_timescaledb_internal.finalize_agg('pg_catalog.min(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::int4) AS min,
_timescaledb_internal.finalize_agg('pg_catalog.max(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::int4) AS max,
_timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
SELECT
_timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
_timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
_timescaledb_internal.partialize_agg(min(value)) AS partial_min,
_timescaledb_internal.partialize_agg(max(value)) AS partial_max,
_timescaledb_internal.partialize_agg(count(*)) AS partial_count
FROM public.issue4922) AS a;
sum | avg | min | max | count
---------+---------------------+-----+-----+--------
5001129 | 50.0112900000000000 | 0 | 100 | 100000
(1 row)

-- Check for parallel planning
EXPLAIN (COSTS OFF)
SELECT
sum(value),
avg(value),
min(value),
max(value),
count(*)
FROM issue4922;
QUERY PLAN
-----------------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 2
-> Partial Aggregate
-> Parallel Append
-> Parallel Seq Scan on _hyper_2_4_chunk
-> Parallel Seq Scan on _hyper_2_5_chunk
-> Parallel Seq Scan on _hyper_2_6_chunk
-> Parallel Seq Scan on _hyper_2_7_chunk
-> Parallel Seq Scan on _hyper_2_8_chunk
-> Parallel Seq Scan on _hyper_2_9_chunk
-> Parallel Seq Scan on _hyper_2_10_chunk
-> Parallel Seq Scan on _hyper_2_11_chunk
-> Parallel Seq Scan on _hyper_2_12_chunk
-> Parallel Seq Scan on _hyper_2_13_chunk
-> Parallel Seq Scan on _hyper_2_14_chunk
-> Parallel Seq Scan on _hyper_2_15_chunk
-> Parallel Seq Scan on _hyper_2_16_chunk
-> Parallel Seq Scan on _hyper_2_17_chunk
-> Parallel Seq Scan on _hyper_2_18_chunk
-> Parallel Seq Scan on _hyper_2_19_chunk
-> Parallel Seq Scan on _hyper_2_20_chunk
-> Parallel Seq Scan on _hyper_2_21_chunk
-> Parallel Seq Scan on _hyper_2_22_chunk
-> Parallel Seq Scan on _hyper_2_23_chunk
-> Parallel Seq Scan on _hyper_2_24_chunk
-> Parallel Seq Scan on _hyper_2_25_chunk
-> Parallel Seq Scan on _hyper_2_26_chunk
-> Parallel Seq Scan on _hyper_2_27_chunk
-> Parallel Seq Scan on _hyper_2_28_chunk
-> Parallel Seq Scan on _hyper_2_29_chunk
-> Parallel Seq Scan on _hyper_2_30_chunk
-> Parallel Seq Scan on _hyper_2_31_chunk
-> Parallel Seq Scan on _hyper_2_32_chunk
-> Parallel Seq Scan on _hyper_2_33_chunk
-> Parallel Seq Scan on _hyper_2_34_chunk
-> Parallel Seq Scan on _hyper_2_35_chunk
-> Parallel Seq Scan on _hyper_2_36_chunk
-> Parallel Seq Scan on _hyper_2_37_chunk
-> Parallel Seq Scan on _hyper_2_38_chunk
-> Parallel Seq Scan on _hyper_2_39_chunk
-> Parallel Seq Scan on _hyper_2_40_chunk
-> Parallel Seq Scan on _hyper_2_41_chunk
-> Parallel Seq Scan on _hyper_2_42_chunk
-> Parallel Seq Scan on _hyper_2_43_chunk
-> Parallel Seq Scan on _hyper_2_44_chunk
-> Parallel Seq Scan on _hyper_2_45_chunk
-> Parallel Seq Scan on _hyper_2_46_chunk
-> Parallel Seq Scan on _hyper_2_47_chunk
-> Parallel Seq Scan on _hyper_2_48_chunk
-> Parallel Seq Scan on _hyper_2_49_chunk
-> Parallel Seq Scan on _hyper_2_50_chunk
-> Parallel Seq Scan on _hyper_2_51_chunk
-> Parallel Seq Scan on _hyper_2_52_chunk
-> Parallel Seq Scan on _hyper_2_53_chunk
-> Parallel Seq Scan on _hyper_2_54_chunk
-> Parallel Seq Scan on _hyper_2_55_chunk
-> Parallel Seq Scan on _hyper_2_56_chunk
-> Parallel Seq Scan on _hyper_2_57_chunk
-> Parallel Seq Scan on _hyper_2_58_chunk
-> Parallel Seq Scan on _hyper_2_59_chunk
-> Parallel Seq Scan on _hyper_2_60_chunk
-> Parallel Seq Scan on _hyper_2_61_chunk
-> Parallel Seq Scan on _hyper_2_62_chunk
-> Parallel Seq Scan on _hyper_2_63_chunk
-> Parallel Seq Scan on _hyper_2_64_chunk
-> Parallel Seq Scan on _hyper_2_65_chunk
-> Parallel Seq Scan on _hyper_2_66_chunk
-> Parallel Seq Scan on _hyper_2_67_chunk
-> Parallel Seq Scan on _hyper_2_68_chunk
-> Parallel Seq Scan on _hyper_2_69_chunk
-> Parallel Seq Scan on _hyper_2_70_chunk
-> Parallel Seq Scan on _hyper_2_71_chunk
-> Parallel Seq Scan on _hyper_2_72_chunk
-> Parallel Seq Scan on _hyper_2_73_chunk
-> Parallel Seq Scan on _hyper_2_74_chunk
-> Parallel Seq Scan on _hyper_2_75_chunk
-> Parallel Seq Scan on _hyper_2_76_chunk
-> Parallel Seq Scan on _hyper_2_77_chunk
-> Parallel Seq Scan on _hyper_2_78_chunk
-> Parallel Seq Scan on _hyper_2_79_chunk
-> Parallel Seq Scan on _hyper_2_80_chunk
-> Parallel Seq Scan on _hyper_2_81_chunk
-> Parallel Seq Scan on _hyper_2_82_chunk
-> Parallel Seq Scan on _hyper_2_83_chunk
-> Parallel Seq Scan on _hyper_2_84_chunk
-> Parallel Seq Scan on _hyper_2_85_chunk
-> Parallel Seq Scan on _hyper_2_86_chunk
-> Parallel Seq Scan on _hyper_2_87_chunk
-> Parallel Seq Scan on _hyper_2_88_chunk
-> Parallel Seq Scan on _hyper_2_89_chunk
-> Parallel Seq Scan on _hyper_2_90_chunk
-> Parallel Seq Scan on _hyper_2_91_chunk
-> Parallel Seq Scan on _hyper_2_92_chunk
-> Parallel Seq Scan on _hyper_2_93_chunk
-> Parallel Seq Scan on _hyper_2_94_chunk
-> Parallel Seq Scan on _hyper_2_95_chunk
-> Parallel Seq Scan on _hyper_2_96_chunk
-> Parallel Seq Scan on _hyper_2_97_chunk
-> Parallel Seq Scan on _hyper_2_98_chunk
-> Parallel Seq Scan on _hyper_2_99_chunk
-> Parallel Seq Scan on _hyper_2_100_chunk
-> Parallel Seq Scan on _hyper_2_101_chunk
-> Parallel Seq Scan on _hyper_2_102_chunk
-> Parallel Seq Scan on _hyper_2_103_chunk
-> Parallel Seq Scan on _hyper_2_104_chunk
(106 rows)

-- Make sure that even when forcing parallel mode these functions are not parallel safe
EXPLAIN (COSTS OFF)
SELECT
_timescaledb_internal.finalize_agg('pg_catalog.sum(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::int4) AS sum,
_timescaledb_internal.finalize_agg('pg_catalog.avg(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
_timescaledb_internal.finalize_agg('pg_catalog.min(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::int4) AS min,
_timescaledb_internal.finalize_agg('pg_catalog.max(int4)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::int4) AS max,
_timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
SELECT
_timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
_timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
_timescaledb_internal.partialize_agg(min(value)) AS partial_min,
_timescaledb_internal.partialize_agg(max(value)) AS partial_max,
_timescaledb_internal.partialize_agg(count(*)) AS partial_count
FROM public.issue4922) AS a;
QUERY PLAN
--------------------------------------------------
Aggregate
-> Partial Aggregate
-> Append
-> Seq Scan on _hyper_2_4_chunk
-> Seq Scan on _hyper_2_5_chunk
-> Seq Scan on _hyper_2_6_chunk
-> Seq Scan on _hyper_2_7_chunk
-> Seq Scan on _hyper_2_8_chunk
-> Seq Scan on _hyper_2_9_chunk
-> Seq Scan on _hyper_2_10_chunk
-> Seq Scan on _hyper_2_11_chunk
-> Seq Scan on _hyper_2_12_chunk
-> Seq Scan on _hyper_2_13_chunk
-> Seq Scan on _hyper_2_14_chunk
-> Seq Scan on _hyper_2_15_chunk
-> Seq Scan on _hyper_2_16_chunk
-> Seq Scan on _hyper_2_17_chunk
-> Seq Scan on _hyper_2_18_chunk
-> Seq Scan on _hyper_2_19_chunk
-> Seq Scan on _hyper_2_20_chunk
-> Seq Scan on _hyper_2_21_chunk
-> Seq Scan on _hyper_2_22_chunk
-> Seq Scan on _hyper_2_23_chunk
-> Seq Scan on _hyper_2_24_chunk
-> Seq Scan on _hyper_2_25_chunk
-> Seq Scan on _hyper_2_26_chunk
-> Seq Scan on _hyper_2_27_chunk
-> Seq Scan on _hyper_2_28_chunk
-> Seq Scan on _hyper_2_29_chunk
-> Seq Scan on _hyper_2_30_chunk
-> Seq Scan on _hyper_2_31_chunk
-> Seq Scan on _hyper_2_32_chunk
-> Seq Scan on _hyper_2_33_chunk
-> Seq Scan on _hyper_2_34_chunk
-> Seq Scan on _hyper_2_35_chunk
-> Seq Scan on _hyper_2_36_chunk
-> Seq Scan on _hyper_2_37_chunk
-> Seq Scan on _hyper_2_38_chunk
-> Seq Scan on _hyper_2_39_chunk
-> Seq Scan on _hyper_2_40_chunk
-> Seq Scan on _hyper_2_41_chunk
-> Seq Scan on _hyper_2_42_chunk
-> Seq Scan on _hyper_2_43_chunk
-> Seq Scan on _hyper_2_44_chunk
-> Seq Scan on _hyper_2_45_chunk
-> Seq Scan on _hyper_2_46_chunk
-> Seq Scan on _hyper_2_47_chunk
-> Seq Scan on _hyper_2_48_chunk
-> Seq Scan on _hyper_2_49_chunk
-> Seq Scan on _hyper_2_50_chunk
-> Seq Scan on _hyper_2_51_chunk
-> Seq Scan on _hyper_2_52_chunk
-> Seq Scan on _hyper_2_53_chunk
-> Seq Scan on _hyper_2_54_chunk
-> Seq Scan on _hyper_2_55_chunk
-> Seq Scan on _hyper_2_56_chunk
-> Seq Scan on _hyper_2_57_chunk
-> Seq Scan on _hyper_2_58_chunk
-> Seq Scan on _hyper_2_59_chunk
-> Seq Scan on _hyper_2_60_chunk
-> Seq Scan on _hyper_2_61_chunk
-> Seq Scan on _hyper_2_62_chunk
-> Seq Scan on _hyper_2_63_chunk
-> Seq Scan on _hyper_2_64_chunk
-> Seq Scan on _hyper_2_65_chunk
-> Seq Scan on _hyper_2_66_chunk
-> Seq Scan on _hyper_2_67_chunk
-> Seq Scan on _hyper_2_68_chunk
-> Seq Scan on _hyper_2_69_chunk
-> Seq Scan on _hyper_2_70_chunk
-> Seq Scan on _hyper_2_71_chunk
-> Seq Scan on _hyper_2_72_chunk
-> Seq Scan on _hyper_2_73_chunk
-> Seq Scan on _hyper_2_74_chunk
-> Seq Scan on _hyper_2_75_chunk
-> Seq Scan on _hyper_2_76_chunk
-> Seq Scan on _hyper_2_77_chunk
-> Seq Scan on _hyper_2_78_chunk
-> Seq Scan on _hyper_2_79_chunk
-> Seq Scan on _hyper_2_80_chunk
-> Seq Scan on _hyper_2_81_chunk
-> Seq Scan on _hyper_2_82_chunk
-> Seq Scan on _hyper_2_83_chunk
-> Seq Scan on _hyper_2_84_chunk
-> Seq Scan on _hyper_2_85_chunk
-> Seq Scan on _hyper_2_86_chunk
-> Seq Scan on _hyper_2_87_chunk
-> Seq Scan on _hyper_2_88_chunk
-> Seq Scan on _hyper_2_89_chunk
-> Seq Scan on _hyper_2_90_chunk
-> Seq Scan on _hyper_2_91_chunk
-> Seq Scan on _hyper_2_92_chunk
-> Seq Scan on _hyper_2_93_chunk
-> Seq Scan on _hyper_2_94_chunk
-> Seq Scan on _hyper_2_95_chunk
-> Seq Scan on _hyper_2_96_chunk
-> Seq Scan on _hyper_2_97_chunk
-> Seq Scan on _hyper_2_98_chunk
-> Seq Scan on _hyper_2_99_chunk
-> Seq Scan on _hyper_2_100_chunk
-> Seq Scan on _hyper_2_101_chunk
-> Seq Scan on _hyper_2_102_chunk
-> Seq Scan on _hyper_2_103_chunk
-> Seq Scan on _hyper_2_104_chunk
(104 rows)
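
The two plans above show the expected behavior: the plain aggregate query
still gets a Gather with parallel workers, while the partialize_agg/
finalize_agg query falls back to a serial plan, because a single PARALLEL
UNSAFE function disqualifies the whole query from parallelism. A standalone
sketch of that planner rule (hypothetical function, independent of this
commit):

    CREATE FUNCTION unsafe_one() RETURNS int
        AS $$ SELECT 1 $$ LANGUAGE SQL VOLATILE;  -- PARALLEL UNSAFE by default
    SET force_parallel_mode = 'on';
    EXPLAIN (COSTS OFF) SELECT unsafe_one() FROM generate_series(1, 1000);
    -- No Gather node appears, mirroring the serial plan above.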
