Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed problems found when migrating a cluster to usable_regions=2 #687

Merged
merged 4 commits into from Aug 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 26 additions & 2 deletions documentation/StatusSchema.json
Expand Up @@ -420,11 +420,35 @@
"total_disk_used_bytes":0,
"total_kv_size_bytes":0,
"partitions_count":2,
"moving_data":{
"moving_data":{
"total_written_bytes":0,
"in_flight_bytes":0,
"in_queue_bytes":0
"in_queue_bytes":0,
"highest_priority":0
},
"team_trackers":[
{
"primary":true,
"in_flight_bytes":0,
"unhealthy_servers":0,
"state":{
"healthy":true,
"min_replicas_remaining":0,
"name":{
"$enum":[
"initializing",
"missing_data",
"healing",
"healthy_repartitioning",
"healthy_removing_server",
"healthy_rebalancing",
"healthy"
]
},
"description":""
}
}
],
"least_operating_space_bytes_storage_server":0,
"max_machine_failures_without_losing_data":0
},
Expand Down
61 changes: 40 additions & 21 deletions fdbserver/DataDistribution.actor.cpp
Expand Up @@ -520,6 +520,7 @@ struct DDTeamCollection {
vector<UID> allServers;
ServerStatusMap server_status;
int64_t unhealthyServers;
std::map<int,int> priority_teams;
std::map<UID, Reference<TCServerInfo>> server_info;
vector<Reference<TCTeamInfo>> teams;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
Expand Down Expand Up @@ -1277,6 +1278,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut

Void _ = wait( yield() );
TraceEvent("TeamTrackerStarting", self->masterId).detail("Reason", "Initial wait complete (sc)").detail("Team", team->getDesc());
self->priority_teams[team->getPriority()]++;

try {
loop {
Expand Down Expand Up @@ -1371,6 +1373,12 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
team->setPriority( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER );
else
team->setPriority( PRIORITY_TEAM_HEALTHY );

if(lastPriority != team->getPriority()) {
self->priority_teams[lastPriority]--;
self->priority_teams[team->getPriority()]++;
}

TraceEvent("TeamPriorityChange", self->masterId).detail("Priority", team->getPriority());

lastZeroHealthy = self->zeroHealthyTeams->get(); //set this again in case it changed as a result of this team's health changing
Expand All @@ -1379,23 +1387,25 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut

for(int i=0; i<shards.size(); i++) {
int maxPriority = team->getPriority();
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
for( int t=0; t<teams.size(); t++) {
if( teams[t].servers.size() && self->server_info.count( teams[t].servers[0] ) ) {
auto& info = self->server_info[teams[t].servers[0]];

bool found = false;
for( int i = 0; i < info->teams.size(); i++ ) {
if( info->teams[i]->serverIDs == teams[t].servers ) {
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
found = true;
break;
if(maxPriority < PRIORITY_TEAM_0_LEFT) {
auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
for( int t=0; t<teams.size(); t++) {
if( teams[t].servers.size() && self->server_info.count( teams[t].servers[0] ) ) {
auto& info = self->server_info[teams[t].servers[0]];

bool found = false;
for( int i = 0; i < info->teams.size(); i++ ) {
if( info->teams[i]->serverIDs == teams[t].servers ) {
maxPriority = std::max( maxPriority, info->teams[i]->getPriority() );
found = true;
break;
}
}
}

TEST(!found); // A removed team is still associated with a shard in SABTF
} else {
TEST(teams[t].servers.size()); // A removed server is still associated with a team in SABTF
TEST(!found); // A removed team is still associated with a shard in SABTF
} else {
TEST(teams[t].servers.size()); // A removed server is still associated with a team in SABTF
}
}
}

Expand Down Expand Up @@ -1429,6 +1439,7 @@ ACTOR Future<Void> teamTracker( DDTeamCollection *self, Reference<IDataDistribut
Void _ = wait( yield() );
}
} catch(Error& e) {
self->priority_teams[team->getPriority()]--;
if( team->isHealthy() ) {
self->healthyTeamCount--;
ASSERT( self->healthyTeamCount >= 0 );
Expand Down Expand Up @@ -1995,8 +2006,14 @@ ACTOR Future<Void> dataDistributionTeamCollection(
}
}
when( Void _ = wait( loggingTrigger ) ) {
TraceEvent("TotalDataInFlight", masterId).detail("TotalBytes", self.getDebugTotalDataInFlight()).detail("UnhealthyServers", self.unhealthyServers).trackLatest(
(cx->dbName.toString() + "/TotalDataInFlight").c_str());
int highestPriority = 0;
for(auto it : self.priority_teams) {
if(it.second > 0) {
highestPriority = std::max(highestPriority, it.first);
}
}
TraceEvent("TotalDataInFlight", masterId).detail("Primary", self.primary).detail("TotalBytes", self.getDebugTotalDataInFlight()).detail("UnhealthyServers", self.unhealthyServers)
.detail("HighestPriority", highestPriority).trackLatest( self.primary ? "TotalDataInFlight" : "TotalDataInFlightRemote" );
loggingTrigger = delay( SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL );
self.countHealthyTeams();
}
Expand Down Expand Up @@ -2182,8 +2199,8 @@ ACTOR Future<Void> dataDistribution(
.detail( "HighestPriority", 0 )
.trackLatest( format("%s/MovingData", printable(cx->dbName).c_str() ).c_str() );

TraceEvent("TotalDataInFlight", mi.id()).detail("TotalBytes", 0)
.trackLatest((cx->dbName.toString() + "/TotalDataInFlight").c_str());
TraceEvent("TotalDataInFlight", mi.id()).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight");
TraceEvent("TotalDataInFlight", mi.id()).detail("Primary", false).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1).trackLatest("TotalDataInFlightRemote");

Void _ = wait( waitForDataDistributionEnabled(cx) );
TraceEvent("DataDistributionEnabled");
Expand All @@ -2193,6 +2210,7 @@ ACTOR Future<Void> dataDistribution(
ASSERT(configuration.storageTeamSize > 0);

state PromiseStream<RelocateShard> output;
state PromiseStream<RelocateShard> input;
state PromiseStream<Promise<int64_t>> getAverageShardBytes;
state PromiseStream<GetMetricsRequest> getShardMetrics;
state Reference<AsyncVar<bool>> processingUnhealthy( new AsyncVar<bool>(false) );
Expand All @@ -2218,6 +2236,7 @@ ACTOR Future<Void> dataDistribution(
}

Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
actors.push_back(yieldPromiseStream(output.getFuture(), input));

for(int s=0; s<initData->shards.size() - 1; s++) {
KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key);
Expand All @@ -2236,8 +2255,8 @@ ACTOR Future<Void> dataDistribution(
}

actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
if (configuration.usableRegions > 1) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
Expand Down
6 changes: 4 additions & 2 deletions fdbserver/DataDistribution.h
Expand Up @@ -210,6 +210,7 @@ Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> const& initData,
Database const& cx,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> const& shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
Promise<Void> const& readyToStart,
Expand All @@ -218,7 +219,8 @@ Future<Void> dataDistributionTracker(

Future<Void> dataDistributionQueue(
Database const& cx,
PromiseStream<RelocateShard> const& input,
PromiseStream<RelocateShard> const& output,
FutureStream<RelocateShard> const& input,
PromiseStream<GetMetricsRequest> const& getShardMetrics,
Reference<AsyncVar<bool>> const& processingUnhealthy,
vector<TeamCollectionInterface> const& teamCollection,
Expand All @@ -245,4 +247,4 @@ struct ShardSizeBounds {
ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize);

//Determines the maximum shard size based on the size of the database
int64_t getMaxShardSize( double dbSizeEstimate );
int64_t getMaxShardSize( double dbSizeEstimate );
20 changes: 9 additions & 11 deletions fdbserver/DataDistributionQueue.actor.cpp
Expand Up @@ -362,7 +362,8 @@ struct DDQueueData {
PromiseStream<RelocateData> relocationComplete;
PromiseStream<RelocateData> fetchSourceServersComplete;

PromiseStream<RelocateShard> input;
PromiseStream<RelocateShard> output;
FutureStream<RelocateShard> input;
PromiseStream<GetMetricsRequest> getShardMetrics;

double* lastLimited;
Expand Down Expand Up @@ -393,10 +394,10 @@ struct DDQueueData {

DDQueueData( MasterInterface mi, MoveKeysLock lock, Database cx, std::vector<TeamCollectionInterface> teamCollections,
Reference<ShardsAffectedByTeamFailure> sABTF, PromiseStream<Promise<int64_t>> getAverageShardBytes,
int teamSize, PromiseStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited, Version recoveryVersion ) :
int teamSize, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input, PromiseStream<GetMetricsRequest> getShardMetrics, double* lastLimited, Version recoveryVersion ) :
activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ),
shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), mi( mi ), lock( lock ),
cx( cx ), teamSize( teamSize ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
cx( cx ), teamSize( teamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited), recoveryVersion(recoveryVersion),
suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar<bool>(false) ) {}

Expand Down Expand Up @@ -569,10 +570,6 @@ struct DDQueueData {

//This function cannot handle relocation requests which split a shard into three pieces
void queueRelocation( RelocateData rd, std::set<UID> &serversToLaunchFrom ) {
// Update sabtf for changes from DDTracker
if( rd.changesBoundaries() )
shardsAffectedByTeamFailure->defineShard( rd.keys );

//TraceEvent("QueueRelocationBegin").detail("Begin", printable(rd.keys.begin)).detail("End", printable(rd.keys.end));

// remove all items from both queues that are fully contained in the new relocation (i.e. will be overwritten)
Expand Down Expand Up @@ -1086,7 +1083,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
.detail("SourceTeam", sourceTeam->getDesc())
.detail("DestTeam", destTeam->getDesc());

self->input.send( RelocateShard( moveShard, priority ) );
self->output.send( RelocateShard( moveShard, priority ) );
return true;
}
}
Expand Down Expand Up @@ -1166,7 +1163,8 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)

ACTOR Future<Void> dataDistributionQueue(
Database cx,
PromiseStream<RelocateShard> input,
PromiseStream<RelocateShard> output,
FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics,
Reference<AsyncVar<bool>> processingUnhealthy,
std::vector<TeamCollectionInterface> teamCollections,
Expand All @@ -1178,7 +1176,7 @@ ACTOR Future<Void> dataDistributionQueue(
double* lastLimited,
Version recoveryVersion)
{
state DDQueueData self( mi, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, input, getShardMetrics, lastLimited, recoveryVersion );
state DDQueueData self( mi, lock, cx, teamCollections, shardsAffectedByTeamFailure, getAverageShardBytes, teamSize, output, input, getShardMetrics, lastLimited, recoveryVersion );
state std::set<UID> serversToLaunchFrom;
state KeyRange keysToLaunchFrom;
state RelocateData launchData;
Expand Down Expand Up @@ -1213,7 +1211,7 @@ ACTOR Future<Void> dataDistributionQueue(
ASSERT( launchData.startTime == -1 && keysToLaunchFrom.empty() );

choose {
when ( RelocateShard rs = waitNext( self.input.getFuture() ) ) {
when ( RelocateShard rs = waitNext( self.input ) ) {
bool wasEmpty = serversToLaunchFrom.empty();
self.queueRelocation( rs, serversToLaunchFrom );
if(wasEmpty && !serversToLaunchFrom.empty())
Expand Down
23 changes: 16 additions & 7 deletions fdbserver/DataDistributionTracker.actor.cpp
Expand Up @@ -74,14 +74,15 @@ struct DataDistributionTracker {

// CapacityTracker
PromiseStream<RelocateShard> output;
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;

Promise<Void> readyToStart;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;

DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
DataDistributionTracker(Database cx, UID masterId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
: cx(cx), masterId( masterId ), dbSizeEstimate( new AsyncVar<int64_t>() ),
maxShardSize( new AsyncVar<Optional<int64_t>>() ),
sizeChanges(false), readyToStart(readyToStart), output( output ), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}

~DataDistributionTracker()
{
Expand Down Expand Up @@ -357,10 +358,16 @@ ACTOR Future<Void> shardSplitter(
for( int i = numShards-1; i > skipRange; i-- )
restartShardTrackers( self, KeyRangeRef(splitKeys[i], splitKeys[i+1]) );

for( int i = 0; i < skipRange; i++ )
self->output.send( RelocateShard( KeyRangeRef(splitKeys[i], splitKeys[i+1]), PRIORITY_SPLIT_SHARD) );
for( int i = numShards-1; i > skipRange; i-- )
self->output.send( RelocateShard( KeyRangeRef(splitKeys[i], splitKeys[i+1]), PRIORITY_SPLIT_SHARD) );
for( int i = 0; i < skipRange; i++ ) {
KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
self->shardsAffectedByTeamFailure->defineShard( r );
self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
}
for( int i = numShards-1; i > skipRange; i-- ) {
KeyRangeRef r(splitKeys[i], splitKeys[i+1]);
self->shardsAffectedByTeamFailure->defineShard( r );
self->output.send( RelocateShard( r, PRIORITY_SPLIT_SHARD) );
}

self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().bytes ) );
} else {
Expand Down Expand Up @@ -461,6 +468,7 @@ Future<Void> shardMerger(
.detail("TrackerID", trackerId);

restartShardTrackers( self, mergeRange, endingStats );
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
self->output.send( RelocateShard( mergeRange, PRIORITY_MERGE_SHARD ) );

// We are about to be cancelled by the call to restartShardTrackers
Expand Down Expand Up @@ -661,13 +669,14 @@ ACTOR Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> initData,
Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID masterId)
{
state DataDistributionTracker self(cx, masterId, readyToStart, output, anyZeroHealthyTeams);
state DataDistributionTracker self(cx, masterId, readyToStart, output, shardsAffectedByTeamFailure, anyZeroHealthyTeams);
state Future<Void> loggingTrigger = Void();
try {
Void _ = wait( trackInitialShards( &self, initData ) );
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/QuietDatabase.actor.cpp
Expand Up @@ -69,7 +69,7 @@ ACTOR Future<int64_t> getDataInFlight( Database cx, WorkerInterface masterWorker
try {
TraceEvent("DataInFlight").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( StringRef( cx->dbName.toString() + "/TotalDataInFlight" ) ) ), 1.0 ) );
EventLogRequest( LiteralStringRef("TotalDataInFlight") ) ), 1.0 ) );
int64_t dataInFlight;
sscanf(md.getValue("TotalBytes").c_str(), "%lld", &dataInFlight);
return dataInFlight;
Expand Down