Skip to content

Commit

Permalink
Improve distributed consistency checker (#11346)
Browse files Browse the repository at this point in the history
* ConsistencyCheckerUrgent repeated run

* address comments

* avoid tracing SevError for TesterRecruitmentTimeout unless the failure persists for over 1 day

* address comments

* address comments
  • Loading branch information
kakaiu committed Apr 30, 2024
1 parent 5e9a57b commit bf53218
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 7 deletions.
1 change: 1 addition & 0 deletions fdbclient/ClientKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ void ClientKnobs::initialize(Randomize randomize) {

init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec
init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
init( CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME, 600 );
init( CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT, 10 ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT = 2;
init( CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX, 10 ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX = 1;
init( CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0, "" ); if( randomize && BUGGIFY ) CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0 = "";
Expand Down
1 change: 1 addition & 0 deletions fdbclient/include/fdbclient/ClientKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ClientKnobs : public KnobsImpl<ClientKno

int CONSISTENCY_CHECK_RATE_LIMIT_MAX; // Available in both normal and urgent mode
int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME; // Available in normal mode
int CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME; // Available in urgent mode
int CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT; // Available in urgent mode
int CONSISTENCY_CHECK_URGENT_RETRY_DEPTH_MAX; // Available in urgent mode
std::string CONSISTENCY_CHECK_URGENT_RANGE_BEGIN_0; // Available in urgent mode
Expand Down
43 changes: 36 additions & 7 deletions fdbserver/tester.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,7 @@ ACTOR Future<std::vector<TesterInterface>> getTesters(Reference<AsyncVar<Optiona
}
when(wait(cc->onChange())) {}
when(wait(testerTimeout)) {
TraceEvent(SevError, "TesterRecruitmentTimeout").log();
TraceEvent(SevWarnAlways, "TesterRecruitmentTimeout");
throw timed_out();
}
}
Expand Down Expand Up @@ -1792,6 +1792,7 @@ ACTOR Future<Void> runConsistencyCheckerUrgentCore(Reference<AsyncVar<Optional<C
state std::vector<TesterInterface> ts; // used to store testers interface
state std::vector<KeyRange> rangesToCheck; // get from globalProgressMap
state std::vector<KeyRange> shardsToCheck; // get from keyServer metadata
state Optional<double> whenFailedToGetTesterStart;

// Initialize globalProgressMap
Optional<std::vector<KeyRange>> rangesToCheck_ = loadRangesToCheckFromKnob();
Expand Down Expand Up @@ -1838,7 +1839,19 @@ ACTOR Future<Void> runConsistencyCheckerUrgentCore(Reference<AsyncVar<Optional<C
// Step 2: Get testers
ts.clear();
if (!testers.present()) { // In real clusters
wait(store(ts, getTesters(cc, minTestersExpected)));
try {
wait(store(ts, getTesters(cc, minTestersExpected)));
whenFailedToGetTesterStart.reset();
} catch (Error& e) {
if (e.code() == error_code_timed_out) {
if (!whenFailedToGetTesterStart.present()) {
whenFailedToGetTesterStart = now();
} else if (now() - whenFailedToGetTesterStart.get() > 3600 * 24) { // 1 day
TraceEvent(SevError, "TesterRecruitmentTimeout");
}
}
throw e;
}
if (g_network->isSimulated() && deterministicRandom()->random01() < 0.05) {
throw operation_failed(); // Introduce random failure
}
Expand Down Expand Up @@ -1908,9 +1921,24 @@ ACTOR Future<Void> runConsistencyCheckerUrgentCore(Reference<AsyncVar<Optional<C
}
}

// Wrapper around runConsistencyCheckerUrgentCore that optionally keeps re-running it.
//
// cc, cx, testers and minTestersExpected are forwarded unchanged to every run of the core actor.
// When repeatRun is false, the core actor is run exactly once and this actor then completes.
// When repeatRun is true, each completed run is followed by a
// delay(CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME) and then another run, indefinitely.
// Errors thrown by the core actor are not caught here and propagate to the caller.
ACTOR Future<Void> runConsistencyCheckerUrgentHolder(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> cc,
                                                     Database cx,
                                                     Optional<std::vector<TesterInterface>> testers,
                                                     int minTestersExpected,
                                                     bool repeatRun) {
	loop {
		wait(runConsistencyCheckerUrgentCore(cc, cx, testers, minTestersExpected));
		if (repeatRun) {
			// Pause between consecutive rounds before starting the next one.
			wait(delay(CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_NEXT_WAIT_TIME));
		} else {
			// One-shot mode: a single completed round is all that was asked for.
			break;
		}
	}
	return Void();
}

// Runs the urgent consistency checker once against an explicitly supplied set of testers.
//
// cx:      database handle forwarded to the checker.
// testers: tester interfaces to use; because they are supplied here, the checker does not
//          recruit testers through the cluster controller (an empty cc reference is passed).
// Returns a future that completes when the single checker run finishes; minTestersExpected is
// fixed at 1.
Future<Void> checkConsistencyUrgentSim(Database cx, std::vector<TesterInterface> testers) {
	// repeatRun=false makes the holder run the core checker exactly once and then return.
	return runConsistencyCheckerUrgentHolder(
	    Reference<AsyncVar<Optional<ClusterControllerFullInterface>>>(), cx, testers, 1, /*repeatRun=*/false);
}

ACTOR Future<bool> runTest(Database cx,
Expand Down Expand Up @@ -3018,9 +3046,10 @@ ACTOR Future<Void> runTests(Reference<IClusterConnectionRecord> connRecord,
state Reference<AsyncVar<ServerDBInfo>> dbInfo(new AsyncVar<ServerDBInfo>);
state Future<Void> ccMonitor = monitorServerDBInfo(cc, LocalityData(), dbInfo); // FIXME: locality
cx = openDBOnServer(dbInfo);
tests = reportErrors(
runConsistencyCheckerUrgentCore(cc, cx, Optional<std::vector<TesterInterface>>(), minTestersExpected),
"runConsistencyCheckerUrgentCore");
tests =
reportErrors(runConsistencyCheckerUrgentHolder(
cc, cx, Optional<std::vector<TesterInterface>>(), minTestersExpected, /*repeatRun=*/true),
"runConsistencyCheckerUrgentHolder");
} else if (at == TEST_HERE) {
auto db = makeReference<AsyncVar<ServerDBInfo>>();
std::vector<TesterInterface> iTesters(1);
Expand Down

0 comments on commit bf53218

Please sign in to comment.