Skip to content

Commit

Permalink
Remove parallel safe from partialize_agg
Browse files Browse the repository at this point in the history
Previous PR timescale#4307 mark `partialize_agg` and `finalize_agg` as parallel
safe but this change is leading to incorrect results in some cases.

Those functions are supposed work in parallel but seems is not the case
and it is not evident yet the root cause and how to properly use it in
parallel queries so we decided to revert this change and provide correct
results to users.

Fixes timescale#4922
  • Loading branch information
fabriziomello committed Jan 11, 2023
1 parent 396bc6d commit 8de4184
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 5 deletions.
9 changes: 4 additions & 5 deletions sql/partialize_finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,23 @@
-- parallel-safe.

CREATE OR REPLACE FUNCTION _timescaledb_internal.partialize_agg(arg ANYELEMENT)
RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C STABLE PARALLEL SAFE;
RETURNS BYTEA AS '@MODULE_PATHNAME@', 'ts_partialize_agg' LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_sfunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS internal
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_sfunc'
LANGUAGE C IMMUTABLE PARALLEL SAFE;
LANGUAGE C IMMUTABLE;

CREATE OR REPLACE FUNCTION _timescaledb_internal.finalize_agg_ffunc(
tstate internal, aggfn TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val ANYELEMENT)
RETURNS anyelement
AS '@MODULE_PATHNAME@', 'ts_finalize_agg_ffunc'
LANGUAGE C IMMUTABLE PARALLEL SAFE;
LANGUAGE C IMMUTABLE;

CREATE OR REPLACE AGGREGATE _timescaledb_internal.finalize_agg(agg_name TEXT, inner_agg_collation_schema NAME, inner_agg_collation_name NAME, inner_agg_input_types NAME[][], inner_agg_serialized_state BYTEA, return_type_dummy_val anyelement) (
SFUNC = _timescaledb_internal.finalize_agg_sfunc,
STYPE = internal,
FINALFUNC = _timescaledb_internal.finalize_agg_ffunc,
FINALFUNC_EXTRA,
PARALLEL = SAFE
FINALFUNC_EXTRA
);
43 changes: 43 additions & 0 deletions tsl/test/expected/partialize_finalize.out
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,46 @@ WARNING: type bigint
t
(1 row)

-- Issue 4922
CREATE TABLE issue4922 (
time TIMESTAMPTZ NOT NULL,
value INTEGER
);
SELECT create_hypertable('issue4922', 'time');
create_hypertable
------------------------
(2,public,issue4922,t)
(1 row)

INSERT INTO issue4922 (time, value)
SELECT time, (random()*30) as value
FROM generate_series(now() - interval '100 hours', now(), '1 sec') AS time;
SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;
-- Compare the results (should return NO ROWS)
(SELECT
sum(value),
avg(value),
min(value),
max(value),
count(*)
FROM issue4922)
EXCEPT
(SELECT
_timescaledb_internal.finalize_agg('pg_catalog.sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::bigint) AS sum,
_timescaledb_internal.finalize_agg('pg_catalog.avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
_timescaledb_internal.finalize_agg('pg_catalog.min(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::integer) AS min,
_timescaledb_internal.finalize_agg('pg_catalog.max(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::integer) AS max,
_timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
SELECT
_timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
_timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
_timescaledb_internal.partialize_agg(min(value)) AS partial_min,
_timescaledb_internal.partialize_agg(max(value)) AS partial_max,
_timescaledb_internal.partialize_agg(count(*)) AS partial_count
FROM public.issue4922) AS a);
sum | avg | min | max | count
-----+-----+-----+-----+-------
(0 rows)

41 changes: 41 additions & 0 deletions tsl/test/sql/partialize_finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,44 @@ select _timescaledb_internal.finalize_agg( 'aggregate_to_test_ffunc_extra(int, a

with cte as (SELECT _timescaledb_internal.partialize_agg(aggregate_to_test_ffunc_extra(8, 1::bigint)) as part)
select _timescaledb_internal.finalize_agg( 'aggregate_to_test_ffunc_extra(int, anyelement)', null, null, array[array['pg_catalog'::name, 'int4'::name], array['pg_catalog', 'int8']], part, null::text) is null from cte;


-- Issue 4922
CREATE TABLE issue4922 (
time TIMESTAMPTZ NOT NULL,
value INTEGER
);

SELECT create_hypertable('issue4922', 'time');

INSERT INTO issue4922 (time, value)
SELECT time, (random()*30) as value
FROM generate_series(now() - interval '100 hours', now(), '1 sec') AS time;

SET force_parallel_mode = 'on';
SET parallel_setup_cost = 0;

-- Compare the results (should return NO ROWS)
(SELECT
sum(value),
avg(value),
min(value),
max(value),
count(*)
FROM issue4922)
EXCEPT
(SELECT
_timescaledb_internal.finalize_agg('pg_catalog.sum(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_sum, NULL::bigint) AS sum,
_timescaledb_internal.finalize_agg('pg_catalog.avg(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_avg, NULL::numeric) AS avg,
_timescaledb_internal.finalize_agg('pg_catalog.min(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_min, NULL::integer) AS min,
_timescaledb_internal.finalize_agg('pg_catalog.max(integer)'::text, NULL::name, NULL::name, '{{pg_catalog,int4}}'::name[], partial_max, NULL::integer) AS max,
_timescaledb_internal.finalize_agg('pg_catalog.count()'::text, NULL::name, NULL::name, '{}'::name[], partial_count, NULL::bigint) AS count
FROM (
SELECT
_timescaledb_internal.partialize_agg(sum(value)) AS partial_sum,
_timescaledb_internal.partialize_agg(avg(value)) AS partial_avg,
_timescaledb_internal.partialize_agg(min(value)) AS partial_min,
_timescaledb_internal.partialize_agg(max(value)) AS partial_max,
_timescaledb_internal.partialize_agg(count(*)) AS partial_count
FROM public.issue4922) AS a);

0 comments on commit 8de4184

Please sign in to comment.