Skip to content

Commit

Permalink
Remove parallel safe from partialize_agg
Browse files Browse the repository at this point in the history
The previous PR timescale#4307 marked `partialize_agg` and `finalize_agg` as
parallel safe, but that change leads to incorrect results in some cases.

These functions are supposed to work in parallel, but that does not seem to
be the case. The root cause is not yet evident, nor is it clear how to use
them properly in parallel queries, so we decided to revert the change and
provide correct results to users.

Fixes timescale#4922
  • Loading branch information
fabriziomello committed Jan 12, 2023
1 parent f36db10 commit 6909733
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 10 deletions.
14 changes: 4 additions & 10 deletions sql/partialize_finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,24 @@
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

-- These wrapper functions are used to push down aggregation to data nodes.
-- They can be marked parallel-safe, and the parallel plan will be chosen
-- depending on whether the underlying aggregate function itself is
-- parallel-safe.

CREATE OR REPLACE FUNCTION _timescaledb_internal.partialize_agg(arg ANYELEMENT)
RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C STABLE PARALLEL SAFE;
RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_sfunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS internal
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_sfunc'
LANGUAGE C IMMUTABLE PARALLEL SAFE;
LANGUAGE C IMMUTABLE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_ffunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS anyelement
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_ffunc'
LANGUAGE C IMMUTABLE PARALLEL SAFE;
LANGUAGE C IMMUTABLE;

CREATE OR REPLACE AGGREGATE _timescaledb_internal.finalize_agg(agg_name TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val anyelement) (
SFUNC = _timescaledb_internal.finalize_agg_sfunc,
STYPE = internal,
FINALFUNC = _timescaledb_internal.finalize_agg_ffunc,
FINALFUNC_EXTRA,
PARALLEL = SAFE
FINALFUNC_EXTRA
);
197 changes: 197 additions & 0 deletions tsl/test/expected/partialize_finalize.out
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,200 @@ WARNING: type bigint
t
(1 row)

-- Issue 4922
CREATE TABLE issue4922 (
time TIMESTAMPTZ NOT NULL,
value FLOAT4
);
SELECT create_hypertable('issue4922', 'time');
create_hypertable
------------------------
(2,public,issue4922,t)
(1 row)

-- helper function: float -> pseudorandom float [0..1].
CREATE OR REPLACE FUNCTION mix(x FLOAT4) RETURNS FLOAT4 AS $$ SELECT ((hashfloat4(x) / (pow(2., 31) - 1) + 1) / 2)::FLOAT4 $$ LANGUAGE SQL;
INSERT INTO issue4922 (time, value)
SELECT '2022-01-01 00:00:00-03'::timestamptz + interval '1 year' * mix(x), mix(x + 2.) * 10
FROM generate_series(1, 100000) x(x);
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
SELECT
sum(value),
avg(value),
min(value),
max(value),
count(*)
FROM issue4922;
sum | avg | min | max | count
--------+------------------+-------------+---------+--------
498180 | 4.98180054687638 | 3.66662e-05 | 9.99984 | 100000
(1 row)

-- The results should be the EQUAL TO the previous query
SELECT
_timescaledb_internal.finalize_agg('pg_catalog.sum(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_sum, NULL::float4) AS sum,
_timescaledb_internal.finalize_agg('pg_catalog.avg(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_avg, NULL::float8) AS avg,
_timescaledb_internal.finalize_agg('pg_catalog.min(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_min, NULL::float4) AS min,
_timescaledb_internal.finalize_agg('pg_catalog.max(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_max, NULL::float4) AS max,
_timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
SELECT
_timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
_timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
_timescaledb_internal.partialize_agg(min(value)) AS partial_min,
_timescaledb_internal.partialize_agg(max(value)) AS partial_max,
_timescaledb_internal.partialize_agg(count(*)) AS partial_count
FROM public.issue4922) AS a;
sum | avg | min | max | count
--------+------------------+-------------+---------+--------
498180 | 4.98180054687638 | 3.66662e-05 | 9.99984 | 100000
(1 row)

-- Check for parallel planning
EXPLAIN (COSTS OFF)
SELECT
sum(value),
avg(value),
min(value),
max(value),
count(*)
FROM issue4922;
QUERY PLAN
----------------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 2
-> Partial Aggregate
-> Parallel Append
-> Parallel Seq Scan on _hyper_2_16_chunk
-> Parallel Seq Scan on _hyper_2_51_chunk
-> Parallel Seq Scan on _hyper_2_5_chunk
-> Parallel Seq Scan on _hyper_2_6_chunk
-> Parallel Seq Scan on _hyper_2_7_chunk
-> Parallel Seq Scan on _hyper_2_8_chunk
-> Parallel Seq Scan on _hyper_2_9_chunk
-> Parallel Seq Scan on _hyper_2_10_chunk
-> Parallel Seq Scan on _hyper_2_11_chunk
-> Parallel Seq Scan on _hyper_2_12_chunk
-> Parallel Seq Scan on _hyper_2_13_chunk
-> Parallel Seq Scan on _hyper_2_14_chunk
-> Parallel Seq Scan on _hyper_2_15_chunk
-> Parallel Seq Scan on _hyper_2_18_chunk
-> Parallel Seq Scan on _hyper_2_20_chunk
-> Parallel Seq Scan on _hyper_2_21_chunk
-> Parallel Seq Scan on _hyper_2_22_chunk
-> Parallel Seq Scan on _hyper_2_23_chunk
-> Parallel Seq Scan on _hyper_2_24_chunk
-> Parallel Seq Scan on _hyper_2_25_chunk
-> Parallel Seq Scan on _hyper_2_26_chunk
-> Parallel Seq Scan on _hyper_2_28_chunk
-> Parallel Seq Scan on _hyper_2_30_chunk
-> Parallel Seq Scan on _hyper_2_31_chunk
-> Parallel Seq Scan on _hyper_2_32_chunk
-> Parallel Seq Scan on _hyper_2_33_chunk
-> Parallel Seq Scan on _hyper_2_34_chunk
-> Parallel Seq Scan on _hyper_2_35_chunk
-> Parallel Seq Scan on _hyper_2_36_chunk
-> Parallel Seq Scan on _hyper_2_38_chunk
-> Parallel Seq Scan on _hyper_2_41_chunk
-> Parallel Seq Scan on _hyper_2_42_chunk
-> Parallel Seq Scan on _hyper_2_43_chunk
-> Parallel Seq Scan on _hyper_2_44_chunk
-> Parallel Seq Scan on _hyper_2_45_chunk
-> Parallel Seq Scan on _hyper_2_46_chunk
-> Parallel Seq Scan on _hyper_2_47_chunk
-> Parallel Seq Scan on _hyper_2_48_chunk
-> Parallel Seq Scan on _hyper_2_49_chunk
-> Parallel Seq Scan on _hyper_2_50_chunk
-> Parallel Seq Scan on _hyper_2_53_chunk
-> Parallel Seq Scan on _hyper_2_54_chunk
-> Parallel Seq Scan on _hyper_2_55_chunk
-> Parallel Seq Scan on _hyper_2_4_chunk
-> Parallel Seq Scan on _hyper_2_17_chunk
-> Parallel Seq Scan on _hyper_2_19_chunk
-> Parallel Seq Scan on _hyper_2_27_chunk
-> Parallel Seq Scan on _hyper_2_29_chunk
-> Parallel Seq Scan on _hyper_2_37_chunk
-> Parallel Seq Scan on _hyper_2_39_chunk
-> Parallel Seq Scan on _hyper_2_40_chunk
-> Parallel Seq Scan on _hyper_2_52_chunk
-> Parallel Seq Scan on _hyper_2_56_chunk
(58 rows)

-- Make sure even forcing the parallel mode those functions are not safe for parallel
EXPLAIN (COSTS OFF)
SELECT
_timescaledb_internal.finalize_agg('pg_catalog.sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::bigint) AS sum,
_timescaledb_internal.finalize_agg('pg_catalog.avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
_timescaledb_internal.finalize_agg('pg_catalog.min(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::integer) AS min,
_timescaledb_internal.finalize_agg('pg_catalog.max(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::integer) AS max,
_timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
SELECT
_timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
_timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
_timescaledb_internal.partialize_agg(min(value)) AS partial_min,
_timescaledb_internal.partialize_agg(max(value)) AS partial_max,
_timescaledb_internal.partialize_agg(count(*)) AS partial_count
FROM public.issue4922) AS a;
QUERY PLAN
-------------------------------------------------
Aggregate
-> Partial Aggregate
-> Append
-> Seq Scan on _hyper_2_4_chunk
-> Seq Scan on _hyper_2_5_chunk
-> Seq Scan on _hyper_2_6_chunk
-> Seq Scan on _hyper_2_7_chunk
-> Seq Scan on _hyper_2_8_chunk
-> Seq Scan on _hyper_2_9_chunk
-> Seq Scan on _hyper_2_10_chunk
-> Seq Scan on _hyper_2_11_chunk
-> Seq Scan on _hyper_2_12_chunk
-> Seq Scan on _hyper_2_13_chunk
-> Seq Scan on _hyper_2_14_chunk
-> Seq Scan on _hyper_2_15_chunk
-> Seq Scan on _hyper_2_16_chunk
-> Seq Scan on _hyper_2_17_chunk
-> Seq Scan on _hyper_2_18_chunk
-> Seq Scan on _hyper_2_19_chunk
-> Seq Scan on _hyper_2_20_chunk
-> Seq Scan on _hyper_2_21_chunk
-> Seq Scan on _hyper_2_22_chunk
-> Seq Scan on _hyper_2_23_chunk
-> Seq Scan on _hyper_2_24_chunk
-> Seq Scan on _hyper_2_25_chunk
-> Seq Scan on _hyper_2_26_chunk
-> Seq Scan on _hyper_2_27_chunk
-> Seq Scan on _hyper_2_28_chunk
-> Seq Scan on _hyper_2_29_chunk
-> Seq Scan on _hyper_2_30_chunk
-> Seq Scan on _hyper_2_31_chunk
-> Seq Scan on _hyper_2_32_chunk
-> Seq Scan on _hyper_2_33_chunk
-> Seq Scan on _hyper_2_34_chunk
-> Seq Scan on _hyper_2_35_chunk
-> Seq Scan on _hyper_2_36_chunk
-> Seq Scan on _hyper_2_37_chunk
-> Seq Scan on _hyper_2_38_chunk
-> Seq Scan on _hyper_2_39_chunk
-> Seq Scan on _hyper_2_40_chunk
-> Seq Scan on _hyper_2_41_chunk
-> Seq Scan on _hyper_2_42_chunk
-> Seq Scan on _hyper_2_43_chunk
-> Seq Scan on _hyper_2_44_chunk
-> Seq Scan on _hyper_2_45_chunk
-> Seq Scan on _hyper_2_46_chunk
-> Seq Scan on _hyper_2_47_chunk
-> Seq Scan on _hyper_2_48_chunk
-> Seq Scan on _hyper_2_49_chunk
-> Seq Scan on _hyper_2_50_chunk
-> Seq Scan on _hyper_2_51_chunk
-> Seq Scan on _hyper_2_52_chunk
-> Seq Scan on _hyper_2_53_chunk
-> Seq Scan on _hyper_2_54_chunk
-> Seq Scan on _hyper_2_55_chunk
-> Seq Scan on _hyper_2_56_chunk
(56 rows)

70 changes: 70 additions & 0 deletions tsl/test/sql/partialize_finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,73 @@ select _timescaledb_internal.finalize_agg( 'aggregate_to_test_ffunc_extra(int, a

-- Reports whether finalizing the partialized state of
-- aggregate_to_test_ffunc_extra(8, 1::bigint) yields NULL.
WITH cte AS (
    SELECT _timescaledb_internal.partialize_agg(aggregate_to_test_ffunc_extra(8, 1::bigint)) AS part
)
SELECT
    _timescaledb_internal.finalize_agg('aggregate_to_test_ffunc_extra(int, anyelement)', NULL, NULL, ARRAY[ARRAY['pg_catalog'::name, 'int4'::name], ARRAY['pg_catalog', 'int8']], part, NULL::text) IS NULL
FROM cte;


-- Issue 4922: hypertable used to compare the plain aggregates against the
-- partialize_agg/finalize_agg round trip.
CREATE TABLE issue4922 (
    time  TIMESTAMPTZ NOT NULL,
    value FLOAT4
);

-- Turn the table into a hypertable using "time" as its time column.
SELECT create_hypertable('issue4922', 'time');

-- Helper: maps a float to a pseudorandom float in roughly [0..1] by rescaling
-- the value returned by hashfloat4.
CREATE OR REPLACE FUNCTION mix(x FLOAT4) RETURNS FLOAT4
LANGUAGE SQL
AS $$
    SELECT ((hashfloat4(x) / (pow(2., 31) - 1) + 1) / 2)::FLOAT4
$$;

-- Load 100000 rows: each timestamp is 2022-01-01 00:00:00-03 plus a
-- pseudorandom fraction of one year, each value is mix(x + 2.) scaled by 10.
INSERT INTO issue4922 (time, value)
SELECT
    '2022-01-01 00:00:00-03'::timestamptz + interval '1 year' * mix(x),
    mix(x + 2.) * 10
FROM generate_series(1, 100000) AS series(x);

-- Session settings that push the planner towards parallel plans.
SET force_parallel_mode TO 'on';
SET parallel_setup_cost TO 0;

-- Reference result: the plain aggregates over the whole hypertable.
SELECT
    sum(value),
    avg(value),
    min(value),
    max(value),
    count(*)
FROM issue4922;

-- Round trip through partialize_agg/finalize_agg: the output must be equal to
-- the result of the previous query.
SELECT
    _timescaledb_internal.finalize_agg('pg_catalog.sum(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_sum, NULL::float4) AS sum,
    _timescaledb_internal.finalize_agg('pg_catalog.avg(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_avg, NULL::float8) AS avg,
    _timescaledb_internal.finalize_agg('pg_catalog.min(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_min, NULL::float4) AS min,
    _timescaledb_internal.finalize_agg('pg_catalog.max(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_max, NULL::float4) AS max,
    _timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
    -- Inner level: one serialized partial state per aggregate.
    SELECT
        _timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
        _timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
        _timescaledb_internal.partialize_agg(min(value)) AS partial_min,
        _timescaledb_internal.partialize_agg(max(value)) AS partial_max,
        _timescaledb_internal.partialize_agg(count(*)) AS partial_count
    FROM public.issue4922
) AS partials;

-- Show the plan chosen for the plain aggregates, to check for parallel
-- planning.
EXPLAIN (COSTS OFF)
SELECT
    sum(value),
    avg(value),
    min(value),
    max(value),
    count(*)
FROM issue4922;

-- Even with parallel mode forced, the partialize_agg/finalize_agg query must
-- not get a parallel plan, because these functions are not parallel safe.
-- The aggregate signatures, input types and dummy return values below match
-- the FLOAT4 "value" column, so the statement being explained is the same
-- round-trip query that is executed earlier in this test.
EXPLAIN (COSTS OFF)
SELECT
    _timescaledb_internal.finalize_agg('pg_catalog.sum(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_sum, NULL::float4) AS sum,
    _timescaledb_internal.finalize_agg('pg_catalog.avg(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_avg, NULL::float8) AS avg,
    _timescaledb_internal.finalize_agg('pg_catalog.min(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_min, NULL::float4) AS min,
    _timescaledb_internal.finalize_agg('pg_catalog.max(float4)'::text, NULL::name, NULL::name, '{{pg_catalog,float4}}'::name[], partial_max, NULL::float4) AS max,
    _timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
    SELECT
        _timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
        _timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
        _timescaledb_internal.partialize_agg(min(value)) AS partial_min,
        _timescaledb_internal.partialize_agg(max(value)) AS partial_max,
        _timescaledb_internal.partialize_agg(count(*)) AS partial_count
    FROM public.issue4922
) AS a;

0 comments on commit 6909733

Please sign in to comment.