Skip to content

Commit

Permalink
Merge pull request apple#157 from cie/choose-leader-on-stateless-proc…
Browse files Browse the repository at this point in the history
…esses

Catch and update processClass change from DBSource
  • Loading branch information
etschannen authored and GitHub Enterprise committed Oct 13, 2017
1 parent c6753f3 commit c2f3c4e
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 46 deletions.
35 changes: 35 additions & 0 deletions fdbclient/CoordinationInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"

const int MAX_CLUSTER_FILE_BYTES = 60000;

Expand Down Expand Up @@ -96,10 +97,44 @@ struct LeaderInfo {
bool forward; // If true, serializedInfo is a connection string instead!

LeaderInfo() : forward(false) {}
LeaderInfo(UID changeID) : changeID(changeID), forward(false) {}

bool operator < (LeaderInfo const& r) const { return changeID < r.changeID; }
bool operator == (LeaderInfo const& r) const { return changeID == r.changeID; }

// The first 4 bits of ChangeID represent cluster controller process class fitness, the lower the better
bool updateChangeID(uint64_t processClassFitness) {
uint64_t mask = 15ll << 60;
processClassFitness <<= 60;

if ((changeID.first() & mask) == processClassFitness) {
return false;
}

changeID = UID((changeID.first() & ~mask) | processClassFitness, changeID.second());
return true;
}

// Change leader only if the candidate has better process class fitness
bool leaderChangeRequired(LeaderInfo const& candidate) const {
uint64_t mask = 15ll << 60;
if ((changeID.first() & mask) > (candidate.changeID.first() & mask)) {
return true;
} else {
return false;
}
}

// All but the first 4 bits are used to represent process id
bool equalInternalId(LeaderInfo const& leaderInfo) const {
uint64_t mask = ~(15ll << 60);
if ((changeID.first() & mask) == (leaderInfo.changeID.first() & mask)) {
return true;
} else {
return false;
}
}

template <class Ar>
void serialize(Ar& ar) {
ar & changeID & serializedInfo & forward;
Expand Down
55 changes: 38 additions & 17 deletions fdbserver/ClusterController.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ void failAfter( Future<Void> trigger, Endpoint e );

struct WorkerInfo : NonCopyable {
Future<Void> watcher;
ReplyPromise<Void> reply;
ReplyPromise<ProcessClass> reply;
Generation gen;
int reboots;
WorkerInterface interf;
ProcessClass initialClass;
ProcessClass processClass;

WorkerInfo() : gen(-1), reboots(0) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<Void> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass ) :
WorkerInfo( Future<Void> watcher, ReplyPromise<ProcessClass> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), interf(interf), initialClass(initialClass), processClass(processClass) {}

WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
Expand Down Expand Up @@ -1055,9 +1055,11 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
}
}
when( Void _ = wait( failed ) ) { // remove workers that have failed
cluster->id_worker[ worker.locality.processId() ].reply.send( Void() );
WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ];
if (!failedWorkerInfo.reply.isSet()) {
failedWorkerInfo.reply.send( failedWorkerInfo.processClass );
}
cluster->id_worker.erase( worker.locality.processId() );

cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
return Void();
}
Expand Down Expand Up @@ -1318,38 +1320,49 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c

void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
WorkerInterface w = req.wi;
ProcessClass processClass = req.processClass;
ProcessClass newProcessClass = req.processClass;
auto info = self->id_worker.find( w.locality.processId() );

TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace();

if( info == self->id_worker.end() ) {
// Check process class if needed
if (self->gotProcessClasses && (info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen)) {
auto classIter = self->id_class.find(w.locality.processId());

if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.processClass.classType() == ProcessClass::UnsetClass) ) {
processClass = classIter->second;
if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || req.initialClass.classType() == ProcessClass::UnsetClass)) {
newProcessClass = classIter->second;
} else {
newProcessClass = req.initialClass;
}

self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, req.processClass, self ), req.reply, req.generation, w, req.processClass, processClass );
// Notify the worker to register again with new process class
if (newProcessClass != req.processClass && !req.reply.isSet()) {
req.reply.send( newProcessClass );
}
}

if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass );
checkOutstandingRequests( self );

return;
}

if( info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if( info->second.processClass.classSource() == ProcessClass::CommandLineSource ||
(info->second.processClass.classSource() == ProcessClass::AutoSource && req.processClass.classType() != ProcessClass::UnsetClass) ) {
info->second.processClass = req.processClass;
if (info->second.processClass != newProcessClass) {
info->second.processClass = newProcessClass;
}

info->second.initialClass = req.processClass;
info->second.reply.send( Never() );
info->second.initialClass = req.initialClass;
if (!info->second.reply.isSet()) {
info->second.reply.send( Never() );
}
info->second.reply = req.reply;
info->second.gen = req.generation;

if(info->second.interf.id() != w.id()) {
info->second.interf = w;
info->second.watcher = workerAvailabilityWatch( w, req.processClass, self );
info->second.watcher = workerAvailabilityWatch( w, newProcessClass, self );
}
return;
}
Expand Down Expand Up @@ -1546,11 +1559,19 @@ ACTOR Future<Void> monitorProcessClasses(ClusterControllerData *self) {

for( auto& w : self->id_worker ) {
auto classIter = self->id_class.find(w.first);
ProcessClass newProcessClass;

if( classIter != self->id_class.end() && (classIter->second.classSource() == ProcessClass::DBSource || w.second.initialClass.classType() == ProcessClass::UnsetClass) ) {
w.second.processClass = classIter->second;
newProcessClass = classIter->second;
} else {
w.second.processClass = w.second.initialClass;
newProcessClass = w.second.initialClass;
}

if (newProcessClass != w.second.processClass) {
w.second.processClass = newProcessClass;
if (!w.second.reply.isSet()) {
w.second.reply.send( newProcessClass );
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions fdbserver/ClusterRecruitmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,17 @@ struct RecruitStorageRequest {
struct RegisterWorkerRequest {
WorkerInterface wi;
ProcessClass processClass;
ProcessClass initialClass;
Generation generation;
ReplyPromise<Void> reply;
ReplyPromise<ProcessClass> reply;

RegisterWorkerRequest() {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass processClass, Generation generation) :
wi(wi), processClass(processClass), generation(generation) {}
RegisterWorkerRequest(WorkerInterface wi, ProcessClass initialClass, ProcessClass processClass, Generation generation) :
wi(wi), initialClass(initialClass), processClass(processClass), generation(generation) {}

template <class Ar>
void serialize( Ar& ar ) {
ar & wi & processClass & generation & reply;
ar & wi & initialClass & processClass & generation & reply;
}
};

Expand Down
19 changes: 14 additions & 5 deletions fdbserver/Coordination.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
}
when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
//TraceEvent("CandidacyRequest").detail("Nominee", req.myInfo.changeID );
availableCandidates.erase( LeaderInfo(req.prevChangeID) );
availableCandidates.insert( req.myInfo );
if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader)
req.reply.send( currentNominee.get() );
Expand All @@ -226,8 +227,9 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
}
when (LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
//TODO: use notify to only send a heartbeat once per interval
availableLeaders.erase( LeaderInfo(req.prevChangeID) );
availableLeaders.insert( req.myInfo );
req.reply.send( currentNominee.present() && req.myInfo == currentNominee.get() );
req.reply.send( currentNominee.present() && currentNominee.get().equalInternalId(req.myInfo) );
}
when (ForwardRequest req = waitNext( interf.forward.getFuture() ) ) {
LeaderInfo newInfo;
Expand All @@ -246,11 +248,18 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
TraceEvent("EndingLeaderNomination").detail("Key", printable(key));
return Void();
} else {
Optional<LeaderInfo> nextNominee =
availableLeaders.size() ? *availableLeaders.begin() :
availableCandidates.size() ? *availableCandidates.begin() : Optional<LeaderInfo>();
Optional<LeaderInfo> nextNominee;
if (availableLeaders.size() && availableCandidates.size()) {
nextNominee = (*availableLeaders.begin()).leaderChangeRequired(*availableCandidates.begin()) ? *availableCandidates.begin() : *availableLeaders.begin();
} else if (availableLeaders.size()) {
nextNominee = *availableLeaders.begin();
} else if (availableCandidates.size()) {
nextNominee = *availableCandidates.begin();
} else {
nextNominee = Optional<LeaderInfo>();
}

if (nextNominee != currentNominee || !availableLeaders.size()) {
if ( currentNominee.present() != nextNominee.present() || (currentNominee.present() && !currentNominee.get().equalInternalId(nextNominee.get())) || !availableLeaders.size() ) {
TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
.detail("Changed", nextNominee != currentNominee).detail("Key", printable(key));
for(int i=0; i<notify.size(); i++)
Expand Down
11 changes: 6 additions & 5 deletions fdbserver/CoordinationInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,30 @@ struct LeaderElectionRegInterface : ClientLeaderRegInterface {
struct CandidacyRequest {
Key key;
LeaderInfo myInfo;
UID knownLeader;
UID knownLeader, prevChangeID;
ReplyPromise<Optional<LeaderInfo>> reply;

CandidacyRequest() {}
CandidacyRequest(Key key, LeaderInfo const& myInfo, UID const& knownLeader) : key(key), myInfo(myInfo), knownLeader(knownLeader) {}
CandidacyRequest(Key key, LeaderInfo const& myInfo, UID const& knownLeader, UID const& prevChangeID) : key(key), myInfo(myInfo), knownLeader(knownLeader), prevChangeID(prevChangeID) {}

template <class Ar>
void serialize(Ar& ar) {
ar & key & myInfo & knownLeader & reply;
ar & key & myInfo & knownLeader & prevChangeID & reply;
}
};

struct LeaderHeartbeatRequest {
Key key;
LeaderInfo myInfo;
UID prevChangeID;
ReplyPromise<bool> reply;

LeaderHeartbeatRequest() {}
explicit LeaderHeartbeatRequest( Key key, LeaderInfo const& myInfo ) : key(key), myInfo(myInfo) {}
explicit LeaderHeartbeatRequest( Key key, LeaderInfo const& myInfo, UID prevChangeID ) : key(key), myInfo(myInfo), prevChangeID(prevChangeID) {}

template <class Ar>
void serialize(Ar& ar) {
ar & key & myInfo & reply;
ar & key & myInfo & prevChangeID & reply;
}
};

Expand Down
24 changes: 16 additions & 8 deletions fdbserver/LeaderElection.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@

extern Optional<LeaderInfo> getLeader( vector<Optional<LeaderInfo>> nominees );

ACTOR Future<Void> submitCandidacy( Key key, LeaderElectionRegInterface coord, LeaderInfo myInfo, Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees, int index ) {
ACTOR Future<Void> submitCandidacy( Key key, LeaderElectionRegInterface coord, LeaderInfo myInfo, UID prevChangeID, Reference<AsyncVar<vector<Optional<LeaderInfo>>>> nominees, int index ) {
loop {
auto const& nom = nominees->get()[index];
Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.candidacy, CandidacyRequest( key, myInfo, nom.present() ? nom.get().changeID : UID() ), TaskCoordinationReply ) );
Optional<LeaderInfo> li = wait( retryBrokenPromise( coord.candidacy, CandidacyRequest( key, myInfo, nom.present() ? nom.get().changeID : UID(), prevChangeID ), TaskCoordinationReply ) );

if (li != nominees->get()[index]) {
vector<Optional<LeaderInfo>> v = nominees->get();
Expand Down Expand Up @@ -80,6 +80,7 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
state LeaderInfo myInfo;
state Future<Void> candidacies;
state bool iAmLeader = false;
state UID prevChangeID;

nominees->set( vector<Optional<LeaderInfo>>( coordinators.clientLeaderServers.size() ) );

Expand All @@ -91,14 +92,13 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
while (!iAmLeader) {
state Future<Void> badCandidateTimeout;

UID randomID = g_random->randomUniqueID();
int64_t mask = 15ll << 60;
int64_t modifiedFirstPart = (randomID.first() & ~mask) | ((int64_t)asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController) << 60);
myInfo.changeID = UID(modifiedFirstPart, randomID.second());
myInfo.changeID = g_random->randomUniqueID();
prevChangeID = myInfo.changeID;
myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController));

vector<Future<Void>> cand;
for(int i=0; i<coordinators.leaderElectionServers.size(); i++)
cand.push_back( submitCandidacy( coordinators.clusterKey, coordinators.leaderElectionServers[i], myInfo, nominees, i ) );
cand.push_back( submitCandidacy( coordinators.clusterKey, coordinators.leaderElectionServers[i], myInfo, prevChangeID, nominees, i ) );
candidacies = waitForAll(cand);

loop {
Expand Down Expand Up @@ -153,6 +153,9 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
break;
}
when (Void _ = wait(candidacies)) { ASSERT(false); }
when (Void _ = wait( asyncProcessClass->onChange() )) {
break;
}
}
}

Expand All @@ -162,10 +165,15 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
ASSERT( iAmLeader && outSerializedLeader->get() == proposedSerializedInterface );

loop {
prevChangeID = myInfo.changeID;
if (myInfo.updateChangeID(asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController))) {
TraceEvent("ChangeLeaderChangeID").detail("PrevChangeID", prevChangeID).detail("NewChangeID", myInfo.changeID);
}

state vector<Future<Void>> true_heartbeats;
state vector<Future<Void>> false_heartbeats;
for(int i=0; i<coordinators.leaderElectionServers.size(); i++) {
Future<bool> hb = retryBrokenPromise( coordinators.leaderElectionServers[i].leaderHeartbeat, LeaderHeartbeatRequest( coordinators.clusterKey, myInfo ), TaskCoordinationReply );
Future<bool> hb = retryBrokenPromise( coordinators.leaderElectionServers[i].leaderHeartbeat, LeaderHeartbeatRequest( coordinators.clusterKey, myInfo, prevChangeID ), TaskCoordinationReply );
true_heartbeats.push_back( onEqual(hb, true) );
false_heartbeats.push_back( onEqual(hb, false) );
}
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/WorkerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, int
Future<Void> extractClusterInterface( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& a, Reference<AsyncVar<Optional<struct ClusterInterface>>> const& b );

Future<Void> fdbd( Reference<ClusterConnectionFile> const&, LocalityData const& localities, ProcessClass const& processClass, std::string const& dataFolder, std::string const& coordFolder, int64_t const& memoryLimit, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, ProcessClass const& processClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> workerServer( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> const& ccInterface, LocalityData const& localities, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass, ProcessClass const& initialClass, std::string const& filename, int64_t const& memoryLimit, Future<Void> const& forceFailure, std::string const& metricsConnFile, std::string const& metricsPrefix );
Future<Void> clusterController( Reference<ClusterConnectionFile> const&, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> const& currentCC, Reference<AsyncVar<ProcessClass>> const& asyncProcessClass );

// These servers are started by workerServer
Expand Down
Loading

0 comments on commit c2f3c4e

Please sign in to comment.