Add a knob for merge parallelism and add log for max shard size. #11342

Merged: 1 commit, Apr 29, 2024
fdbclient/ServerKnobs.cpp (1 change: 1 addition & 0 deletions)
@@ -143,6 +143,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
     init( WIGGLING_RELOCATION_PARALLELISM_PER_SOURCE_SERVER, 2 ); if( randomize && BUGGIFY ) WIGGLING_RELOCATION_PARALLELISM_PER_SOURCE_SERVER = 1;
     init( RELOCATION_PARALLELISM_PER_SOURCE_SERVER, 2 ); if( randomize && BUGGIFY ) RELOCATION_PARALLELISM_PER_SOURCE_SERVER = 1;
     init( RELOCATION_PARALLELISM_PER_DEST_SERVER, 10 ); if( randomize && BUGGIFY ) RELOCATION_PARALLELISM_PER_DEST_SERVER = 1; // Note: if this is smaller than FETCH_KEYS_PARALLELISM, this will artificially reduce performance. The current default of 10 is probably too high but is set conservatively for now.
+    init( MERGE_RELOCATION_PARALLELISM_PER_TEAM, 6 ); if (randomize && BUGGIFY ) MERGE_RELOCATION_PARALLELISM_PER_TEAM = 1;
     init( DD_QUEUE_MAX_KEY_SERVERS, 100 ); // Do not buggify
     init( DD_REBALANCE_PARALLELISM, 50 );
     init( DD_REBALANCE_RESET_AMOUNT, 30 );
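The new knob follows the existing pattern: a default of 6, buggified down to 1 in simulation. Like other server knobs it can be overridden per process without a rebuild; a minimal sketch of the conventional override mechanisms, where the value of 2 is an illustrative assumption, not a recommendation:

# foundationdb.conf, in the [fdbserver] section
knob_merge_relocation_parallelism_per_team = 2

# or directly on the fdbserver command line
fdbserver --knob_merge_relocation_parallelism_per_team=2 ...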
fdbclient/include/fdbclient/ServerKnobs.h (1 change: 1 addition & 0 deletions)
@@ -132,6 +132,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
     // than healthy priority
     double RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
     double RELOCATION_PARALLELISM_PER_DEST_SERVER;
+    double MERGE_RELOCATION_PARALLELISM_PER_TEAM;
     int DD_QUEUE_MAX_KEY_SERVERS;
     int DD_REBALANCE_PARALLELISM;
     int DD_REBALANCE_RESET_AMOUNT;
fdbserver/DDRelocationQueue.actor.cpp (7 changes: 6 additions & 1 deletion)
@@ -433,8 +433,13 @@ int getSrcWorkFactor(RelocateData const& relocation, int singleRegionTeamSize) {
         // we want to set PRIORITY_PERPETUAL_STORAGE_WIGGLE to a reasonably large value
         // to make this parallelism take effect
         return WORK_FULL_UTILIZATION / SERVER_KNOBS->WIGGLING_RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
-    else // for now we assume that any message at a lower priority can best be assumed to have a full team left for work
+    else if (relocation.priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD)
+        return WORK_FULL_UTILIZATION / SERVER_KNOBS->MERGE_RELOCATION_PARALLELISM_PER_TEAM;
+    else { // for now we assume that any message at a lower priority can best be assumed to have a full team left for
+           // work
+
         return WORK_FULL_UTILIZATION / singleRegionTeamSize / SERVER_KNOBS->RELOCATION_PARALLELISM_PER_SOURCE_SERVER;
+    }
 }
 
 int getDestWorkFactor() {
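getSrcWorkFactor() sizes each relocation's share of a source server's capacity: the server is considered saturated once the accumulated work factors of its in-flight relocations reach WORK_FULL_UTILIZATION, the fixed-point scaling factor (10000) used throughout this file. The knob is a double, so values below 1 are legal and make a single merge cost more than full utilization, effectively serializing merges. A minimal self-contained sketch of the arithmetic, using stand-in constants rather than the actual queue accounting:

#include <cassert>

// Stand-ins for the values referenced in the diff: WORK_FULL_UTILIZATION is
// the fixed-point scaling factor from DDRelocationQueue.actor.cpp, and 6 is
// the new knob default introduced by this PR.
constexpr int WORK_FULL_UTILIZATION = 10000;
constexpr double MERGE_RELOCATION_PARALLELISM_PER_TEAM = 6;

int main() {
    // Work factor charged against the source team for one merge relocation.
    int mergeWorkFactor =
        static_cast<int>(WORK_FULL_UTILIZATION / MERGE_RELOCATION_PARALLELISM_PER_TEAM); // 1666
    // The team saturates once accumulated work reaches WORK_FULL_UTILIZATION,
    // i.e. after roughly six concurrent merge relocations.
    assert(WORK_FULL_UTILIZATION / mergeWorkFactor == 6);
    return 0;
}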
fdbserver/DDShardTracker.actor.cpp (13 changes: 6 additions & 7 deletions)
@@ -127,6 +127,12 @@ int64_t getMaxShardSize(double dbSizeEstimate) {
         size = std::max(size, static_cast<int64_t>(SERVER_KNOBS->MAX_LARGE_SHARD_BYTES));
     }
 
+    TraceEvent("MaxShardSize")
+        .suppressFor(60.0)
+        .detail("Bytes", size)
+        .detail("EstimatedDbSize", dbSizeEstimate)
+        .detail("SqrtBytes", SERVER_KNOBS->SHARD_BYTES_PER_SQRT_BYTES)
+        .detail("AllowLargeShard", SERVER_KNOBS->ALLOW_LARGE_SHARD);
     return size;
 }
 
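This hunk adds the log promised in the PR title. Because of .suppressFor(60.0), the MaxShardSize event is emitted at most once per 60 seconds per process, so it records the evolving limit without flooding the trace files. In the XML trace output it would surface roughly as below; every attribute value here is fabricated for illustration, and Severity, Time, and Machine are assumed to be the standard envelope fields TraceEvent adds:

<Event Severity="10" Time="123456.789" Type="MaxShardSize"
       Machine="10.0.0.1:4500" Bytes="500000000"
       EstimatedDbSize="2000000000000" SqrtBytes="45" AllowLargeShard="0" />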
@@ -933,9 +939,6 @@ Future<Void> shardMerger(DataDistributionTracker* self,
     const UID actionId = deterministicRandom()->randomUniqueID();
     const Severity stSev = static_cast<Severity>(SERVER_KNOBS->DD_SHARD_TRACKING_LOG_SEVERITY);
     int64_t maxShardSize = self->maxShardSize->get().get();
-    if (SERVER_KNOBS->ALLOW_LARGE_SHARD) {
-        maxShardSize = SERVER_KNOBS->MAX_LARGE_SHARD_BYTES;
-    }
 
     auto prevIter = self->shards->rangeContaining(keys.begin);
     auto nextIter = self->shards->rangeContaining(keys.begin);
@@ -1125,10 +1128,6 @@ ACTOR Future<Void> shardEvaluator(DataDistributionTracker* self,
     StorageMetrics const& stats = shardSize->get().get().metrics;
     auto bandwidthStatus = getBandwidthStatus(stats);
 
-    if (SERVER_KNOBS->ALLOW_LARGE_SHARD) {
-        shardBounds.max.bytes = SERVER_KNOBS->MAX_LARGE_SHARD_BYTES;
-    }
-
     bool sizeSplit = stats.bytes > shardBounds.max.bytes,
          writeSplit = bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin;
     bool shouldSplit = sizeSplit || writeSplit;
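The last two hunks delete per-call-site overrides that forced maxShardSize (in shardMerger) and shardBounds.max.bytes (in shardEvaluator) up to MAX_LARGE_SHARD_BYTES. The context lines of the first hunk show why they are now redundant: getMaxShardSize() already applies the ALLOW_LARGE_SHARD clamp itself, so both callers inherit the larger limit from a single place. A self-contained sketch of the consolidated shape, with stand-in knob values and a simplified sqrt-based estimate (the real formula is not visible in the diff):

#include <algorithm>
#include <cmath>
#include <cstdint>

// Stand-ins for the knobs referenced in the diff; the values are assumed
// purely for illustration.
struct Knobs {
    bool ALLOW_LARGE_SHARD = true;
    int64_t MAX_LARGE_SHARD_BYTES = 1000000000;
    double SHARD_BYTES_PER_SQRT_BYTES = 45;
};
static const Knobs knobs;

// Simplified getMaxShardSize: the sqrt-scaled estimate is a placeholder, but
// the ALLOW_LARGE_SHARD clamp mirrors the context lines of the first hunk.
int64_t getMaxShardSizeSketch(double dbSizeEstimate) {
    int64_t size = static_cast<int64_t>(std::sqrt(std::max(dbSizeEstimate, 0.0)) *
                                        knobs.SHARD_BYTES_PER_SQRT_BYTES);
    if (knobs.ALLOW_LARGE_SHARD) {
        // Single authoritative override; callers no longer special-case this.
        size = std::max(size, knobs.MAX_LARGE_SHARD_BYTES);
    }
    return size; // shardMerger and shardEvaluator both consume this one value
}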