Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A wide variety of bug fixes and performance improvements related to multi region configurations #892

Merged
merged 35 commits into from Nov 10, 2018
Merged
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
18509ac
account for the overhead of tags on a message when batching transacti…
etschannen Nov 2, 2018
278dbd5
call debug transaction on timekeeper
etschannen Nov 2, 2018
30fbc29
Renamed TimeKeeperStarted to TimeKeeperCommit
etschannen Nov 2, 2018
1d591ac
removed the countHealthyTeams check, because it was incorrect if it t…
etschannen Nov 2, 2018
ad98acf
fix: if the team started unhealthy and initialFailureReactionDelay wa…
etschannen Nov 2, 2018
e68c07a
fix: trackShardBytes was called with the incorrect range, resulting i…
etschannen Nov 2, 2018
2d9a670
fix: nested multCursors would improperly hang on getMore, because an …
etschannen Nov 2, 2018
1b5d283
fix: the Tlog would not update the durable version properly when vers…
etschannen Nov 2, 2018
979597a
fix: upgraded tags must be popped from all log sets
etschannen Nov 2, 2018
3b97f5a
fix: the storage server still has to pop old tags, even if it does no…
etschannen Nov 2, 2018
bf6545a
clients cache storage server interfaces individually, instead of as a…
etschannen Nov 2, 2018
f045c04
fix: if a storage server already exists in a remote region after conv…
etschannen Nov 2, 2018
2a8c628
fix: even if a peek cursor cannot find a local set for the most recen…
etschannen Nov 2, 2018
45c8f2d
restarting tests will sometimes configure to a fearless configuration…
etschannen Nov 2, 2018
accba4f
keep track of the last time a process became available to set a bette…
etschannen Nov 4, 2018
3304c83
added additional checks in peek which determine when a tag will never…
etschannen Nov 5, 2018
86916ac
fix: configuring regions to an empty string results in error
etschannen Nov 5, 2018
c026904
added protection against configuration changes which cannot be immedi…
etschannen Nov 5, 2018
593fa09
added a fixme
etschannen Nov 5, 2018
bd60027
test region priority changes
etschannen Nov 5, 2018
c1bd279
addressed review comments
etschannen Nov 5, 2018
87d0b4c
fix: the remote region does not have a full replica if usable_regions==1
etschannen Nov 5, 2018
87295cc
suppressed spammy trace events, and avoid reporting a long master rec…
etschannen Nov 5, 2018
04fa2a7
fix: we could recover in a region with priority < 0
etschannen Nov 5, 2018
6bb283a
fix: dcId to Locality changes could be lost if an emergency transacti…
etschannen Nov 5, 2018
599cc62
fix: data distribution would not always add all subsets of emergency teams
etschannen Nov 8, 2018
fb9d05a
suppressed a spammy trace event
etschannen Nov 8, 2018
1cf5689
fix: workers could only create a shared transaction log for one store…
etschannen Nov 8, 2018
19ae063
fix: storage servers need to be rebooted when increasing replication …
etschannen Nov 8, 2018
6874e37
fix: set the simulator’s view of usable regions to one during configu…
etschannen Nov 9, 2018
3e2484b
fix: a team tracker could downgrade the priority of a relocation issu…
etschannen Nov 9, 2018
56c51c1
fix: usableRegions was uninitialized
etschannen Nov 9, 2018
7c23b68
fix: we need to build teams if a server becomes healthy and it is not…
etschannen Nov 10, 2018
828b8d8
updated release notes for 6.0.15
etschannen Nov 10, 2018
b8381b3
Merge branch 'release-6.0' of github.com:apple/foundationdb into rele…
etschannen Nov 10, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
141 changes: 63 additions & 78 deletions fdbserver/DataDistributionTracker.actor.cpp
Expand Up @@ -138,11 +138,8 @@ ACTOR Future<Void> trackShardBytes(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
UID trackerID,
bool addToSizeEstimate = true)
{
state Transaction tr(self->cx);

Void _ = wait( delay( 0, TaskDataDistribution ) );

/*TraceEvent("TrackShardBytesStarting")
Expand All @@ -154,65 +151,61 @@ ACTOR Future<Void> trackShardBytes(

try {
loop {
try {
state ShardSizeBounds bounds;
if( shardSize->get().present() ) {
auto bytes = shardSize->get().get().bytes;
auto bandwidthStatus = getBandwidthStatus( shardSize->get().get() );
bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
bounds.permittedError.bytes = bytes * 0.1;
if( bandwidthStatus == BandwidthStatusNormal ) { // Not high or low
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusHigh ) { // > 10MB/sec for 100MB shard, proportionally lower for smaller shard, > 200KB/sec no matter what
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusLow ) { // < 10KB/sec
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
} else
ASSERT( false );

} else {
bounds.max.bytes = -1;
bounds.min.bytes = -1;
bounds.permittedError.bytes = -1;
ShardSizeBounds bounds;
if( shardSize->get().present() ) {
auto bytes = shardSize->get().get().bytes;
auto bandwidthStatus = getBandwidthStatus( shardSize->get().get() );
bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
bounds.permittedError.bytes = bytes * 0.1;
if( bandwidthStatus == BandwidthStatusNormal ) { // Not high or low
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusHigh ) { // > 10MB/sec for 100MB shard, proportionally lower for smaller shard, > 200KB/sec no matter what
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC;
bounds.permittedError.bytesPerKSecond = bounds.min.bytesPerKSecond / 4;
} else if( bandwidthStatus == BandwidthStatusLow ) { // < 10KB/sec
bounds.max.bytesPerKSecond = SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
}

bounds.max.iosPerKSecond = bounds.max.infinity;
bounds.min.iosPerKSecond = 0;
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;

StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );

/*TraceEvent("ShardSizeUpdate")
.detail("Keys", printable(keys))
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwithStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
.detail("ShardSizePresent", shardSize->get().present())
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0 )
.detail("TrackerID", trackerID);*/

if( shardSize->get().present() && addToSizeEstimate )
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardSize->get().get().bytes );

shardSize->set( metrics );
} catch( Error &e ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious to me why this can be removed. I see that there is one wait on a call to waitStorageMetrics. Is the idea that it can't throw any errors where we would retry them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, waitStorageMetrics is not transactional, and is not subject to our normal retry loop. The only reason it is a function of a transaction is to use the client's cache of storage server locations. It would probably be more appropriate to make it a function of DatabaseContext.

//TraceEvent("ShardSizeUpdateError").error(e, true).detail("Begin", printable(keys.begin)).detail("End", printable(keys.end)).detail("TrackerID", trackerID);
Void _ = wait( tr.onError(e) );
bounds.permittedError.bytesPerKSecond = bounds.max.bytesPerKSecond / 4;
} else
ASSERT( false );

} else {
bounds.max.bytes = -1;
bounds.min.bytes = -1;
bounds.permittedError.bytes = -1;
bounds.max.bytesPerKSecond = bounds.max.infinity;
bounds.min.bytesPerKSecond = 0;
bounds.permittedError.bytesPerKSecond = bounds.permittedError.infinity;
}

bounds.max.iosPerKSecond = bounds.max.infinity;
bounds.min.iosPerKSecond = 0;
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;

Transaction tr(self->cx);
StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );

/*TraceEvent("ShardSizeUpdate")
.detail("Keys", printable(keys))
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwithStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
.detail("ShardSizePresent", shardSize->get().present())
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0 )
.detail("TrackerID", trackerID);*/

if( shardSize->get().present() && addToSizeEstimate )
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardSize->get().get().bytes );

shardSize->set( metrics );
}
} catch( Error &e ) {
if (e.code() != error_code_actor_cancelled)
Expand Down Expand Up @@ -283,12 +276,12 @@ ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRangeRef keys,
}

struct HasBeenTrueFor : NonCopyable {
explicit HasBeenTrueFor( double seconds, bool value ) : enough( seconds ), trigger( value ? Void() : Future<Void>() ) {}
explicit HasBeenTrueFor( bool value ) : trigger( value ? Void() : Future<Void>() ) {}

Future<Void> set() {
if( !trigger.isValid() ) {
cleared = Promise<Void>();
trigger = delayJittered( enough, TaskDataDistribution - 1 ) || cleared.getFuture();
trigger = delayJittered( SERVER_KNOBS->DD_MERGE_COALESCE_DELAY, TaskDataDistribution - 1 ) || cleared.getFuture();
}
return trigger;
}
Expand All @@ -308,12 +301,10 @@ struct HasBeenTrueFor : NonCopyable {
private:
Future<Void> trigger;
Promise<Void> cleared;
const double enough;
};

ACTOR Future<Void> shardSplitter(
DataDistributionTracker* self,
UID trackerId,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
ShardSizeBounds shardBounds )
Expand All @@ -340,7 +331,6 @@ ACTOR Future<Void> shardSplitter(
TraceEvent("RelocateShardStartSplitx100", self->masterId)
.detail("Begin", printable(keys.begin))
.detail("End", printable(keys.end))
.detail("TrackerID", trackerId)
.detail("MaxBytes", shardBounds.max.bytes)
.detail("MetricsBytes", metrics.bytes)
.detail("Bandwidth", bandwidthStatus == BandwidthStatusHigh ? "High" : bandwidthStatus == BandwidthStatusNormal ? "Normal" : "Low")
Expand Down Expand Up @@ -378,7 +368,6 @@ ACTOR Future<Void> shardSplitter(

Future<Void> shardMerger(
DataDistributionTracker* self,
UID trackerId,
KeyRange const& keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize )
{
Expand Down Expand Up @@ -464,8 +453,7 @@ Future<Void> shardMerger(
.detail("OldKeys", printable(keys))
.detail("NewKeys", printable(mergeRange))
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged)
.detail("TrackerID", trackerId);
.detail("BatchedMerges", shardsMerged);

restartShardTrackers( self, mergeRange, endingStats );
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
Expand All @@ -479,8 +467,7 @@ ACTOR Future<Void> shardEvaluator(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
HasBeenTrueFor *wantsToMerge,
UID trackerID)
HasBeenTrueFor *wantsToMerge)
{
Future<Void> onChange = shardSize->onChange() || yieldedFuture(self->maxShardSize->onChange());

Expand Down Expand Up @@ -515,10 +502,10 @@ ACTOR Future<Void> shardEvaluator(
.detail("HasBeenTrueLongEnough", wantsToMerge->hasBeenTrueForLongEnough());*/

if(!self->anyZeroHealthyTeams->get() && wantsToMerge->hasBeenTrueForLongEnough()) {
onChange = onChange || shardMerger( self, trackerID, keys, shardSize );
onChange = onChange || shardMerger( self, keys, shardSize );
}
if( shouldSplit ) {
onChange = onChange || shardSplitter( self, trackerID, keys, shardSize, shardBounds );
onChange = onChange || shardSplitter( self, keys, shardSize, shardBounds );
}

Void _ = wait( onChange );
Expand All @@ -528,11 +515,10 @@ ACTOR Future<Void> shardEvaluator(
ACTOR Future<Void> shardTracker(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
UID trackerID )
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize)
{
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
state HasBeenTrueFor wantsToMerge( SERVER_KNOBS->DD_MERGE_COALESCE_DELAY, shardSize->get().present() );
state HasBeenTrueFor wantsToMerge( shardSize->get().present() );

Void _ = wait( yieldedFuture(self->readyToStart.getFuture()) );

Expand All @@ -556,7 +542,7 @@ ACTOR Future<Void> shardTracker(
try {
loop {
// Use the current known size to check for (and start) splits and merges.
Void _ = wait( shardEvaluator( self, keys, shardSize, &wantsToMerge, trackerID ) );
Void _ = wait( shardEvaluator( self, keys, shardSize, &wantsToMerge ) );

// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
// delay(0) mitigates the resulting SlowTask
Expand Down Expand Up @@ -593,11 +579,10 @@ void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Opti
shardSize->set( startingSize );
}

UID trackerID = g_random->randomUniqueID();
ShardTrackedData data;
data.stats = shardSize;
data.trackShard = shardTracker( self, ranges[i], shardSize, trackerID );
data.trackBytes = trackShardBytes( self, keys, shardSize, trackerID );
data.trackShard = shardTracker( self, ranges[i], shardSize );
data.trackBytes = trackShardBytes( self, ranges[i], shardSize );
self->shards.insert( ranges[i], data );
}
}
Expand Down