Skip to content

Commit

Permalink
Pass join related structs to the cagg rte
Browse files Browse the repository at this point in the history
In case of joins in the continuous aggregates, pass the required
structs to the new rte created. These values are required by the
planner to finally query the materialized view.

Fixes timescale#5433
  • Loading branch information
RafiaSabih committed Mar 30, 2023
1 parent ff5959f commit f9d759f
Show file tree
Hide file tree
Showing 19 changed files with 826 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/update-test.yaml
Expand Up @@ -71,7 +71,7 @@ jobs:

- name: Downgrade tests ${{ matrix.pg }}
env:
TEST_VERSION: v7
TEST_VERSION: v9
run: |
DOWNGRADE_TO=$(grep '^downgrade_to_version = ' version.config | sed -e 's!^[^=]\+ = !!')
PG_MAJOR=$(echo "${{ matrix.pg }}" | sed -e 's![.].*!!')
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -29,12 +29,14 @@ accidentally triggering the load of a previous DB version.**
* #5499 Do not segfault on large histogram() parameters
* #5497 Allow named time_bucket arguments in Cagg definition
* #5500 Fix when no FROM clause in continuous aggregate definition
* #5433 Fix join rte in CAggs with joins

**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher
* @S-imo-n for reporting the issue on Background Worker Scheduler crash
* @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates
* @geezhu for reporting issue on segfault in historgram()
* @mwahlthuetter for reporting the issue with joins in CAggs

## 2.10.1 (2023-03-07)

Expand Down
1 change: 1 addition & 0 deletions scripts/test_updates_pg12.sh
Expand Up @@ -20,3 +20,4 @@ run_tests "$@" -v7 \
run_tests "$@" -v8 \
2.5.0-pg12 2.5.1-pg12 2.5.2-pg12 2.6.0-pg12 2.6.1-pg12 2.7.0-pg12 2.7.1-pg12 2.7.2-pg12 \
2.8.0-pg12 2.8.1-pg12 2.9.0-pg12 2.9.1-pg12 2.9.2-pg12 2.9.3-pg12 2.10.0-pg12 2.10.1-pg12

2 changes: 2 additions & 0 deletions scripts/test_updates_pg13.sh
Expand Up @@ -13,4 +13,6 @@ run_tests "$@" -v7 \
run_tests "$@" -v8 \
2.5.0-pg13 2.5.1-pg13 2.5.2-pg13 2.6.0-pg13 2.6.1-pg13 2.7.0-pg13 2.7.1-pg13 2.7.2-pg13 \
2.8.0-pg13 2.8.1-pg13 2.9.0-pg13 2.9.1-pg13 2.9.2-pg13 2.9.3-pg13 2.10.0-pg13 2.10.1-pg13
run_tests "$@" -v9 \
2.10.0-pg13 2.10.1-pg13

3 changes: 2 additions & 1 deletion scripts/test_updates_pg14.sh
Expand Up @@ -12,4 +12,5 @@ run_tests "$@" -v7 \
run_tests "$@" -v8 \
2.5.0-pg14 2.5.1-pg14 2.5.2-pg14 2.6.0-pg14 2.6.1-pg14 2.7.0-pg14 2.7.1-pg14 2.7.2-pg14 \
2.8.0-pg14 2.8.1-pg14 2.9.0-pg14 2.9.1-pg14 2.9.2-pg14 2.9.3-pg14 2.10.0-pg14 2.10.1-pg14

run_tests "$@" -v9 \
2.10.0-pg14 2.10.1-pg14
2 changes: 2 additions & 0 deletions scripts/test_updates_pg15.sh
Expand Up @@ -9,3 +9,5 @@ source ${SCRIPT_DIR}/test_functions.inc

run_tests "$@" -v8 \
2.9.0-pg15 2.9.1-pg15 2.9.2-pg15 2.9.3-pg15 2.10.0-pg15 2.10.1-pg15
run_tests "$@" -v9 \
2.10.0-pg15 2.10.1-pg15
28 changes: 26 additions & 2 deletions sql/updates/post-update.sql
Expand Up @@ -8,7 +8,7 @@ BEGIN
SELECT extversion INTO ts_version FROM pg_extension WHERE extname = 'timescaledb';
IF ts_version >= '2.7.0' THEN
CREATE PROCEDURE _timescaledb_internal.post_update_cagg_try_repair(
cagg_view REGCLASS
cagg_view REGCLASS, force_rebuild boolean
) AS '@MODULE_PATHNAME@', 'ts_cagg_try_repair' LANGUAGE C;
END IF;
FOR vname, materialized_only IN select format('%I.%I', cagg.user_view_schema, cagg.user_view_name)::regclass, cagg.materialized_only from _timescaledb_catalog.continuous_agg cagg
Expand All @@ -26,7 +26,7 @@ BEGIN
EXECUTE format('ALTER MATERIALIZED VIEW %s SET (timescaledb.materialized_only=%L) ', vname::text, materialized_only);
ELSE
SET log_error_verbosity TO VERBOSE;
CALL _timescaledb_internal.post_update_cagg_try_repair(vname);
CALL _timescaledb_internal.post_update_cagg_try_repair(vname, false);
END IF;
END LOOP;
IF ts_version >= '2.7.0' THEN
Expand All @@ -36,6 +36,30 @@ BEGIN
END
$$;

--Check for correct upgrade for the caggs with joins
DO $$
DECLARE
vname regclass;
ts_version TEXT;
BEGIN
SELECT extversion INTO ts_version FROM pg_extension WHERE extname = 'timescaledb';
IF ts_version >= '2.10.0' THEN
CREATE PROCEDURE _timescaledb_internal.post_update_cagg_try_repair(
cagg_view REGCLASS, force_rebuild boolean
) AS '@MODULE_PATHNAME@', 'ts_cagg_try_repair' LANGUAGE C;
FOR vname IN select format('%I.%I', cagg.user_view_schema, cagg.user_view_name)::regclass from _timescaledb_catalog.continuous_agg cagg
LOOP
SET log_error_verbosity TO VERBOSE;
CALL _timescaledb_internal.post_update_cagg_try_repair(vname, true);
END LOOP;
END IF;
IF ts_version >= '2.10.0' THEN
DROP PROCEDURE IF EXISTS _timescaledb_internal.post_update_cagg_try_repair;
END IF;
EXCEPTION WHEN OTHERS THEN RAISE;
END
$$;

-- can only be dropped after views have been rebuilt
DROP FUNCTION IF EXISTS _timescaledb_internal.cagg_watermark(oid);

Expand Down
9 changes: 9 additions & 0 deletions test/sql/updates/cleanup.continuous_aggs.v9.sql
@@ -0,0 +1,9 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

DROP MATERIALIZED VIEW cagg_joins_upgrade_test_with_realtime;
DROP MATERIALIZED VIEW cagg_joins_upgrade_test;

DROP TABLE ht_cagg_joins CASCADE;
DROP TABLE nt_cagg_joins CASCADE;
6 changes: 6 additions & 0 deletions test/sql/updates/cleanup.v9.sql
@@ -0,0 +1,6 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

\ir cleanup.v7.sql
\ir cleanup.continuous_aggs.v9.sql
7 changes: 7 additions & 0 deletions test/sql/updates/post.continuous_aggs.v9.sql
@@ -0,0 +1,7 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

SELECT * FROM cagg_joins_upgrade_test_with_realtime ORDER BY bucket;
SELECT * FROM cagg_joins_upgrade_test ORDER BY bucket;

5 changes: 5 additions & 0 deletions test/sql/updates/post.v9.sql
@@ -0,0 +1,5 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

--\ir post.v8.sql
53 changes: 53 additions & 0 deletions test/sql/updates/setup.continuous_aggs.v9.sql
@@ -0,0 +1,53 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

CREATE TABLE ht_cagg_joins(
day DATE NOT NULL,
city text NOT NULL,
temperature INT NOT NULL,
device_id int NOT NULL);
SELECT create_hypertable(
'ht_cagg_joins', 'day',
chunk_time_interval => INTERVAL '1 day'
);
INSERT INTO ht_cagg_joins (day, city, temperature, device_id) VALUES
('2021-06-14', 'Moscow', 26,1),
('2021-06-15', 'Moscow', 22,2),
('2021-06-16', 'Moscow', 24,3),
('2021-06-17', 'Moscow', 24,4),
('2021-06-18', 'Moscow', 27,4),
('2021-06-19', 'Moscow', 28,4),
('2021-06-20', 'Moscow', 30,1),
('2021-06-21', 'Moscow', 31,1),
('2021-06-22', 'Moscow', 34,1),
('2021-06-23', 'Moscow', 34,2),
('2021-06-24', 'Moscow', 34,2),
('2021-06-25', 'Moscow', 32,3),
('2021-06-26', 'Moscow', 32,3),
('2021-06-27', 'Moscow', 31,3);

CREATE TABLE nt_cagg_joins ( device_id int not null, name text, location text);
INSERT INTO nt_cagg_joins values (1, 'thermo_1', 'Moscow'), (2, 'thermo_2', 'Berlin'),(3, 'thermo_3', 'London'),(4, 'thermo_4', 'Stockholm');

--Create a cagg with join between a hypertable and a normal table
-- with equality condition on inner join type and realtime aggregation enabled
CREATE MATERIALIZED VIEW cagg_joins_upgrade_test_with_realtime
WITH (timescaledb.continuous, timescaledb.materialized_only = FALSE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
AVG(temperature),
name
FROM ht_cagg_joins JOIN nt_cagg_joins
ON ht_cagg_joins.device_id = nt_cagg_joins.device_id
GROUP BY 1,3;

--Create a cagg with join between a hypertable and a normal table
-- with equality condition on inner join type and realtime aggregation disabled
CREATE MATERIALIZED VIEW cagg_joins_upgrade_test
WITH (timescaledb.continuous, timescaledb.materialized_only = TRUE) AS
SELECT time_bucket(INTERVAL '1 day', day) AS bucket,
AVG(temperature),
name
FROM ht_cagg_joins JOIN nt_cagg_joins
ON ht_cagg_joins.device_id = nt_cagg_joins.device_id
GROUP BY 1,3;
6 changes: 6 additions & 0 deletions test/sql/updates/setup.v9.sql
@@ -0,0 +1,6 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.

\ir setup.v8.sql
\ir setup.continuous_aggs.v9.sql
91 changes: 69 additions & 22 deletions tsl/src/continuous_aggs/create.c
Expand Up @@ -2509,33 +2509,57 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
ListCell *l;
foreach (l, inp->final_userquery->jointree->fromlist)
{
/*
* In case of joins, update the rte with all the join related struct.
*/
Node *jtnode = (Node *) lfirst(l);
JoinExpr *join = NULL;
if (IsA(jtnode, JoinExpr))
{
join = castNode(JoinExpr, jtnode);
RangeTblEntry *jrte = rt_fetch(join->rtindex, inp->final_userquery->rtable);
rte->joinaliasvars = jrte->joinaliasvars;
rte->eref = copyObject(jrte->eref);
#if PG13_GE
rte->joinleftcols = jrte->joinleftcols;
rte->joinrightcols = jrte->joinrightcols;
#endif
#if PG14_GE
rte->join_using_alias = jrte->join_using_alias;
#endif
rte->selectedCols = jrte->selectedCols;
}
}
}
else
{
rte = llast_node(RangeTblEntry, inp->final_userquery->rtable);
rte->eref->colnames = NIL;
rte->selectedCols = NULL;
}
if (rte->eref->colnames == NIL)
{
/*
* We only need to do this for the case when there is no Join node in the query.
* In the case of join, rte->eref is already populated by jrte->eref and hence the
* relevant info, so need not to do this.
*/

/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = (ColumnDef *) lfirst(lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
rte->selectedCols = bms_add_member(rte->selectedCols,
list_length(rte->eref->colnames) -
FirstLowInvalidHeapAttributeNumber);
}
}
rte->relid = mattbladdress->objectId;
rte->rtekind = RTE_RELATION;
rte->relkind = RELKIND_RELATION;
rte->tablesample = NULL;
rte->eref->colnames = NIL;
rte->selectedCols = NULL;
/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = (ColumnDef *) lfirst(lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
rte->selectedCols =
bms_add_member(rte->selectedCols,
list_length(rte->eref->colnames) - FirstLowInvalidHeapAttributeNumber);
}

rte->requiredPerms |= ACL_SELECT;
rte->insertedCols = NULL;
rte->updatedCols = NULL;
Expand All @@ -2544,7 +2568,7 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
foreach (lc, inp->final_seltlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
if (IsA(tle->expr, Var))
if (IsA(tle->expr, Var) && tle->resorigtbl == 0)
{
tle->resorigtbl = rte->relid;
tle->resorigcol = ((Var *) tle->expr)->varattno;
Expand All @@ -2553,7 +2577,9 @@ finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,

CAGG_MAKEQUERY(final_selquery, inp->final_userquery);
final_selquery->hasAggs = !inp->finalized;
if (list_length(inp->final_userquery->jointree->fromlist) >= CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
RangeTblRef *rtr;
final_selquery->rtable = list_make1(rte);
Expand Down Expand Up @@ -2971,7 +2997,7 @@ remove_old_and_new_rte_from_query(Query *query)
* for errors and attempt to rebuild it if required.
*/
static void
cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht, bool force_rebuild)
{
bool test_failed = false;
char *relname = agg->data.user_view_name.data;
Expand All @@ -2984,22 +3010,41 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
Relation user_view_rel = relation_open(user_view_oid, AccessShareLock);
Query *user_query = get_view_query(user_view_rel);
bool finalized = ContinuousAggIsFinalized(agg);
bool has_joins = false;
bool skip_rebuild = false;

/* Extract final query from user view query. */
Query *final_query = copyObject(user_query);
remove_old_and_new_rte_from_query(final_query);
if (!agg->data.materialized_only)
{
final_query = destroy_union_query(final_query);
}

if (finalized)
if (finalized && force_rebuild == false)
{
/* This continuous aggregate does not have partials, do not check for defects. */
relation_close(user_view_rel, NoLock);
return;
}

if (user_query->jointree && user_query->jointree->fromlist)
{
if ((list_length(user_query->jointree->fromlist) == CONTINUOUS_AGG_MAX_JOIN_RELATIONS) ||
(!IsA(linitial(user_query->jointree->fromlist), RangeTblRef)))
has_joins = true;
}
else if (final_query && final_query->rtable &&
list_length(final_query->rtable) == CONTINUOUS_AGG_MAX_JOIN_RELATIONS)
has_joins = true;

if (!has_joins || !force_rebuild)
skip_rebuild = true;
if (skip_rebuild)
{
relation_close(user_view_rel, NoLock);
return;
}
if (!agg->data.materialized_only)
{
final_query = destroy_union_query(final_query);
}
FinalizeQueryInfo fqi;
MatTableColumnInfo mattblinfo;
ObjectAddress mataddress = {
Expand All @@ -3021,7 +3066,8 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
fqi.finalized = finalized;
finalizequery_init(&fqi, direct_query, &mattblinfo);

mattablecolumninfo_addinternal(&mattblinfo);
if (!finalized)
mattablecolumninfo_addinternal(&mattblinfo);

Query *view_query =
finalizequery_get_select_query(&fqi, mattblinfo.matcollist, &mataddress, relname);
Expand All @@ -3041,7 +3087,6 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
* rebuild those views since the materialization table can not be queried correctly.
*/
test_failed = true;

/*
* When calling StoreViewQuery the target list names of the query have to
* match the view's tuple descriptor attribute names. But if a column of the continuous
Expand All @@ -3059,6 +3104,7 @@ cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht)
FormData_pg_attribute *attr = TupleDescAttr(desc, i);
view_tle = lfirst_node(TargetEntry, lc1);
user_tle = lfirst_node(TargetEntry, lc2);

if (view_tle->resjunk && user_tle->resjunk)
break;
else if (view_tle->resjunk || user_tle->resjunk)
Expand Down Expand Up @@ -3106,6 +3152,7 @@ tsl_cagg_try_repair(PG_FUNCTION_ARGS)
{
Oid relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
char relkind = get_rel_relkind(relid);
bool force_rebuild = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
ContinuousAgg *cagg = NULL;

if (RELKIND_VIEW == relkind)
Expand All @@ -3124,7 +3171,7 @@ tsl_cagg_try_repair(PG_FUNCTION_ARGS)
Hypertable *mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg->data.mat_hypertable_id);
Assert(mat_ht != NULL);

cagg_rebuild_view_definition(cagg, mat_ht);
cagg_rebuild_view_definition(cagg, mat_ht, force_rebuild);

ts_cache_release(hcache);

Expand Down

0 comments on commit f9d759f

Please sign in to comment.