Skip to content

Commit

Permalink
Merge pull request #11127 from kakaiu/improve-consistency-check-urgent
Browse files Browse the repository at this point in the history
  • Loading branch information
jzhou77 committed Jan 17, 2024
2 parents ddaa2fd + 7e9594f commit c23d4f6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 19 deletions.
12 changes: 6 additions & 6 deletions fdbclient/ConsistencyCheckUtil.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ Key getKeyFromString(const std::string& str) {
return emptyKey;
}
if (str.size() % 4 != 0) {
TraceEvent(SevWarnAlways, "ConsistencyCheck_GetKeyFromStringError")
TraceEvent(SevWarnAlways, "ConsistencyCheckUrgent_GetKeyFromStringError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongLength")
Expand All @@ -320,7 +320,7 @@ Key getKeyFromString(const std::string& str) {
std::vector<uint8_t> byteList;
for (int i = 0; i < str.size(); i += 4) {
if (str.at(i + 0) != '\\' || str.at(i + 1) != 'x') {
TraceEvent(SevWarnAlways, "ConsistencyCheck_GetKeyFromStringError")
TraceEvent(SevWarnAlways, "ConsistencyCheckUrgent_GetKeyFromStringError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongBytePrefix")
Expand All @@ -330,7 +330,7 @@ Key getKeyFromString(const std::string& str) {
const char first = str.at(i + 2);
const char second = str.at(i + 3);
if (parseCharMap.count(first) == 0 || parseCharMap.count(second) == 0) {
TraceEvent(SevWarnAlways, "ConsistencyCheck_GetKeyFromStringError")
TraceEvent(SevWarnAlways, "ConsistencyCheckUrgent_GetKeyFromStringError")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Reason", "WrongByteContent")
Expand Down Expand Up @@ -403,7 +403,7 @@ std::vector<KeyRange> loadRangesToCheckFromKnob() {
rangeBegin = allKeys.end;
}
if (rangeEnd > allKeys.end) {
TraceEvent("ConsistencyCheck_ReverseInputRange")
TraceEvent("ConsistencyCheckUrgent_ReverseInputRange")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Index", i)
Expand All @@ -418,7 +418,7 @@ std::vector<KeyRange> loadRangesToCheckFromKnob() {
} else if (rangeBegin > rangeEnd) {
rangeToCheck = Standalone(KeyRangeRef(rangeEnd, rangeBegin));
} else {
TraceEvent("ConsistencyCheck_EmptyInputRange")
TraceEvent("ConsistencyCheckUrgent_EmptyInputRange")
.setMaxEventLength(-1)
.setMaxFieldLength(-1)
.detail("Index", i)
Expand All @@ -437,7 +437,7 @@ std::vector<KeyRange> loadRangesToCheckFromKnob() {
res.push_back(rangeToCheck.range());
}
}
TraceEvent e("ConsistencyCheck_LoadedInputRange");
TraceEvent e("ConsistencyCheckUrgent_LoadedInputRange");
e.setMaxEventLength(-1);
e.setMaxFieldLength(-1);
for (int i = 0; i < res.size(); i++) {
Expand Down
62 changes: 58 additions & 4 deletions fdbserver/tester.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,11 +710,45 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
state PromiseStream<Future<Void>> addWorkload;
state Future<Void> workerFatalError = actorCollection(addWorkload.getFuture());

TraceEvent("StartingTesterServerCore", interf.id());
// Dedicated to consistencyCheckerUrgent
// At any time, we only allow at most 1 consistency checker workload on a server
state std::pair<int64_t, Future<Void>> consistencyCheckerUrgentTester = std::make_pair(0, Future<Void>());

TraceEvent(SevInfo, "StartingTesterServerCore", interf.id());
loop choose {
when(wait(workerFatalError)) {}
when(wait(consistencyCheckerUrgentTester.second.isValid() ? consistencyCheckerUrgentTester.second : Never())) {
ASSERT(consistencyCheckerUrgentTester.first != 0);
TraceEvent(SevInfo, "ConsistencyCheckUrgent_ServerWorkloadEnd", interf.id())
.detail("ConsistencyCheckerId", consistencyCheckerUrgentTester.first);
consistencyCheckerUrgentTester = std::make_pair(0, Future<Void>()); // reset
}
when(WorkloadRequest work = waitNext(interf.recruitments.getFuture())) {
addWorkload.send(testerServerWorkload(work, ccr, dbInfo, locality));
if (work.sharedRandomNumber > SERVER_KNOBS->CONSISTENCY_CHECK_ID_MIN &&
work.sharedRandomNumber < SERVER_KNOBS->CONSISTENCY_CHECK_ID_MAX_PLUS_ONE) {
// The workload is a consistency checker urgent workload
if (work.sharedRandomNumber == consistencyCheckerUrgentTester.first) {
TraceEvent(SevInfo, "ConsistencyCheckUrgent_ServerDuplicatedRequest", interf.id())
.detail("ConsistencyCheckerId", work.sharedRandomNumber)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount);
} else if (consistencyCheckerUrgentTester.second.isValid() &&
!consistencyCheckerUrgentTester.second.isReady()) {
TraceEvent(SevWarnAlways, "ConsistencyCheckUrgent_ServerConflict", interf.id())
.detail("ExistingConsistencyCheckerId", consistencyCheckerUrgentTester.first)
.detail("ArrivingConsistencyCheckerId", work.sharedRandomNumber)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount);
}
consistencyCheckerUrgentTester =
std::make_pair(work.sharedRandomNumber, testerServerWorkload(work, ccr, dbInfo, locality));
TraceEvent(SevInfo, "ConsistencyCheckUrgent_ServerWorkloadStart", interf.id())
.detail("ConsistencyCheckerId", consistencyCheckerUrgentTester.first)
.detail("ClientId", work.clientId)
.detail("ClientCount", work.clientCount);
} else {
addWorkload.send(testerServerWorkload(work, ccr, dbInfo, locality));
}
}
}
}
Expand Down Expand Up @@ -1036,8 +1070,9 @@ ACTOR Future<std::unordered_set<int>> runUrgentConsistencyCheckWorkload(
for (int i = 0; i < workRequests.size(); i++) {
ASSERT(workRequests[i].isReady());
if (workRequests[i].get().isError()) {
Error e = workRequests[i].get().getError();
TraceEvent("ConsistencyCheckUrgent_FailedToContactToClient")
.error(workRequests[i].get().getError())
.error(e)
.detail("TesterCount", testers.size())
.detail("TesterId", i)
.detail("ConsistencyCheckerId", consistencyCheckerId);
Expand Down Expand Up @@ -1065,8 +1100,15 @@ ACTOR Future<std::unordered_set<int>> runUrgentConsistencyCheckWorkload(
}
}
try {
throwIfError(setups, "ConsistencyCheckUrgent_SetupWorkloadFailed");
for (auto& setup : setups) {
if (setup.isError()) {
throw setup.getError();
} else if (setup.get().isError()) {
throw setup.get().getError();
}
}
} catch (Error& e) {
TraceEvent(SevWarn, "ConsistencyCheckUrgent_SetupWorkloadFailed").error(e);
// Give up this round if any setup failed
for (int i = 0; i < workRequests.size(); i++) {
ASSERT(workRequests[i].isReady());
Expand Down Expand Up @@ -1119,7 +1161,19 @@ ACTOR Future<std::unordered_set<int>> runUrgentConsistencyCheckWorkload(
for (int i = 0; i < workRequests.size(); i++) {
ASSERT(workRequests[i].isReady());
if (!workRequests[i].get().isError()) {
TraceEvent("ConsistencyCheckUrgent_RunWorkloadStopSignal")
.detail("State", "Succeed")
.detail("ClientId", i)
.detail("ClientCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
workRequests[i].get().get().stop.send(ReplyPromise<Void>());
// This signal is not reliable but acceptable
} else {
TraceEvent("ConsistencyCheckUrgent_RunWorkloadStopSignal")
.detail("State", "Not issue since the interface is failed to fetch")
.detail("ClientId", i)
.detail("ClientCount", testers.size())
.detail("ConsistencyCheckerId", consistencyCheckerId);
}
}

Expand Down
26 changes: 17 additions & 9 deletions fdbserver/workloads/ConsistencyCheck.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,15 @@ struct ConsistencyCheckWorkload : TestWorkload {
}

ACTOR Future<Void> _startUrgent(Database cx, ConsistencyCheckWorkload* self) {
TraceEvent("ConsistencyCheck_Start")
.detail("Mode", "Urgent")
TraceEvent("ConsistencyCheckUrgent_Start")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("Distributed", self->distributed)
.detail("ClientCount", self->clientCount)
.detail("ClientId", self->clientId)
.detail("Indefinite", self->indefinite)
.detail("Repetitions", self->repetitions);
wait(self->runUrgentCheck(cx, self));
TraceEvent("ConsistencyCheck_Exit")
.detail("Mode", "Urgent")
TraceEvent("ConsistencyCheckUrgent_Exit")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("Distributed", self->distributed)
.detail("ClientCount", self->clientCount)
Expand All @@ -222,7 +220,6 @@ struct ConsistencyCheckWorkload : TestWorkload {
loop {
while (self->suspendConsistencyCheck.get()) {
TraceEvent("ConsistencyCheck_Suspended")
.detail("Mode", "Normal")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("Distributed", self->distributed)
.detail("ClientCount", self->clientCount)
Expand All @@ -233,7 +230,6 @@ struct ConsistencyCheckWorkload : TestWorkload {
wait(self->suspendConsistencyCheck.onChange());
}
TraceEvent("ConsistencyCheck_StartingOrResuming")
.detail("Mode", "Normal")
.detail("ConsistencyCheckerId", self->consistencyCheckerId)
.detail("Distributed", self->distributed)
.detail("ClientCount", self->clientCount)
Expand Down Expand Up @@ -1582,6 +1578,8 @@ struct ConsistencyCheckWorkload : TestWorkload {

// Step 3: Read a limited number of entries at a time, repeating until all keys in the shard have been read
state int64_t totalReadAmount = 0;
state int64_t shardReadAmount = 0;
state int64_t shardKeyCompared = 0;
state bool valueAvailableToCheck = true;
state KeySelector begin = firstGreaterOrEqual(range.begin);
loop {
Expand Down Expand Up @@ -1617,8 +1615,10 @@ struct ConsistencyCheckWorkload : TestWorkload {
if (rangeResult.present()) {
e.detail("ErrorPresent", rangeResult.get().error.present());
if (rangeResult.get().error.present()) {
e.detail("Error", rangeResult.get().error.get().name());
e.detail("Error", rangeResult.get().error.get().what());
}
} else {
e.detail("ResultNotPresentWithError", rangeResult.getError().what());
}
break;
}
Expand Down Expand Up @@ -1650,6 +1650,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
// to compare against
if (firstValidServer == -1) {
firstValidServer = j;
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
shardKeyCompared += current.data.size();
} else {
// Compare this shard against the first
GetKeyValuesReply reference = keyValueFutures[firstValidServer].get().get();
Expand Down Expand Up @@ -1736,6 +1738,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
// RateKeeping
wait(rateLimiter->getAllowance(totalReadAmount));

shardReadAmount += totalReadAmount;

// Advance to the next set of entries
ASSERT(firstValidServer != -1);
if (keyValueFutures[firstValidServer].get().get().more) {
Expand Down Expand Up @@ -1773,7 +1777,9 @@ struct ConsistencyCheckWorkload : TestWorkload {
.detail("ShardBegin", range.begin)
.detail("ShardEnd", range.end)
.detail("ReplicaCount", storageServers.size())
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch)
.detail("ShardBytesRead", shardReadAmount)
.detail("ShardKeysCompared", shardKeyCompared);
} else {
numCompleteShards++;
if (!self->rangesToCheck.present()) { // In case we are able to persist progress
Expand All @@ -1796,7 +1802,9 @@ struct ConsistencyCheckWorkload : TestWorkload {
.detail("NumFailedShards", numFailedShards)
.detail("NumShardThisClient", numShardThisClient)
.detail("NumShardToCheckThisEpoch", numShardToCheck - 1)
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch);
.detail("ConsistencyCheckEpoch", consistencyCheckEpoch)
.detail("ShardBytesRead", shardReadAmount)
.detail("ShardKeysCompared", shardKeyCompared);
}
}

Expand Down

0 comments on commit c23d4f6

Please sign in to comment.