Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make citus_stat_tenants work with schema-based tenants. #6936

Merged
merged 27 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b86bea2
Make ColocationIdGetTenantSchemaId accessable
gokhangulbiz Jun 5, 2023
495c7e2
Set colocationId even it has no partition key.
gokhangulbiz Jun 5, 2023
d888966
Set colocationId and partitionKeyValue for tasks even it has no parti…
gokhangulbiz Jun 5, 2023
b9c4ca4
Make tenant monitoring work with schema-based tenants
gokhangulbiz Jun 5, 2023
e847a22
Add regression tests.
gokhangulbiz May 29, 2023
83b5f0e
Comments & indents
gokhangulbiz Jun 5, 2023
fea3b1c
Update tenants statistics annotations normalization
gokhangulbiz Jun 5, 2023
bcae49d
Indent
gokhangulbiz Jun 7, 2023
80a52a0
Set colocationId and partitionKey if workerJob has a valid colocationId.
gokhangulbiz Jun 7, 2023
8ff5d36
Move schema lookup logic to UDF's
gokhangulbiz Jun 7, 2023
7241615
Merge branch 'main' into gokhangulbiz/schema-based-tenant-monitoring
gokhangulbiz Jun 8, 2023
e2ba016
Merge branch 'main' into gokhangulbiz/schema-based-tenant-monitoring
gokhangulbiz Jun 8, 2023
50194fe
Fix permissions for newly introduced udfs
gokhangulbiz Jun 8, 2023
6341c0a
Merge branch 'main' into gokhangulbiz/schema-based-tenant-monitoring
gokhangulbiz Jun 9, 2023
95ebd12
Indent
gokhangulbiz Jun 9, 2023
56691d7
Fix downgrade sql scripts
gokhangulbiz Jun 9, 2023
ad0e7c3
Merge branch 'main' into gokhangulbiz/schema-based-tenant-monitoring
gokhangulbiz Jun 9, 2023
a65ea36
magic number fix
gokhangulbiz Jun 9, 2023
b712ffa
Add missing header
gokhangulbiz Jun 9, 2023
5eb6c76
Add tests for single shard distributed tables
gokhangulbiz Jun 9, 2023
7b762cb
Merge branch 'main' into gokhangulbiz/schema-based-tenant-monitoring
gokhangulbiz Jun 9, 2023
6284039
Merge branch 'main' of https://github.com/citusdata/citus into gokhan…
gokhangulbiz Jun 11, 2023
54ead77
Add null-check in FillTenantStatsHashkey
gokhangulbiz Jun 11, 2023
9fca27b
Fix expected test output
gokhangulbiz Jun 11, 2023
dcfe045
Set client_min_messages to warning before drop schema to prevent flak…
gokhangulbiz Jun 12, 2023
91661a6
Merge branch 'main' into gokhangulbiz/schema-based-tenant-monitoring
gokhangulbiz Jun 12, 2023
4dc9d23
Merge branch 'main' of https://github.com/citusdata/citus into gokhan…
gokhangulbiz Jun 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/backend/distributed/executor/citus_custom_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -896,12 +896,6 @@ SetJobColocationId(Job *job)
{
uint32 jobColocationId = INVALID_COLOCATION_ID;

if (!job->partitionKeyValue)
{
/* if the Job has no shard key, nothing to do */
return;
}

List *rangeTableList = ExtractRangeTableEntryList(job->jobQuery);
ListCell *rangeTableCell = NULL;
foreach(rangeTableCell, rangeTableList)
Expand Down
6 changes: 3 additions & 3 deletions src/backend/distributed/executor/local_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/query_utils.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
Expand Down Expand Up @@ -382,13 +383,12 @@ ExecuteLocalTaskListExtended(List *taskList,

/*
* SetColocationIdAndPartitionKeyValueForTasks sets colocationId and partitionKeyValue
* for the tasks in the taskList if workerJob has a colocationId and partitionKeyValue.
* for the tasks in the taskList.
*/
static void
SetColocationIdAndPartitionKeyValueForTasks(List *taskList, Job *workerJob)
{
if (workerJob->colocationId != 0 &&
Copy link
Contributor

@JelteF JelteF Jun 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep the workerJob->colocationId != 0 check, might be a nice optimization for multi shard queries with many tasks.

workerJob->partitionKeyValue != NULL)
if (workerJob->colocationId != INVALID_COLOCATION_ID)
{
Task *task = NULL;
foreach_ptr(task, taskList)
Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/sql/citus--11.3-1--12.0-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ GRANT SELECT ON pg_catalog.pg_dist_tenant_schema TO public;

#include "udfs/citus_tables/12.0-1.sql"
#include "udfs/citus_shards/12.0-1.sql"

-- udfs used to include schema-based tenants in tenant monitoring
#include "udfs/citus_stat_tenants_local/12.0-1.sql"
14 changes: 14 additions & 0 deletions src/backend/distributed/sql/downgrades/citus--12.0-1--11.3-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,17 @@ DROP FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(Oid, t
#include "../udfs/citus_shards/11.1-1.sql"

DROP TABLE pg_catalog.pg_dist_tenant_schema;

DROP VIEW pg_catalog.citus_stat_tenants_local;
DROP FUNCTION pg_catalog.citus_stat_tenants_local_internal(
BOOLEAN,
OUT INT,
OUT TEXT,
OUT INT,
OUT INT,
OUT INT,
OUT INT,
OUT DOUBLE PRECISION,
OUT DOUBLE PRECISION,
OUT BIGINT);
#include "../udfs/citus_stat_tenants_local/11.3-1.sql"

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local_internal(
return_all_tenants BOOLEAN DEFAULT FALSE,
OUT colocation_id INT,
OUT tenant_attribute TEXT,
Expand All @@ -13,8 +13,40 @@ RETURNS SETOF RECORD
LANGUAGE C
AS 'citus', $$citus_stat_tenants_local$$;

CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
return_all_tenants BOOLEAN DEFAULT FALSE,
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT read_count_in_this_period INT,
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT cpu_usage_in_this_period DOUBLE PRECISION,
OUT cpu_usage_in_last_period DOUBLE PRECISION,
OUT score BIGINT)
RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $function$
BEGIN
RETURN QUERY
SELECT
L.colocation_id,
CASE WHEN L.tenant_attribute IS NULL THEN N.nspname ELSE L.tenant_attribute END COLLATE "default" as tenant_attribute,
L.read_count_in_this_period,
L.read_count_in_last_period,
L.query_count_in_this_period,
L.query_count_in_last_period,
L.cpu_usage_in_this_period,
L.cpu_usage_in_last_period,
L.score
FROM pg_catalog.citus_stat_tenants_local_internal(return_all_tenants) L
LEFT JOIN pg_dist_tenant_schema S ON L.tenant_attribute IS NULL AND L.colocation_id = S.colocationid
LEFT JOIN pg_namespace N ON N.oid = S.schemaid
ORDER BY L.score DESC;
END;
$function$;

CREATE OR REPLACE VIEW citus.citus_stat_tenants_local AS
CREATE OR REPLACE VIEW pg_catalog.citus_stat_tenants_local AS
SELECT
colocation_id,
tenant_attribute,
Expand All @@ -27,7 +59,8 @@ SELECT
FROM pg_catalog.citus_stat_tenants_local()
ORDER BY score DESC;

ALTER VIEW citus.citus_stat_tenants_local SET SCHEMA pg_catalog;
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local_internal(BOOLEAN) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local_internal(BOOLEAN) TO pg_monitor;

REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) TO pg_monitor;
Expand Down
102 changes: 82 additions & 20 deletions src/backend/distributed/utils/citus_stat_tenants.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/tuplestore.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "executor/execdesc.h"
Expand All @@ -30,16 +31,18 @@
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/json.h"

#include "utils/lsyscache.h"
#include "utils/syscache.h"

#include <time.h>

static void AttributeMetricsIfApplicable(void);

ExecutorEnd_hook_type prev_ExecutorEnd = NULL;

#define ATTRIBUTE_PREFIX "/*{\"tId\":"
#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
#define ATTRIBUTE_PREFIX "/*{\"cId\":"
#define ATTRIBUTE_STRING_FORMAT "/*{\"cId\":%d,\"tId\":%s}*/"
#define ATTRIBUTE_STRING_FORMAT_WITHOUT_TID "/*{\"cId\":%d}*/"
#define STAT_TENANTS_COLUMNS 9
#define ONE_QUERY_SCORE 1000000000

Expand Down Expand Up @@ -155,7 +158,17 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
TenantStats *tenantStats = stats[i];

values[0] = Int32GetDatum(tenantStats->key.colocationGroupId);
values[1] = PointerGetDatum(cstring_to_text(tenantStats->key.tenantAttribute));

if (tenantStats->key.tenantAttribute[0] == '\0')
{
isNulls[1] = true;
}
else
{
values[1] = PointerGetDatum(cstring_to_text(
tenantStats->key.tenantAttribute));
}

values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
Expand Down Expand Up @@ -221,7 +234,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
return;
}

strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
AttributeToColocationGroupId = INVALID_COLOCATION_ID;

if (query_string == NULL)
{
Expand Down Expand Up @@ -258,7 +271,7 @@ void
AttributeTask(char *tenantId, int colocationId, CmdType commandType)
{
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
tenantId == NULL || colocationId == INVALID_COLOCATION_ID)
colocationId == INVALID_COLOCATION_ID)
{
return;
}
Expand All @@ -281,36 +294,80 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)
}
}

/*
* if tenantId is NULL, it must be a schema-based tenant and
* we try to get the tenantId from the colocationId to lookup schema name and use it as a tenantId
*/
if (tenantId == NULL)
{
if (!IsTenantSchemaColocationGroup(colocationId))
{
return;
}
}

AttributeToColocationGroupId = colocationId;
strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
if (tenantId != NULL)
{
strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
}
else
{
strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
}
AttributeToCommandType = commandType;
QueryStartClock = clock();
}


/*
* AnnotateQuery annotates the query with tenant attributes.
* if the query has a partition key, we annotate it with the partition key value and colocationId
* if the query doesn't have a partition key and if it's a schema-based tenant, we annotate it with the colocationId only.
*/
char *
AnnotateQuery(char *queryString, Const *partitionKeyValue, int colocationId)
{
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || partitionKeyValue == NULL)
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
colocationId == INVALID_COLOCATION_ID)
{
return queryString;
}

char *partitionKeyValueString = DatumToString(partitionKeyValue->constvalue,
partitionKeyValue->consttype);
StringInfo newQuery = makeStringInfo();

char *commentCharsEscaped = EscapeCommentChars(partitionKeyValueString);
StringInfo escapedSourceName = makeStringInfo();
/* if the query doesn't have a parititon key value, check if it is a tenant schema */
if (partitionKeyValue == NULL)
{
if (IsTenantSchemaColocationGroup(colocationId))
{
/* If it is a schema-based tenant, we only annotate the query with colocationId */
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT_WITHOUT_TID,
colocationId);
}
else
{
/* If it is not a schema-based tenant query and doesn't have a parititon key,
* we don't annotate it
*/
return queryString;
}
}
else
{
/* if the query has a partition key value, we annotate it with both tenantId and colocationId */
char *partitionKeyValueString = DatumToString(partitionKeyValue->constvalue,
partitionKeyValue->consttype);

escape_json(escapedSourceName, commentCharsEscaped);
char *commentCharsEscaped = EscapeCommentChars(partitionKeyValueString);
StringInfo escapedSourceName = makeStringInfo();
escape_json(escapedSourceName, commentCharsEscaped);

StringInfo newQuery = makeStringInfo();
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data,
colocationId);
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, colocationId,
escapedSourceName->data
);
}

appendStringInfoString(newQuery, queryString);

Expand Down Expand Up @@ -372,7 +429,7 @@ static void
AttributeMetricsIfApplicable()
{
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
AttributeToTenant[0] == '\0')
AttributeToColocationGroupId == INVALID_COLOCATION_ID)
{
return;
}
Expand Down Expand Up @@ -449,7 +506,7 @@ AttributeMetricsIfApplicable()
}
LWLockRelease(&monitor->lock);

strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
AttributeToColocationGroupId = INVALID_COLOCATION_ID;
}


Expand Down Expand Up @@ -761,7 +818,12 @@ FillTenantStatsHashKey(TenantStatsHashKey *key, char *tenantAttribute, uint32
colocationGroupId)
{
memset(key->tenantAttribute, 0, MAX_TENANT_ATTRIBUTE_LENGTH);
strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH);

if (tenantAttribute != NULL)
{
strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH);
}

key->colocationGroupId = colocationGroupId;
}

Expand Down
5 changes: 1 addition & 4 deletions src/backend/distributed/utils/tenant_schema_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
#include "utils/fmgroids.h"


static Oid ColocationIdGetTenantSchemaId(uint32 colocationId);


/*
* IsTenantSchema returns true if there is a tenant schema with given schemaId.
*/
Expand Down Expand Up @@ -125,7 +122,7 @@ SchemaIdGetTenantColocationId(Oid schemaId)
*
* Returns InvalidOid if there is no such tenant schema.
*/
static Oid
Oid
ColocationIdGetTenantSchemaId(uint32 colocationId)
{
if (colocationId == INVALID_COLOCATION_ID)
Expand Down