Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use citus_shard_sizes in citus_tables #7018

Merged
6 commits merged on Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
bool firstValue);
static char * GenerateSizeQueryForRelationNameList(List *quotedShardNames,
char *sizeFunction);
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
Expand All @@ -104,7 +105,7 @@ static List * OpenConnectionToNodes(List *workerNodeList);
static void ReceiveShardIdAndSizeResults(List *connectionList,
Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval);
static void AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval);

static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes,
uint64 totalBytes);
Expand Down Expand Up @@ -916,6 +917,12 @@ static char *
GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds)
{
StringInfo allShardStatisticsQuery = makeStringInfo();
bool insertedValues = false;

appendStringInfoString(allShardStatisticsQuery, "SELECT shard_id, ");
appendStringInfo(allShardStatisticsQuery, PG_TOTAL_RELATION_SIZE_FUNCTION,
"table_name");
appendStringInfoString(allShardStatisticsQuery, " FROM (VALUES ");

Oid relationId = InvalidOid;
foreach_oid(relationId, citusTableIds)
Expand All @@ -930,46 +937,60 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI
{
List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode,
relationId);
char *shardStatisticsQuery =
GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode);
appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery);
if (list_length(shardIntervalsOnNode) == 0)
{
relation_close(relation, AccessShareLock);
continue;
}
char *shardIdNameValues =
GenerateShardIdNameValuesForShardList(shardIntervalsOnNode,
!insertedValues);
insertedValues = true;
appendStringInfoString(allShardStatisticsQuery, shardIdNameValues);
relation_close(relation, AccessShareLock);
}
}

/* Add a dummy entry so that UNION ALL doesn't complain */
appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, 0::bigint;");
if (!insertedValues)
{
return "SELECT 0 AS shard_id, '' AS table_name LIMIT 0";
}

appendStringInfoString(allShardStatisticsQuery, ") t(shard_id, table_name) "
"WHERE to_regclass(table_name) IS NOT NULL");
return allShardStatisticsQuery->data;
}


/*
* GenerateShardStatisticsQueryForShardList generates a query that returns:
* SELECT shard_id, shard_name, shard_size for all shards in the list
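* GenerateShardIdNameValuesForShardList generates a comma-separated list of
* (shard_id, shard_name) values for all shards in the list. The separator before
* the first entry is omitted only when firstValue is true, so the result can
* either start a VALUES list or be appended to one that already has entries.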
*/
static char *
GenerateShardStatisticsQueryForShardList(List *shardIntervalList)
GenerateShardIdNameValuesForShardList(List *shardIntervalList, bool firstValue)
{
StringInfo selectQuery = makeStringInfo();

ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
AppendShardSizeQuery(selectQuery, shardInterval);
appendStringInfo(selectQuery, " UNION ALL ");
if (!firstValue)
{
appendStringInfoString(selectQuery, ", ");
}
firstValue = false;
AppendShardIdNameValues(selectQuery, shardInterval);
}

return selectQuery->data;
}


/*
* AppendShardSizeQuery appends a query in the following form to selectQuery
* SELECT shard_id, shard_name, shard_size
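* AppendShardIdNameValues appends a "(shard_id, 'shard_name')" entry for the
* given shard to selectQuery, where shard_name is the schema-qualified shard
* name quoted as a SQL literal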
*/
static void
AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval)
{
uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
Expand All @@ -981,8 +1002,7 @@ AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);

appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
appendStringInfo(selectQuery, PG_TOTAL_RELATION_SIZE_FUNCTION, quotedShardName);
appendStringInfo(selectQuery, "(" UINT64_FORMAT ", %s)", shardId, quotedShardName);
}


Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/sql/citus--11.3-1--12.0-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ GRANT SELECT ON pg_catalog.pg_dist_schema TO public;
#include "udfs/citus_drop_trigger/12.0-1.sql"

DROP VIEW citus_shards;
DROP VIEW IF EXISTS pg_catalog.citus_tables;
DROP VIEW IF EXISTS public.citus_tables;
DROP FUNCTION citus_shard_sizes;
#include "udfs/citus_shard_sizes/12.0-1.sql"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ DROP FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(Oid, t
DROP VIEW IF EXISTS public.citus_schemas;
DROP VIEW IF EXISTS pg_catalog.citus_schemas;

DROP VIEW IF EXISTS public.citus_tables;
DROP VIEW IF EXISTS pg_catalog.citus_tables;

DROP VIEW pg_catalog.citus_shards;
DROP FUNCTION pg_catalog.citus_shard_sizes;
#include "../udfs/citus_shard_sizes/10.0-1.sql"
Expand Down
9 changes: 8 additions & 1 deletion src/backend/distributed/sql/udfs/citus_tables/12.0-1.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion src/backend/distributed/sql/udfs/citus_tables/latest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ citus_tables_create_query=$CTCQ$
END AS citus_table_type,
coalesce(column_to_column_name(logicalrelid, partkey), '<none>') AS distribution_column,
colocationid AS colocation_id,
pg_size_pretty(citus_total_relation_size(logicalrelid, fail_on_error := false)) AS table_size,
pg_size_pretty(table_sizes.table_size) AS table_size,
(select count(*) from pg_dist_shard where logicalrelid = p.logicalrelid) AS shard_count,
pg_get_userbyid(relowner) AS table_owner,
amname AS access_method
Expand All @@ -24,6 +24,13 @@ citus_tables_create_query=$CTCQ$
pg_class c ON (p.logicalrelid = c.oid)
LEFT JOIN
pg_am a ON (a.oid = c.relam)
JOIN
(
SELECT ds.logicalrelid AS table_id, SUM(css.size) AS table_size
FROM citus_shard_sizes() css, pg_dist_shard ds
WHERE css.shard_id = ds.shardid
GROUP BY ds.logicalrelid
) table_sizes ON (table_sizes.table_id = p.logicalrelid)
WHERE
-- filter out tables owned by extensions
logicalrelid NOT IN (
Expand Down
12 changes: 6 additions & 6 deletions src/test/regress/expected/citus_update_table_statistics.out
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_hash');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT 0 AS shard_id, '' AS table_name LIMIT 0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981000, 'public.test_table_statistics_hash_981000'), (981001, 'public.test_table_statistics_hash_981001'), (981002, 'public.test_table_statistics_hash_981002'), (981003, 'public.test_table_statistics_hash_981003'), (981004, 'public.test_table_statistics_hash_981004'), (981005, 'public.test_table_statistics_hash_981005'), (981006, 'public.test_table_statistics_hash_981006'), (981007, 'public.test_table_statistics_hash_981007')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981000, 'public.test_table_statistics_hash_981000'), (981001, 'public.test_table_statistics_hash_981001'), (981002, 'public.test_table_statistics_hash_981002'), (981003, 'public.test_table_statistics_hash_981003'), (981004, 'public.test_table_statistics_hash_981004'), (981005, 'public.test_table_statistics_hash_981005'), (981006, 'public.test_table_statistics_hash_981006'), (981007, 'public.test_table_statistics_hash_981007')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
Expand Down Expand Up @@ -158,15 +158,15 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_append');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT 0 AS shard_id, '' AS table_name LIMIT 0
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981008, 'public.test_table_statistics_append_981008'), (981009, 'public.test_table_statistics_append_981009')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, 0::bigint;
NOTICE: issuing SELECT shard_id, pg_total_relation_size(table_name) FROM (VALUES (981008, 'public.test_table_statistics_append_981008'), (981009, 'public.test_table_statistics_append_981009')) t(shard_id, table_name) WHERE to_regclass(table_name) IS NOT NULL
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
Expand Down