Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tenant monitoring performance improvements #6868

Merged
merged 27 commits into from
Jun 11, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b437aa9
Use spinlock instead of lwlock per tenant
gokhangulbiz Apr 19, 2023
ccd464b
Initial hashtable implementation for tenant stats
gokhangulbiz Apr 24, 2023
779af6a
Merge remote-tracking branch 'upstream/main' into gokhangulbiz/tenant…
gokhangulbiz Apr 26, 2023
817b1df
Remove unneccesarry locks and add comments
gokhangulbiz Apr 26, 2023
4bfb2a0
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Apr 26, 2023
7269755
Indent
gokhangulbiz Apr 27, 2023
f5befff
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz May 4, 2023
7649b9d
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz May 8, 2023
301c258
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz May 9, 2023
9c4c402
Use HASH_BLOB flag for tenants
gokhangulbiz May 9, 2023
f53b4cb
Simplfy CreateTenantStats()
gokhangulbiz May 10, 2023
0466b81
Refactor CreateTenantStatsHashKey() to FillTenantStatsHashKey
gokhangulbiz May 10, 2023
5027321
Merge remote-tracking branch 'upstream/main' into gokhangulbiz/tenant…
gokhangulbiz May 10, 2023
7575f36
Indent
gokhangulbiz May 10, 2023
f88e988
Store TenantsStatsHashKey on the stack
gokhangulbiz May 11, 2023
dc7aa52
Indent
gokhangulbiz May 11, 2023
a8d3805
Probabilistic tracking for new tenants.
gokhangulbiz May 22, 2023
f6c4a78
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Jun 9, 2023
b960d40
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Jun 9, 2023
ebbbf40
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Jun 9, 2023
adcaff6
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Jun 9, 2023
593d86d
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Jun 9, 2023
29b4ee4
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Jun 9, 2023
04bc313
Merge branch 'main' into gokhangulbiz/tenant-stats-perf-improvements
gokhangulbiz Jun 9, 2023
fdeba17
Fix sorting issue.
gokhangulbiz Jun 11, 2023
ac90c5d
Increase tenant scores to prevent flakyness due to eviction.
gokhangulbiz Jun 11, 2023
648b53c
Refactor probabilistic tracking implementation.
gokhangulbiz Jun 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/backend/distributed/operations/shard_rebalancer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2012,7 +2012,7 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
* overlaps with the current move's target node.
* The earlier/first move might make space for the later/second move.
* So we could run out of disk space (or at least overload the node)
* if we move the second shard to it before the first one is moved away. 
* if we move the second shard to it before the first one is moved away.
*/
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,
Expand Down
205 changes: 132 additions & 73 deletions src/backend/distributed/utils/citus_stat_tenants.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "postgres.h"
#include "unistd.h"

#include "access/hash.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/colocation_utils.h"
#include "distributed/distributed_planner.h"
Expand Down Expand Up @@ -50,7 +51,6 @@

static const char *SharedMemoryNameForMultiTenantMonitor =
"Shared memory for multi tenant monitor";
static char *TenantTrancheName = "Tenant Tranche";
static char *MonitorTrancheName = "Multi Tenant Monitor Tranche";

static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
Expand All @@ -60,12 +60,14 @@
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void EvictTenantsIfNecessary(TimestampTz queryTime);
static void RecordTenantStats(TenantStats *tenantStats, TimestampTz queryTime);
static void CreateMultiTenantMonitor(void);
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
static MultiTenantMonitor * GetMultiTenantMonitor(void);
static void MultiTenantMonitorSMInit(void);
static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime);
static int FindTenantStats(MultiTenantMonitor *monitor);
static TenantStats * CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz
queryTime);
static void FillTenantStatsHashKey(TenantStatsHashKey *key, char *tenantAttribute, uint32
colocationGroupId);
static TenantStats * FindTenantStats(MultiTenantMonitor *monitor);
static size_t MultiTenantMonitorshmemSize(void);
static char * ExtractTopComment(const char *inputString);
static char * EscapeCommentChars(const char *str);
Expand Down Expand Up @@ -113,32 +115,47 @@
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);

int numberOfRowsToReturn = 0;
int tenantStatsCount = hash_get_num_entries(monitor->tenants);
if (returnAllTenants)
{
numberOfRowsToReturn = monitor->tenantCount;
numberOfRowsToReturn = tenantStatsCount;
}
else
{
numberOfRowsToReturn = Min(monitor->tenantCount, StatTenantsLimit);
numberOfRowsToReturn = Min(tenantStatsCount,
StatTenantsLimit);
}

for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
/* Allocate an array to hold the tenants. */
TenantStats **stats = palloc(tenantStatsCount *
sizeof(TenantStats *));

HASH_SEQ_STATUS hash_seq;
TenantStats *stat;

/* Get all the tenants from the hash table. */
int j = 0;
hash_seq_init(&hash_seq, monitor->tenants);
while ((stat = hash_seq_search(&hash_seq)) != NULL)
{
UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
stats[j++] = stat;
UpdatePeriodsIfNecessary(stat, monitoringTime);
ReduceScoreIfNecessary(stat, monitoringTime);
}
SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),

/* Sort the tenants by their score. */
SafeQsort(stats, j, sizeof(TenantStats *),
CompareTenantScore);

for (int i = 0; i < numberOfRowsToReturn; i++)
{
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));

TenantStats *tenantStats = &monitor->tenants[i];
TenantStats *tenantStats = stats[i];

values[0] = Int32GetDatum(tenantStats->colocationGroupId);
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
values[0] = Int32GetDatum(tenantStats->key.colocationGroupId);
values[1] = PointerGetDatum(cstring_to_text(tenantStats->key.tenantAttribute));
values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
Expand All @@ -152,21 +169,46 @@
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}

pfree(stats);

LWLockRelease(&monitor->lock);

PG_RETURN_VOID();
}


#include "miscadmin.h"

/*
* citus_stat_tenants_local_reset resets monitor for tenant statistics
* on the local node.
*/
Datum
citus_stat_tenants_local_reset(PG_FUNCTION_ARGS)
{
/* ereport(NOTICE, (errmsg("MyProcPid: %d", MyProcPid))); */
/*sleep(10); */

MultiTenantMonitor *monitor = GetMultiTenantMonitor();
monitor->tenantCount = 0;

/* if monitor is not created yet, there is nothing to reset */
if (monitor == NULL)
{
PG_RETURN_VOID();
}

HASH_SEQ_STATUS hash_seq;
TenantStats *stats;

LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);

hash_seq_init(&hash_seq, monitor->tenants);
while ((stats = hash_seq_search(&hash_seq)) != NULL)
{
hash_search(monitor->tenants, &stats->key, HASH_REMOVE, NULL);
}

LWLockRelease(&monitor->lock);

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -353,45 +395,43 @@
*/
LWLockAcquire(&monitor->lock, LW_SHARED);

int currentTenantIndex = FindTenantStats(monitor);
TenantStats *tenantStats = FindTenantStats(monitor);

if (currentTenantIndex != -1)
if (tenantStats != NULL)
{
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
SpinLockAcquire(&tenantStats->lock);

Check warning on line 402 in src/backend/distributed/utils/citus_stat_tenants.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/utils/citus_stat_tenants.c#L402

Added line #L402 was not covered by tests

UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats, queryTime);

LWLockRelease(&tenantStats->lock);
SpinLockRelease(&tenantStats->lock);
}
else
{
LWLockRelease(&monitor->lock);

LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
currentTenantIndex = FindTenantStats(monitor);
tenantStats = FindTenantStats(monitor);

if (currentTenantIndex == -1)
if (tenantStats == NULL)
{
currentTenantIndex = CreateTenantStats(monitor, queryTime);
tenantStats = CreateTenantStats(monitor, queryTime);
}

LWLockRelease(&monitor->lock);

LWLockAcquire(&monitor->lock, LW_SHARED);
currentTenantIndex = FindTenantStats(monitor);
if (currentTenantIndex != -1)
tenantStats = FindTenantStats(monitor);
if (tenantStats != NULL)
{
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
SpinLockAcquire(&tenantStats->lock);

Check warning on line 428 in src/backend/distributed/utils/citus_stat_tenants.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/utils/citus_stat_tenants.c#L428

Added line #L428 was not covered by tests

UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats, queryTime);

LWLockRelease(&tenantStats->lock);
SpinLockRelease(&tenantStats->lock);
}
}
LWLockRelease(&monitor->lock);
Expand Down Expand Up @@ -507,15 +547,29 @@
*
* Every time tenant count hits StatTenantsLimit * 3, we reduce it back to StatTenantsLimit * 2.
*/
if (monitor->tenantCount >= StatTenantsLimit * 3)
long tenantStatsCount = hash_get_num_entries(monitor->tenants);
if (tenantStatsCount >= StatTenantsLimit * 3)
{
for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
HASH_SEQ_STATUS hash_seq;
TenantStats *stat;
TenantStats **stats = palloc(tenantStatsCount *
sizeof(TenantStats *));

int i = 0;
hash_seq_init(&hash_seq, monitor->tenants);
while ((stat = hash_seq_search(&hash_seq)) != NULL)
{
stats[i++] = stat;
}

SafeQsort(stats, i, sizeof(TenantStats *), CompareTenantScore);

for (i = StatTenantsLimit * 2; i < tenantStatsCount; i++)
{
ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], queryTime);
hash_search(monitor->tenants, &stats[i]->key, HASH_REMOVE, NULL);
}
SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
CompareTenantScore);
monitor->tenantCount = StatTenantsLimit * 2;

pfree(stats);
}
}

Expand Down Expand Up @@ -553,17 +607,6 @@
}


/*
* CreateMultiTenantMonitor creates the data structure for multi tenant monitor.
*/
static void
CreateMultiTenantMonitor()
{
MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor();
monitor->tenantCount = 0;
}


/*
* CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor.
*/
Expand All @@ -586,6 +629,17 @@
monitor->namedLockTranche.trancheName);
LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);

HASHCTL info;

memset(&info, 0, sizeof(info));
info.keysize = sizeof(TenantStatsHashKey);
info.entrysize = sizeof(TenantStats);

monitor->tenants = ShmemInitHash("citus_stats_tenants hash",
StatTenantsLimit * 3, StatTenantsLimit * 3,
&info, HASH_ELEM |
HASH_SHARED_MEM | HASH_BLOBS);

return monitor;
}

Expand Down Expand Up @@ -629,7 +683,7 @@
static void
MultiTenantMonitorSMInit()
{
CreateMultiTenantMonitor();
CreateSharedMemoryForMultiTenantMonitor();

if (prev_shmem_startup_hook != NULL)
{
Expand All @@ -643,7 +697,7 @@
*
* Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode.
*/
static int
static TenantStats *
CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
{
/*
Expand All @@ -652,45 +706,50 @@
*/
EvictTenantsIfNecessary(queryTime);

int tenantIndex = monitor->tenantCount;

memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex]));
TenantStatsHashKey key = { 0 };
FillTenantStatsHashKey(&key, AttributeToTenant, AttributeToColocationGroupId);

strcpy_s(monitor->tenants[tenantIndex].tenantAttribute,
sizeof(monitor->tenants[tenantIndex].tenantAttribute), AttributeToTenant);
monitor->tenants[tenantIndex].colocationGroupId = AttributeToColocationGroupId;
TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, &key,
HASH_ENTER, NULL);

monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId();
monitor->tenants[tenantIndex].namedLockTranche.trancheName = TenantTrancheName;
stats->writesInLastPeriod = 0;
stats->writesInThisPeriod = 0;
stats->readsInLastPeriod = 0;
stats->readsInThisPeriod = 0;
stats->cpuUsageInLastPeriod = 0;
stats->cpuUsageInThisPeriod = 0;
stats->score = 0;
stats->lastScoreReduction = 0;

LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId,
monitor->tenants[tenantIndex].namedLockTranche.trancheName);
LWLockInitialize(&monitor->tenants[tenantIndex].lock,
monitor->tenants[tenantIndex].namedLockTranche.trancheId);
SpinLockInit(&stats->lock);

monitor->tenantCount++;

return tenantIndex;
return stats;
}


/*
* FindTenantStats finds the index for the current tenant's statistics.
* FindTenantStats finds the current tenant's statistics.
*/
static int
static TenantStats *
FindTenantStats(MultiTenantMonitor *monitor)
{
for (int i = 0; i < monitor->tenantCount; i++)
{
TenantStats *tenantStats = &monitor->tenants[i];
if (strcmp(tenantStats->tenantAttribute, AttributeToTenant) == 0 &&
tenantStats->colocationGroupId == AttributeToColocationGroupId)
{
return i;
}
}
TenantStatsHashKey key = { 0 };
FillTenantStatsHashKey(&key, AttributeToTenant, AttributeToColocationGroupId);

return -1;
TenantStats *stats = (TenantStats *) hash_search(monitor->tenants, &key,
HASH_FIND, NULL);

return stats;
}


static void
FillTenantStatsHashKey(TenantStatsHashKey *key, char *tenantAttribute, uint32
colocationGroupId)
{
memset(key->tenantAttribute, 0, MAX_TENANT_ATTRIBUTE_LENGTH);
strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH);
key->colocationGroupId = colocationGroupId;
}


Expand Down