Commit

Merge pull request #705 from etschannen/release-6.0
Added a yield to data distribution and fixed a problem with leader election
AlvinMooreSr committed Aug 14, 2018
2 parents fad0730 + 883050d commit 1431d1b
Showing 4 changed files with 29 additions and 34 deletions.
documentation/sphinx/source/release-notes.rst (4 changes: 3 additions & 1 deletion)
@@ -2,7 +2,7 @@
Release Notes
#############

-6.0.5
+6.0.6
=====

Features
@@ -44,6 +44,8 @@ Fixes
* On clusters configured with usable_regions=2, status reported no replicas remaining when the primary DC was still healthy. [6.0.5] `(PR #687) <https://github.com/apple/foundationdb/pull/687>`_
* Clients could crash when passing in TLS options. [6.0.5] `(PR #649) <https://github.com/apple/foundationdb/pull/649>`_
* A mismatched TLS certificate and key set could cause the server to crash. [6.0.5] `(PR #689) <https://github.com/apple/foundationdb/pull/689>`_
+* Databases with more than 10TB of data would pause for a few seconds after recovery. [6.0.6] `(PR #705) <https://github.com/apple/foundationdb/pull/705>`_
+* Sometimes a minority of coordinators would fail to converge after a new leader was elected. [6.0.6] `(PR #700) <https://github.com/apple/foundationdb/pull/700>`_

Status
------
fdbclient/CoordinationInterface.h (16 changes: 4 additions & 12 deletions)
@@ -110,22 +110,14 @@ struct LeaderInfo {

// All but the first 7 bits are used to represent process id
bool equalInternalId(LeaderInfo const& leaderInfo) const {
-if ( (changeID.first() & mask) == (leaderInfo.changeID.first() & mask) && changeID.second() == leaderInfo.changeID.second() ) {
-return true;
-} else {
-return false;
-}
+return ((changeID.first() & mask) == (leaderInfo.changeID.first() & mask)) && changeID.second() == leaderInfo.changeID.second();
}

-// Change leader only if
+// Change leader only if
// 1. the candidate has better process class fitness and the candidate is not the leader
-// 2. the leader process class fitness become worse
+// 2. the leader process class fitness becomes worse
bool leaderChangeRequired(LeaderInfo const& candidate) const {
-if ( ((changeID.first() & ~mask) > (candidate.changeID.first() & ~mask) && !equalInternalId(candidate)) || ((changeID.first() & ~mask) < (candidate.changeID.first() & ~mask) && equalInternalId(candidate)) ) {
-return true;
-} else {
-return false;
-}
+return ((changeID.first() & ~mask) > (candidate.changeID.first() & ~mask) && !equalInternalId(candidate)) || ((changeID.first() & ~mask) < (candidate.changeID.first() & ~mask) && equalInternalId(candidate));
}

template <class Ar>
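Both predicates above hinge on splitting changeID.first() into two fields: the top 7 bits carry the process-class fitness used to rank leader candidates, and the remaining bits (together with changeID.second()) identify the process, per the "All but the first 7 bits" comment. A minimal standalone sketch of that split, assuming a mask of ~(127ull << 57) and that a smaller fitness value is better; the struct and helper below are illustrative, not the actual FoundationDB type:

#include <cassert>
#include <cstdint>

// Minimal sketch of the bit-split behind equalInternalId / leaderChangeRequired.
// Assumptions (not taken from the header): the top 7 bits of changeID.first()
// hold the process-class fitness, a lower fitness value is better, and the mask
// below is the one that clears those 7 bits.
struct FakeLeaderInfo {
    static constexpr uint64_t mask = ~(127ull << 57);  // keeps only the process-id bits

    uint64_t idFirst;   // stand-in for changeID.first()
    uint64_t idSecond;  // stand-in for changeID.second()

    bool equalInternalId(const FakeLeaderInfo& o) const {
        // Same process if the non-fitness bits and the second half match.
        return (idFirst & mask) == (o.idFirst & mask) && idSecond == o.idSecond;
    }

    bool leaderChangeRequired(const FakeLeaderInfo& candidate) const {
        // 1. a different process advertises strictly better (smaller) fitness, or
        // 2. the same process now advertises worse (larger) fitness.
        return ((idFirst & ~mask) > (candidate.idFirst & ~mask) && !equalInternalId(candidate)) ||
               ((idFirst & ~mask) < (candidate.idFirst & ~mask) && equalInternalId(candidate));
    }
};

// Build an id with a given fitness in the top 7 bits (helper for the checks below).
static FakeLeaderInfo make(uint64_t fitness, uint64_t id, uint64_t second) {
    return FakeLeaderInfo{ (fitness << 57) | (id & FakeLeaderInfo::mask), second };
}

int main() {
    FakeLeaderInfo current = make(/*fitness*/ 2, /*id*/ 0xABC, 42);
    assert(!current.leaderChangeRequired(make(2, 0xDEF, 7)));  // equal fitness on another process: keep leader
    assert(current.leaderChangeRequired(make(1, 0xDEF, 7)));   // better fitness elsewhere: change
    assert(current.leaderChangeRequired(make(3, 0xABC, 42)));  // same process got worse: change
    return 0;
}

With this split, equalInternalId ignores the fitness bits entirely, which is why a process can change its advertised fitness without being treated as a different leader.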
fdbserver/Coordination.actor.cpp (4 changes: 2 additions & 2 deletions)
@@ -285,14 +285,14 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
}
}

-if ( currentNominee.present() != nextNominee.present() || (nextNominee.present() && !foundCurrentNominee) || (currentNominee.present() && currentNominee.get().leaderChangeRequired(nextNominee.get())) ) {
+if ( !nextNominee.present() || !foundCurrentNominee || currentNominee.get().leaderChangeRequired(nextNominee.get()) ) {
TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
.detail("Changed", nextNominee != currentNominee).detail("Key", printable(key));
for(int i=0; i<notify.size(); i++)
notify[i].send( nextNominee );
notify.clear();
currentNominee = nextNominee;
-} else if (currentNominee.present() && nextNominee.present() && currentNominee.get().equalInternalId(nextNominee.get())) {
+} else if (currentNominee.get().equalInternalId(nextNominee.get())) {
// leader becomes better
currentNominee = nextNominee;
}
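The simplified condition appears to lean on the register's invariants instead of re-checking them inline: notify waiters whenever there is no nominee left, the current nominee is no longer among the candidates, or a leader change is required, assuming foundCurrentNominee can only be true when currentNominee is present so the short-circuit order never reaches currentNominee.get() on an empty Optional. A minimal standalone model of that decision, using std::optional in place of Flow's Optional; the Nominee fields and the shouldNominate helper are illustrative names, not FoundationDB code:

#include <cassert>
#include <optional>

// Toy stand-in for LeaderInfo, mirroring the sketch in the previous section.
struct Nominee {
    int id;
    int fitness;  // lower is better

    bool equalInternalId(const Nominee& o) const { return id == o.id; }
    bool leaderChangeRequired(const Nominee& cand) const {
        return (fitness > cand.fitness && !equalInternalId(cand)) ||
               (fitness < cand.fitness && equalInternalId(cand));
    }
};

// True when the register should send the (possibly empty) nomination to waiters.
// Relies on the assumed invariant that foundCurrentNominee implies currentNominee
// has a value, so the last term is only evaluated when dereferencing is safe.
bool shouldNominate(const std::optional<Nominee>& currentNominee,
                    const std::optional<Nominee>& nextNominee,
                    bool foundCurrentNominee) {
    return !nextNominee.has_value() || !foundCurrentNominee ||
           currentNominee->leaderChangeRequired(*nextNominee);
}

int main() {
    std::optional<Nominee> none;
    assert(shouldNominate(Nominee{1, 2}, none, true));           // no candidate left: notify
    assert(shouldNominate(none, Nominee{3, 2}, false));          // no current nominee yet: notify
    assert(shouldNominate(Nominee{1, 2}, Nominee{3, 2}, false)); // current nominee disappeared: notify
    assert(!shouldNominate(Nominee{1, 2}, Nominee{1, 2}, true)); // unchanged nominee: stay quiet
    return 0;
}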
fdbserver/DataDistribution.actor.cpp (39 changes: 20 additions & 19 deletions)
@@ -2215,6 +2215,25 @@ ACTOR Future<Void> dataDistribution(
state PromiseStream<GetMetricsRequest> getShardMetrics;
state Reference<AsyncVar<bool>> processingUnhealthy( new AsyncVar<bool>(false) );
state Promise<Void> readyToStart;
+state Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
+
+state int shard = 0;
+for(; shard<initData->shards.size() - 1; shard++) {
+KeyRangeRef keys = KeyRangeRef(initData->shards[shard].key, initData->shards[shard+1].key);
+shardsAffectedByTeamFailure->defineShard(keys);
+std::vector<ShardsAffectedByTeamFailure::Team> teams;
+teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].primarySrc, true));
+if(configuration.usableRegions > 1) {
+teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[shard].remoteSrc, false));
+}
+shardsAffectedByTeamFailure->moveShard(keys, teams);
+if(initData->shards[shard].hasDest) {
+// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
+// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
+output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) );
+}
+Void _ = wait( yield(TaskDataDistribution) );
+}

vector<TeamCollectionInterface> tcis;
Reference<AsyncVar<bool>> anyZeroHealthyTeams;
@@ -2235,32 +2254,14 @@ ACTOR Future<Void> dataDistribution(
anyZeroHealthyTeams = zeroHealthyTeams[0];
}

-Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure( new ShardsAffectedByTeamFailure );
+actors.push_back(yieldPromiseStream(output.getFuture(), input));

-for(int s=0; s<initData->shards.size() - 1; s++) {
-KeyRangeRef keys = KeyRangeRef(initData->shards[s].key, initData->shards[s+1].key);
-shardsAffectedByTeamFailure->defineShard(keys);
-std::vector<ShardsAffectedByTeamFailure::Team> teams;
-teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].primarySrc, true));
-if(configuration.usableRegions > 1) {
-teams.push_back(ShardsAffectedByTeamFailure::Team(initData->shards[s].remoteSrc, false));
-}
-shardsAffectedByTeamFailure->moveShard(keys, teams);
-if(initData->shards[s].hasDest) {
-// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
-// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
-output.send( RelocateShard( keys, PRIORITY_RECOVER_MOVE ) );
-}
-}

actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, mi.id() ), "DDTracker", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
if (configuration.usableRegions > 1) {
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
}
-actors.push_back(yieldPromiseStream(output.getFuture(), input));

Void _ = wait( waitForAll( actors ) );
return Void();
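The relocated loop now waits on yield(TaskDataDistribution) after every shard, so rebuilding the shard map for a very large database no longer runs as one uninterrupted slice of the run loop after recovery (the multi-second pause for databases over 10TB noted in the release notes). A standalone sketch of that cooperative-yield pattern, with a toy run loop and an explicit batch size standing in for Flow's scheduler and per-iteration yield; none of the names below are FoundationDB APIs:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <deque>
#include <functional>
#include <vector>

// A single-threaded run loop: tasks run to completion, one at a time.
struct RunLoop {
    std::deque<std::function<void()>> tasks;
    void post(std::function<void()> t) { tasks.push_back(std::move(t)); }
    void run() {
        while (!tasks.empty()) {
            auto t = std::move(tasks.front());
            tasks.pop_front();
            t();  // a task may post more work, including its own continuation
        }
    }
};

// Initialize shards [next, shards.size()) but handle at most kBatch per wake-up,
// then re-post the continuation -- the moral equivalent of wait(yield(TaskDataDistribution)).
void initShards(RunLoop& loop, const std::vector<int>& shards, std::size_t next) {
    const std::size_t kBatch = 3;  // tiny for demonstration; the real loop yields every iteration
    std::size_t stop = std::min(next + kBatch, shards.size());
    for (std::size_t i = next; i < stop; ++i)
        std::printf("define/move shard %d\n", shards[i]);
    if (stop < shards.size())
        loop.post([&loop, &shards, stop] { initShards(loop, shards, stop); });  // "yield"
}

int main() {
    RunLoop loop;
    std::vector<int> shards{0, 1, 2, 3, 4, 5, 6, 7};
    loop.post([&] { initShards(loop, shards, 0); });
    loop.post([] { std::puts("other work runs between shard batches"); });
    loop.run();
    return 0;
}

Running it interleaves the "other work" task between shard batches instead of after all of them, which is the effect the added yield has on data distribution startup.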

0 comments on commit 1431d1b
