Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid rebuilding MetadataCache for each placement insertion #7163

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
CreateTruncateTrigger(relationId);
}

/* create shards for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED)
{
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
colocatedTableId,
localTableEmpty);
}
else if (tableType == REFERENCE_TABLE)
{
/* create shards for reference table */
CreateReferenceTableShard(relationId);
}
else if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
/* create the shard of given single-shard distributed table */
CreateSingleShardTableShard(relationId, colocatedTableId,
colocationId);
}
Expand Down Expand Up @@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,


/*
* CreateHashDistributedTableShards creates the shard of given single-shard
* CreateSingleShardTableShard creates the shard of given single-shard
* distributed table.
*/
static void
Expand Down
66 changes: 47 additions & 19 deletions src/backend/distributed/operations/create_shards.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;

/* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId);
Expand Down Expand Up @@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId();
uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*shardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, shardIdPtr);

/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
Expand All @@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);

InsertShardRow(distributedTableId, shardId, shardStorageType,
InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
minHashTokenText, maxHashTokenText);

List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,
shardId,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
*shardIdPtr,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
}

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
foreach_ptr(shardIdPtr, insertedShardIds)
{
List *placementsForShard = ShardPlacementList(*shardIdPtr);
insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements);
placementsForShard);
}

CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
Expand Down Expand Up @@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool

/*
* load shard placements for the shard at once after all placement insertions
* finished. That prevents MetadataCache from rebuilding unnecessarily after
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
Expand Down Expand Up @@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId)
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);

List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
shardId,
nodeList,
workerStartIndex,
replicationFactor);

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);

CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection);
Expand Down Expand Up @@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
minHashTokenText, maxHashTokenText);

int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
InsertShardPlacementRows(relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);

/*
* We don't need to force using exclusive connections because we're anyway
Expand Down
15 changes: 6 additions & 9 deletions src/backend/distributed/operations/stage_protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,

/*
* InsertShardPlacementRows inserts shard placements to the metadata table on
* the coordinator node. Then, returns the list of added shard placements.
* the coordinator node.
*/
List *
void
InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int workerStartIndex, int replicationFactor)
{
int workerNodeCount = list_length(workerNodeList);
List *insertedShardPlacements = NIL;

for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
{
Expand All @@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
uint32 nodeGroupId = workerNode->groupId;
const uint64 shardSize = 0;

uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
shardSize, nodeGroupId);
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
InsertShardPlacementRow(shardId,
INVALID_PLACEMENT_ID,
shardSize,
nodeGroupId);
}

return insertedShardPlacements;
}


Expand Down
6 changes: 3 additions & 3 deletions src/include/distributed/coordinator_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,
Expand Down