Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a variety of miscellaneous issues #492

Merged
12 commits merged on Jun 15, 2018
Merged
55 changes: 36 additions & 19 deletions documentation/StatusSchema.json
Expand Up @@ -150,7 +150,12 @@
],
"log_replication_factor":3,
"log_write_anti_quorum":0,
"log_fault_tolerance":2
"log_fault_tolerance":2,
"remote_log_replication_factor":3,
"remote_log_fault_tolerance":2,
"satellite_log_replication_factor":3,
"satellite_log_write_anti_quorum":0,
"satellite_log_fault_tolerance":2
}
],
"fault_tolerance":{
Expand Down Expand Up @@ -188,6 +193,7 @@
"incompatible_connections":[

],
"datacenter_version_difference":0,
"database_available":true,
"database_locked":false,
"generation":2,
Expand Down Expand Up @@ -231,6 +237,7 @@
"$enum":[
"unreachable_master_worker",
"unreadable_configuration",
"full_replication_timeout",
"client_issues",
"unreachable_processes",
"immediate_priority_transaction_start_probe_timeout",
Expand Down Expand Up @@ -332,25 +339,32 @@
},
"cluster_controller_timestamp":1415650089,
"protocol_version":"fdb00a400050001",
"configuration":{
"full_replication":true,
"configuration":{
"log_anti_quorum":0,
"log_replicas":2,
"log_replication_policy":"(zoneid^3x1)",
"redundancy_mode":"single",
"regions":[{
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
}],
"satellite_redundancy_mode":"one_satellite_single",
"satellite_log_replicas":1,
"satellite_usable_dcs":1,
"satellite_anti_quorum":0,
"satellite_log_policy":"(zoneid^3x1)",
"satellite_logs":2
}],
"remote_redundancy_mode":"remote_single",
"remote_log_replicas":3,
"remote_logs":5,
"storage_quorum":1,
"storage_replicas":1,
"resolvers":1,
"redundancy":{
"factor":{
"$enum":[
"single",
"double",
"triple",
"custom",
"two_datacenter",
"three_datacenter",
"three_data_hall",
"fast_recovery_double",
"fast_recovery_triple"
]
}
},
"storage_policy":"(zoneid^3x1)",
"tlog_policy":"(zoneid^2x1)",
"storage_replication_policy":"(zoneid^3x1)",
"logs":2,
"storage_engine":{
"$enum":[
Expand All @@ -367,6 +381,9 @@
"address":"10.0.4.1"
}
],
"auto_proxies":3,
"auto_resolvers":1,
"auto_logs":3,
"proxies":5
},
"data":{
Expand Down
8 changes: 3 additions & 5 deletions fdbclient/DatabaseConfiguration.cpp
Expand Up @@ -160,24 +160,22 @@ bool DatabaseConfiguration::isValid() const {
getDesiredRemoteLogs() >= 1 &&
remoteTLogReplicationFactor >= 0 &&
regions.size() <= 2 &&
( remoteTLogReplicationFactor == 0 || ( remoteTLogPolicy && regions.size() == 2 && durableStorageQuorum == storageTeamSize ) ) ) ) {
( remoteTLogReplicationFactor == 0 || ( remoteTLogPolicy && regions.size() == 2 && durableStorageQuorum == storageTeamSize ) ) &&
( regions.size() == 0 || regions[0].priority >= 0 ) ) ) {
return false;
}

std::set<Key> dcIds;
std::set<int> priorities;
dcIds.insert(Key());
for(auto& r : regions) {
if( !(!dcIds.count(r.dcId) &&
!priorities.count(r.priority) &&
r.satelliteTLogReplicationFactor >= 0 &&
r.satelliteTLogWriteAntiQuorum >= 0 &&
r.satelliteTLogUsableDcs >= 0 &&
r.satelliteTLogUsableDcs >= 1 &&
( r.satelliteTLogReplicationFactor == 0 || ( r.satelliteTLogPolicy && r.satellites.size() ) ) ) ) {
return false;
}
dcIds.insert(r.dcId);
priorities.insert(r.priority);
for(auto& s : r.satellites) {
if(dcIds.count(s.dcId)) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion fdbclient/DatabaseConfiguration.h
Expand Up @@ -57,7 +57,7 @@ struct RegionInfo {

std::vector<SatelliteInfo> satellites;

RegionInfo() : priority(0), satelliteDesiredTLogCount(-1), satelliteTLogReplicationFactor(0), satelliteTLogWriteAntiQuorum(0), satelliteTLogUsableDcs(0) {}
RegionInfo() : priority(0), satelliteDesiredTLogCount(-1), satelliteTLogReplicationFactor(0), satelliteTLogWriteAntiQuorum(0), satelliteTLogUsableDcs(1) {}

struct sort_by_priority {
bool operator ()(RegionInfo const&a, RegionInfo const& b) const { return a.priority > b.priority; }
Expand Down
2 changes: 1 addition & 1 deletion fdbclient/FDBTypes.h
Expand Up @@ -33,7 +33,7 @@ typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;

enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4, tagLocalityInvalid = -99 }; //The TLog and LogRouter require these number to be as compact as possible
enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4, tagLocalitySatellite = -5, tagLocalityInvalid = -99 }; //The TLog and LogRouter require these number to be as compact as possible

#pragma pack(push, 1)
struct Tag {
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/SystemData.cpp
Expand Up @@ -258,6 +258,8 @@ int decodeDatacenterReplicasValue( ValueRef const& value ) {
return s;
}

const KeyRef primaryDatacenterKey = LiteralStringRef("\xff/primaryDatacenter");

// serverListKeys.contains(k) iff k.startsWith( serverListKeys.begin ) because '/'+1 == '0'
const KeyRangeRef serverListKeys(
LiteralStringRef("\xff/serverList/"),
Expand Down
2 changes: 2 additions & 0 deletions fdbclient/SystemData.h
Expand Up @@ -86,6 +86,8 @@ const Value datacenterReplicasValue( int const& );
Optional<Value> decodeDatacenterReplicasKey( KeyRef const& );
int decodeDatacenterReplicasValue( ValueRef const& );

extern const KeyRef primaryDatacenterKey;

// "\xff/serverList/[[serverID]]" := "[[StorageServerInterface]]"
// Storage servers are listed here when they are recruited - always before assigning them keys
// Storage servers removed from here are never replaced. The same fdbserver, if re-recruited, will always
Expand Down
49 changes: 30 additions & 19 deletions fdbserver/ClusterController.actor.cpp
Expand Up @@ -513,15 +513,31 @@ class ClusterControllerData {

std::vector<std::pair<WorkerInterface, ProcessClass>> satelliteLogs;
if(region.satelliteTLogReplicationFactor > 0) {
std::set<Optional<Key>> satelliteDCs;
for(auto& s : region.satellites) {
satelliteDCs.insert(s.dcId);
}
//FIXME: recruitment does not respect usable_dcs, a.k.a if usable_dcs is 1 we should recruit all tlogs in one data center
satelliteLogs = getWorkersForTlogs( req.configuration, region.satelliteTLogReplicationFactor, req.configuration.getDesiredSatelliteLogs(dcId), region.satelliteTLogPolicy, id_used, false, satelliteDCs );
int startDC = 0;
loop {
if(startDC > 0 && startDC >= region.satellites.size() + 1 - region.satelliteTLogUsableDcs) {
throw no_more_servers();
}

try {
std::set<Optional<Key>> satelliteDCs;
for(int s = startDC; s < std::min<int>(startDC + region.satelliteTLogUsableDcs, region.satellites.size()); s++) {
satelliteDCs.insert(region.satellites[s].dcId);
}

satelliteLogs = getWorkersForTlogs( req.configuration, region.satelliteTLogReplicationFactor, req.configuration.getDesiredSatelliteLogs(dcId), region.satelliteTLogPolicy, id_used, false, satelliteDCs );

for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].first);
}
break;
} catch (Error &e) {
if(e.code() != error_code_no_more_servers) {
throw;
}
}

for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].first);
startDC++;
}
}

Expand Down Expand Up @@ -561,9 +577,6 @@ class ClusterControllerData {
if(regions[0].priority == regions[1].priority && clusterControllerDcId.present() && regions[1].dcId == clusterControllerDcId.get()) {
std::swap(regions[0], regions[1]);
}
if(regions[0].priority < 0) {
throw no_more_servers();
}
bool setPrimaryDesired = false;
try {
auto reply = findWorkersForConfiguration(req, regions[0].dcId);
Expand Down Expand Up @@ -598,9 +611,6 @@ class ClusterControllerData {
throw;
}
} else if(req.configuration.regions.size() == 1) {
if(req.configuration.regions[0].priority < 0) {
throw no_more_servers();
}
vector<Optional<Key>> dcPriority;
dcPriority.push_back(req.configuration.regions[0].dcId);
desiredDcIds.set(dcPriority);
Expand Down Expand Up @@ -784,10 +794,11 @@ class ClusterControllerData {
if ( tlogWorker->second.priorityInfo.isExcluded )
return true;

if(logSet.isLocal && logSet.hasBestPolicy > HasBestPolicyNone) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else if(logSet.isLocal) {
if(logSet.isLocal && logSet.locality == tagLocalitySatellite) {
satellite_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
else if(logSet.isLocal) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
} else {
remote_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
}
Expand Down Expand Up @@ -1740,7 +1751,7 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
}
}

ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections)));
ErrorOr<StatusReply> result = wait(errorOr(clusterGetStatus(self->db.serverInfo, self->cx, workers, self->db.workersWithIssues, self->db.clientsWithIssues, self->db.clientVersionMap, self->db.traceLogGroupMap, coordinators, incompatibleConnections, self->datacenterVersionDifference)));
if (result.isError() && result.getError().code() == error_code_actor_cancelled)
throw result.getError();

Expand Down Expand Up @@ -1942,7 +1953,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state Optional<TLogInterface> remoteLog;
if(self->db.serverInfo->get().recoveryState == RecoveryState::REMOTE_RECOVERED) {
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.hasBestPolicy) {
if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
primaryLog = tLog.interf();
Expand Down
9 changes: 4 additions & 5 deletions fdbserver/DBCoreState.h
Expand Up @@ -41,20 +41,19 @@ struct CoreTLogSet {
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;

CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}

bool operator == (CoreTLogSet const& rhs) const {
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && hasBestPolicy == rhs.hasBestPolicy &&
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal &&
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
}

template <class Archive>
void serialize(Archive& ar) {
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality & startVersion;
ar & tLogs & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
}
};

Expand Down Expand Up @@ -110,7 +109,7 @@ struct DBCoreState {
template <class Archive>
void serialize(Archive& ar) {
//FIXME: remove when we no longer need to test upgrades from 4.X releases
if(ar.protocolVersion() < 0x0FDB00A460010001LL) {
if(g_network->isSimulated() && ar.protocolVersion() < 0x0FDB00A460010001LL) {
TraceEvent("ElapsedTime").detail("SimTime", now()).detail("RealTime", 0).detail("RandomUnseed", 0);
flushAndExit(0);
}
Expand Down
3 changes: 1 addition & 2 deletions fdbserver/LogRouter.actor.cpp
Expand Up @@ -105,7 +105,6 @@ struct LogRouterData {
//setup just enough of a logSet to be able to call getPushLocations
logSet.logServers.resize(req.tLogLocalities.size());
logSet.tLogPolicy = req.tLogPolicy;
logSet.hasBestPolicy = req.hasBestPolicy;
logSet.locality = req.locality;
logSet.updateLocalitySet(req.tLogLocalities);

Expand Down Expand Up @@ -413,7 +412,7 @@ ACTOR Future<Void> logRouter(
Reference<AsyncVar<ServerDBInfo>> db)
{
try {
TraceEvent("LogRouterStart", interf.id()).detail("Start", req.startVersion).detail("Tag", req.routerTag.toString()).detail("Localities", req.tLogLocalities.size()).detail("HasBestPolicy", req.hasBestPolicy).detail("Locality", req.locality);
TraceEvent("LogRouterStart", interf.id()).detail("Start", req.startVersion).detail("Tag", req.routerTag.toString()).detail("Localities", req.tLogLocalities.size()).detail("Locality", req.locality);
state Future<Void> core = logRouterCore(interf, req, db);
loop choose{
when(Void _ = wait(core)) { return Void(); }
Expand Down
24 changes: 6 additions & 18 deletions fdbserver/LogSystem.h
Expand Up @@ -44,12 +44,11 @@ class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
std::vector<int> logIndexArray;
std::map<int,LocalityEntry> logEntryMap;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;
std::vector<Future<TLogLockResult>> replies;

LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}

std::string logRouterString() {
std::string result;
Expand All @@ -74,17 +73,8 @@ class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
}

int bestLocationFor( Tag tag ) {
if(hasBestPolicy == HasBestPolicyNone) {
return -1;
} else if(hasBestPolicy == HasBestPolicyId) {
//This policy supports upgrades from 5.X
if(tag == txsTag) return txsTagOld % logServers.size();
return tag.id % logServers.size();
} else {
//Unsupported policy
ASSERT(false);
throw internal_error();
}
if(tag == txsTag) return txsTagOld % logServers.size();
return tag.id % logServers.size();
}

void updateLocalitySet() {
Expand Down Expand Up @@ -127,11 +117,9 @@ class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
alsoServers.clear();
resultEntries.clear();

if(hasBestPolicy) {
for(auto& t : tags) {
if(t.locality == locality || t.locality == tagLocalitySpecial || locality == tagLocalitySpecial || (isLocal && t.locality == tagLocalityLogRouter)) {
newLocations.push_back(bestLocationFor(t));
}
for(auto& t : tags) {
if(locality == tagLocalitySpecial || t.locality == locality || t.locality < 0) {
newLocations.push_back(bestLocationFor(t));
}
}

Expand Down
13 changes: 5 additions & 8 deletions fdbserver/LogSystemConfig.h
Expand Up @@ -55,27 +55,24 @@ struct OptionalInterface {
Optional<Interface> iface;
};

enum { HasBestPolicyNone = 0, HasBestPolicyId = 1 };

struct TLogSet {
std::vector<OptionalInterface<TLogInterface>> tLogs;
std::vector<OptionalInterface<TLogInterface>> logRouters;
int32_t tLogWriteAntiQuorum, tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy;
bool isLocal;
int32_t hasBestPolicy;
int8_t locality;
Version startVersion;

TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}

std::string toString() const {
return format("anti: %d replication: %d local: %d best: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, hasBestPolicy, logRouters.size(), describe(tLogs).c_str(), locality);
return format("anti: %d replication: %d local: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, logRouters.size(), describe(tLogs).c_str(), locality);
}

bool operator == ( const TLogSet& rhs ) const {
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal || hasBestPolicy != rhs.hasBestPolicy ||
if (tLogWriteAntiQuorum != rhs.tLogWriteAntiQuorum || tLogReplicationFactor != rhs.tLogReplicationFactor || isLocal != rhs.isLocal ||
startVersion != rhs.startVersion || tLogs.size() != rhs.tLogs.size() || locality != rhs.locality || logRouters.size() != rhs.logRouters.size()) {
return false;
}
Expand All @@ -96,7 +93,7 @@ struct TLogSet {
}

bool isEqualIds(TLogSet const& r) const {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || hasBestPolicy != r.hasBestPolicy || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
if (tLogWriteAntiQuorum != r.tLogWriteAntiQuorum || tLogReplicationFactor != r.tLogReplicationFactor || isLocal != r.isLocal || startVersion != r.startVersion || tLogs.size() != r.tLogs.size() || locality != r.locality) {
return false;
}
if ((tLogPolicy && !r.tLogPolicy) || (!tLogPolicy && r.tLogPolicy) || (tLogPolicy && (tLogPolicy->info() != r.tLogPolicy->info()))) {
Expand All @@ -112,7 +109,7 @@ struct TLogSet {

template <class Ar>
void serialize( Ar& ar ) {
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & hasBestPolicy & locality & startVersion;
ar & tLogs & logRouters & tLogWriteAntiQuorum & tLogReplicationFactor & tLogPolicy & tLogLocalities & isLocal & locality & startVersion;
}
};

Expand Down