Skip to content

Commit

Permalink
avoid rebuilding MetadataCache for each placement insertion (#7163)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhjwpku authored and francisjodi committed Nov 13, 2023
1 parent a62e6f6 commit 6c0c3fa
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 33 deletions.
6 changes: 4 additions & 2 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
CreateTruncateTrigger(relationId);
}

/* create shards for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED)
{
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
colocatedTableId,
localTableEmpty);
}
else if (tableType == REFERENCE_TABLE)
{
/* create shards for reference table */
CreateReferenceTableShard(relationId);
}
else if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
/* create the shard of given single-shard distributed table */
CreateSingleShardTableShard(relationId, colocatedTableId,
colocationId);
}
Expand Down Expand Up @@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,


/*
* CreateHashDistributedTableShards creates the shard of given single-shard
* CreateSingleShardTableShard creates the shard of given single-shard
* distributed table.
*/
static void
Expand Down
66 changes: 47 additions & 19 deletions src/backend/distributed/operations/create_shards.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;

/* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId);
Expand Down Expand Up @@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId();
uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*shardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, shardIdPtr);

/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
Expand All @@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);

InsertShardRow(distributedTableId, shardId, shardStorageType,
InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
minHashTokenText, maxHashTokenText);

List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,
shardId,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
*shardIdPtr,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
}

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
foreach_ptr(shardIdPtr, insertedShardIds)
{
List *placementsForShard = ShardPlacementList(*shardIdPtr);
insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements);
placementsForShard);
}

CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
Expand Down Expand Up @@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool

/*
* load shard placements for the shard at once after all placement insertions
* finished. That prevents MetadataCache from rebuilding unnecessarily after
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
Expand Down Expand Up @@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId)
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);

List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
shardId,
nodeList,
workerStartIndex,
replicationFactor);

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);

CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection);
Expand Down Expand Up @@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
minHashTokenText, maxHashTokenText);

int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
InsertShardPlacementRows(relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);

/*
* We don't need to force using exclusive connections because we're anyway
Expand Down
15 changes: 6 additions & 9 deletions src/backend/distributed/operations/stage_protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,

/*
* InsertShardPlacementRows inserts shard placements to the metadata table on
* the coordinator node. Then, returns the list of added shard placements.
* the coordinator node.
*/
List *
void
InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int workerStartIndex, int replicationFactor)
{
int workerNodeCount = list_length(workerNodeList);
List *insertedShardPlacements = NIL;

for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
{
Expand All @@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
uint32 nodeGroupId = workerNode->groupId;
const uint64 shardSize = 0;

uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
shardSize, nodeGroupId);
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
InsertShardPlacementRow(shardId,
INVALID_PLACEMENT_ID,
shardSize,
nodeGroupId);
}

return insertedShardPlacements;
}


Expand Down
6 changes: 3 additions & 3 deletions src/include/distributed/coordinator_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,
Expand Down

0 comments on commit 6c0c3fa

Please sign in to comment.