fix: do not allow a storage server to be removed within 5 million versions of it being added, because if a storage server is added and removed between the known committed version and the recovery version, the storage server will need to see either the add or the remove when it peeks
etschannen committed May 6, 2018
1 parent 8371afb commit b1935f1
Showing 5 changed files with 46 additions and 24 deletions.
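The core of the fix is a guard in waitForAllDataRemoved (first file below): data distribution now refuses to even test whether a storage server is removable until the read version is more than SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS past the version at which the server was added. The following is a minimal sketch of that predicate, not the actual code; canBeginRemoval is a hypothetical helper name, and the knob value is assumed to be its default of 5,000,000 (per the commit title), which at roughly one million versions per second is about five seconds of commit traffic.

```cpp
#include <cstdint>

typedef int64_t Version;

// Hypothetical helper illustrating the guard this commit inlines into
// waitForAllDataRemoved() in DataDistribution.actor.cpp.
bool canBeginRemoval(Version readVersion, Version addedVersion,
                     Version maxReadLifeVersions /* SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS, 5e6 by default */) {
	// If a server could be added and removed within this window, a peek that
	// spans the known committed version and the recovery version might observe
	// neither the add nor the remove. Waiting out the window guarantees the
	// storage server sees at least one of the two.
	return readVersion > addedVersion + maxReadLifeVersions;
}
```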
41 changes: 25 additions & 16 deletions fdbserver/DataDistribution.actor.cpp
@@ -268,14 +268,20 @@ struct ServerStatus {
};
typedef AsyncMap<UID, ServerStatus> ServerStatusMap;

-ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID ) {
+ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version addedVersion ) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
-if (canRemove)
-return Void();
+Version ver = wait( tr.getReadVersion() );
+
+//we cannot remove a server immediately after adding it, because

[inline comment] grandinj (Jun 1, 2018): seems like a line of commentary was accidentally omitted here?

+if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
+bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
+if (canRemove) {
+return Void();
+}
+}

// Wait for any change to the serverKeys for this server
Void _ = wait( delay(SERVER_KNOBS->ALL_DATA_REMOVED_DELAY, TaskDataDistribution) );
@@ -295,7 +301,8 @@ ACTOR Future<Void> storageServerFailureTracker(
ServerStatus *status,
PromiseStream<Void> serverFailures,
int64_t *unhealthyServers,
-UID masterId )
+UID masterId,
+Version addedVersion )
{
loop {
bool unhealthy = statusMap->count(server.id()) && statusMap->get(server.id()).isUnhealthy();
@@ -319,7 +326,7 @@ ACTOR Future<Void> storageServerFailureTracker(
TraceEvent("StatusMapChange", masterId).detail("ServerID", server.id()).detail("Status", status->toString()).
detail("Available", IFailureMonitor::failureMonitor().getState(server.waitFailure.getEndpoint()).isAvailable());
}
-when ( Void _ = wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, server.id()) : Never() ) ) { break; }
+when ( Void _ = wait( status->isUnhealthy() ? waitForAllDataRemoved(cx, server.id(), addedVersion) : Never() ) ) { break; }
}
}

@@ -479,7 +486,8 @@ Future<Void> storageServerTracker(
std::map<UID, Reference<TCServerInfo>>* const& other_servers,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& changes,
PromiseStream<Void> const& serverFailures,
-Promise<Void> const& errorOut);
+Promise<Void> const& errorOut,
+Version const& addedVersion);

Future<Void> teamTracker( struct DDTeamCollection* const& self, Reference<IDataDistributionTeam> const& team );

@@ -802,7 +810,7 @@ struct DDTeamCollection {
// we preferentially mark the least used server as undesirable?
for (auto i = initTeams.allServers.begin(); i != initTeams.allServers.end(); ++i) {
if (shouldHandleServer(i->first)) {
-addServer(i->first, i->second, serverTrackerErrorOut);
+addServer(i->first, i->second, serverTrackerErrorOut, 0);
}
}
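Note that servers discovered in the initial team snapshot above are registered with an addedVersion of 0, so the new removal guard is effectively a no-op for servers that existed before this recovery: any read version beyond MAX_READ_TRANSACTION_LIFE_VERSIONS satisfies the check.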

@@ -1156,15 +1164,15 @@ struct DDTeamCollection {
return (includedDCs.empty() || std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end() || (otherTrackedDCs.present() && std::find(otherTrackedDCs.get().begin(), otherTrackedDCs.get().end(), newServer.locality.dcId()) == otherTrackedDCs.get().end()));
}

-void addServer( StorageServerInterface newServer, ProcessClass processClass, Promise<Void> errorOut ) {
+void addServer( StorageServerInterface newServer, ProcessClass processClass, Promise<Void> errorOut, Version addedVersion ) {
if (!shouldHandleServer(newServer)) {
return;
}
allServers.push_back( newServer.id() );

TraceEvent("AddedStorageServer", masterId).detail("ServerID", newServer.id()).detail("ProcessClass", processClass.toString()).detail("WaitFailureToken", newServer.waitFailure.getEndpoint().token).detail("address", newServer.waitFailure.getEndpoint().address);
auto &r = server_info[newServer.id()] = Reference<TCServerInfo>( new TCServerInfo( newServer, processClass ) );
-r->tracker = storageServerTracker( this, cx, r.getPtr(), &server_status, lock, masterId, &server_info, serverChanges, serverFailures, errorOut );
+r->tracker = storageServerTracker( this, cx, r.getPtr(), &server_status, lock, masterId, &server_info, serverChanges, serverFailures, errorOut, addedVersion );
restartTeamBuilder.trigger();
}

@@ -1515,7 +1523,7 @@ ACTOR Future<Void> waitServerListChange( DDTeamCollection *self, Database cx, Fu
currentInterfaceChanged.send( std::make_pair(ssi,processClass) );
}
} else if( !self->recruitingIds.count(ssi.id()) ) {
-self->addServer( ssi, processClass, self->serverTrackerErrorOut );
+self->addServer( ssi, processClass, self->serverTrackerErrorOut, tr.getReadVersion().get() );
self->doBuildTeams = true;
}
}
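Here a server that newly appears in the server list is registered with the read version of the transaction that observed it; since that transaction has already performed a read, its read-version future is ready and .get() is safe. This read version is a conservative upper bound on the version at which the server was actually added, so the guard only ever waits longer, never less.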
@@ -1567,7 +1575,8 @@ ACTOR Future<Void> storageServerTracker(
std::map<UID, Reference<TCServerInfo>>* other_servers,
PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > changes,
PromiseStream<Void> serverFailures,
-Promise<Void> errorOut)
+Promise<Void> errorOut,
+Version addedVersion)
{
state Future<Void> failureTracker;
state ServerStatus status( false, false, server->lastKnownInterface.locality );
@@ -1651,7 +1660,7 @@ ACTOR Future<Void> storageServerTracker(
otherChanges.push_back( self->excludedServers.onChange( addr ) );
otherChanges.push_back( self->excludedServers.onChange( ipaddr ) );

-failureTracker = storageServerFailureTracker( cx, server->lastKnownInterface, statusMap, &status, serverFailures, &self->unhealthyServers, masterId );
+failureTracker = storageServerFailureTracker( cx, server->lastKnownInterface, statusMap, &status, serverFailures, &self->unhealthyServers, masterId, addedVersion );

//We need to recruit new storage servers if the key value store type has changed
if(hasWrongStoreTypeOrDC)
@@ -1766,7 +1775,7 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection *self, RecruitStorageRepl

self->recruitingIds.insert(interfaceId);
self->recruitingLocalities.insert(candidateWorker.worker.address());
-ErrorOr<StorageServerInterface> newServer = wait( candidateWorker.worker.storage.tryGetReply( isr, TaskDataDistribution ) );
+ErrorOr<InitializeStorageReply> newServer = wait( candidateWorker.worker.storage.tryGetReply( isr, TaskDataDistribution ) );
self->recruitingIds.erase(interfaceId);
self->recruitingLocalities.erase(candidateWorker.worker.address());

@@ -1782,8 +1791,8 @@ ACTOR Future<Void> initializeStorage( DDTeamCollection *self, RecruitStorageRepl
Void _ = wait( delay(SERVER_KNOBS->STORAGE_RECRUITMENT_DELAY, TaskDataDistribution) );
}
else if( newServer.present() ) {
-if( !self->server_info.count( newServer.get().id() ) )
-self->addServer( newServer.get(), candidateWorker.processClass, self->serverTrackerErrorOut );
+if( !self->server_info.count( newServer.get().interf.id() ) )
+self->addServer( newServer.get().interf, candidateWorker.processClass, self->serverTrackerErrorOut, newServer.get().addedVersion );
else
TraceEvent(SevWarn, "DDRecruitmentError").detail("Reason", "Server ID already recruited");

14 changes: 12 additions & 2 deletions fdbserver/WorkerInterface.h
@@ -146,12 +146,22 @@ struct InitializeResolverRequest {
}
};

+struct InitializeStorageReply {
+StorageServerInterface interf;
+Version addedVersion;
+
+template <class Ar>
+void serialize(Ar& ar) {
+ar & interf & addedVersion;
+}
+};
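Because the recruitment reply now carries the interface and the added version together, the reply type changes from StorageServerInterface to InitializeStorageReply everywhere the ReplyPromise appears. This is a wire-protocol change, which is why the request struct below and masterserver.actor.cpp, storageserver.actor.cpp, and worker.actor.cpp all have to change in the same commit.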

struct InitializeStorageRequest {
Tag seedTag; //< If this server will be passed to seedShardServers, this will be a tag, otherwise it is invalidTag
UID reqId;
UID interfaceId;
KeyValueStoreType storeType;
-ReplyPromise< struct StorageServerInterface > reply;
+ReplyPromise< InitializeStorageReply > reply;

template <class Ar>
void serialize( Ar& ar ) {
@@ -288,7 +298,7 @@ Future<Void> storageServer(
class IKeyValueStore* const& persistentData,
StorageServerInterface const& ssi,
Tag const& seedTag,
-ReplyPromise<StorageServerInterface> const& recruitReply,
+ReplyPromise<InitializeStorageReply> const& recruitReply,
Reference<AsyncVar<ServerDBInfo>> const& db,
std::string const& folder );
Future<Void> storageServer(
4 changes: 2 additions & 2 deletions fdbserver/masterserver.actor.cpp
@@ -338,7 +338,7 @@ ACTOR Future<Void> newSeedServers( Reference<MasterData> self, RecruitFromConfig
isr.reqId = g_random->randomUniqueID();
isr.interfaceId = g_random->randomUniqueID();

-ErrorOr<StorageServerInterface> newServer = wait( recruits.storageServers[idx].storage.tryGetReply( isr ) );
+ErrorOr<InitializeStorageReply> newServer = wait( recruits.storageServers[idx].storage.tryGetReply( isr ) );

if( newServer.isError() ) {
if( !newServer.isError( error_code_recruitment_failed ) && !newServer.isError( error_code_request_maybe_delivered ) )
@@ -357,7 +357,7 @@
tag.id++;
idx++;

-servers->push_back( newServer.get() );
+servers->push_back( newServer.get().interf );
}
}

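Seed servers created here exist from the very first version of the database; data distribution picks them up later through the initial-teams snapshot (registered with addedVersion 0), so newSeedServers only needs the interface from the reply and discards the version.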
7 changes: 5 additions & 2 deletions fdbserver/storageserver.actor.cpp
@@ -3239,7 +3239,7 @@ bool storageServerTerminated(StorageServer& self, IKeyValueStore* persistentData
return false;
}

-ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<StorageServerInterface> recruitReply,
+ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerInterface ssi, Tag seedTag, ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db, std::string folder )
{
state StorageServer self(persistentData, db, ssi);
@@ -3260,7 +3260,7 @@ ACTOR Future<Void> storageServer( IKeyValueStore* persistentData, StorageServerI
Void _ = wait( self.storage.commit() );

TraceEvent("StorageServerInit", ssi.id()).detail("Version", self.version.get()).detail("SeedTag", seedTag.toString());
-recruitReply.send(ssi);
+InitializeStorageReply rep;
+rep.interf = ssi;
+rep.addedVersion = self.version.get();
+recruitReply.send(rep);
self.byteSampleRecovery = Void();
Void _ = wait( storageServerCore(&self, ssi) );

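The storage server reports self.version.get() taken after its initial commit, i.e. the version at which its registration is durable; this is the addedVersion that data distribution later compares read versions against.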
4 changes: 2 additions & 2 deletions fdbserver/worker.actor.cpp
@@ -487,7 +487,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
state double loggingDelay = SERVER_KNOBS->WORKER_LOGGING_INTERVAL;
state ActorCollection filesClosed(true);
state Promise<Void> stopping;
-state WorkerCache<StorageServerInterface> storageCache;
+state WorkerCache<InitializeStorageReply> storageCache;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo(LiteralStringRef("DB"))) );
state Future<Void> metricsLogger;
state UID processIDUid;
@@ -761,7 +761,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
IKeyValueStore* data = openKVStore( req.storeType, filename, recruited.id(), memoryLimit );
Future<Void> kvClosed = data->onClosed();
filesClosed.add( kvClosed );
-ReplyPromise<StorageServerInterface> storageReady = req.reply;
+ReplyPromise<InitializeStorageReply> storageReady = req.reply;
storageCache.set( req.reqId, storageReady.getFuture() );
Future<Void> s = storageServer( data, recruited, req.seedTag, storageReady, dbInfo, folder );
s = handleIOErrors(s, data, recruited.id(), kvClosed);
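The WorkerCache keyed by req.reqId is what makes recruitment idempotent: if the network re-delivers an InitializeStorageRequest, the worker answers from the cached reply future instead of starting a second storage server. A sketch of that pattern follows, under the assumption that WorkerCache exposes exists/get/set as used above and that flow's forwardPromise forwards a future's eventual value to a promise; the real code interleaves key-value-store setup and error handling.

```cpp
// Dedup sketch (assumed simplified control flow around the lines above).
if( !storageCache.exists( req.reqId ) ) {
	// First delivery: remember the reply future before starting the server,
	// so a duplicate arriving mid-recruitment still finds it in the cache.
	ReplyPromise<InitializeStorageReply> storageReady = req.reply;
	storageCache.set( req.reqId, storageReady.getFuture() );
	// ...open the key-value store and start storageServer(), which eventually
	// sends the InitializeStorageReply on storageReady...
} else {
	// Duplicate request: forward the cached reply instead of recruiting again.
	forwardPromise( req.reply, storageCache.get( req.reqId ) );
}
```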
