Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixbackup copy #11077

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions fdbclient/FileBackupAgent.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3435,8 +3435,10 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase {
auto it = std::find_if(
ids.begin(), ids.end(), [uid](const std::pair<UID, Version>& p) { return p.first == uid; });
if (it == ids.end()) {
// if not exist, then set it in ids and save it back
ids.emplace_back(uid, Params.beginVersion().get(task));
} else {
// if already exist, update local
Params.beginVersion().set(task, it->second);
}

Expand All @@ -3446,6 +3448,7 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase {
}

// The task may be restarted. Set the watch if started key has NOT been set.
// why we do not check the value, i.e. true or false?
if (!taskStarted.get().present()) {
watchFuture = tr->watch(config.allWorkerStarted().key);
}
Expand Down
15 changes: 15 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,21 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
return ids;
}

// Value encodeAllWorkerStartedValue(Version v) {
// BinaryWriter wr(Unversioned());
// wr << v;
// return wr.toValue();
// }

// Version decodeAllWorkerStartedValue(const ValueRef& value) {
// Version v = 0;
// BinaryReader reader(value, IncludeVersion());
// if (value.size() > 0) {
// reader >> v;
// }
// return v;
// }

bool mutationForKey(const MutationRef& m, const KeyRef& key) {
return isSingleKeyMutation((MutationRef::Type)m.type) && m.param1 == key;
}
Expand Down
3 changes: 2 additions & 1 deletion fdbclient/include/fdbclient/BackupAgent.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,8 @@ class BackupConfig : public KeyBackedTaskConfig {
KeyBackedProperty<Reference<IBackupContainer>> backupContainer() { return configSpace.pack(__FUNCTION__sr); }

// Set to true when all backup workers for saving mutation logs have been started.
KeyBackedProperty<bool> allWorkerStarted() { return configSpace.pack(__FUNCTION__sr); }
// configSpace::pack() returns a Key, how can we return a `KeyBackedProperty`
KeyBackedProperty<Version> allWorkerStarted() { return configSpace.pack(__FUNCTION__sr); }

// Each backup worker adds its (epoch, tag.id) to this property.
KeyBackedProperty<std::vector<std::pair<int64_t, int64_t>>> startedBackupWorkers() {
Expand Down
1 change: 0 additions & 1 deletion fdbclient/include/fdbclient/KeyBackedTypes.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ class KeyBackedProperty {

template <class Transaction>
Future<Optional<T>> get(Transaction tr, Snapshot snapshot = Snapshot::False) const {

if constexpr (is_transaction_creator<Transaction>) {
return runTransaction(tr, [=, self = *this](decltype(tr->createTransaction()) tr) {
if constexpr (SystemAccess) {
Expand Down
91 changes: 79 additions & 12 deletions fdbserver/BackupWorker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ struct BackupData {
wait(config.startedBackupWorkers().get(tr));
workers = tmp;
if (!updated) {
// add this worker's info into "workers" vector
if (workers.present()) {
workers.get().emplace_back(self->recruitedEpoch, (int64_t)self->tag.id);
} else {
Expand All @@ -218,7 +219,8 @@ struct BackupData {
tags.insert(p.second);
}
if (self->totalTags == tags.size()) {
config.allWorkerStarted().set(tr, true);
// first set it to 0 to indicate all worker has been started, then get the commit version of this txn and set the version
config.allWorkerStarted().set(tr, 0);
allUpdated = true;
} else {
// monitor all workers' updates
Expand All @@ -237,11 +239,16 @@ struct BackupData {

updated = true; // Only set to true after commit.
if (allUpdated) {
Version commitVersion = tr->getCommittedVersion();
tr->reset();
config.allWorkerStarted().set(tr, commitVersion);
wait(tr->commit());
break;
}
wait(watchFuture);
tr->reset();
} else {
// update startedBackupWorkers's value to "workers"
ASSERT(workers.present() && workers.get().size() > 0);
config.startedBackupWorkers().set(tr, workers.get());
wait(tr->commit());
Expand Down Expand Up @@ -432,6 +439,12 @@ struct BackupData {
// is already popped. Advance the version is safe because these
// versions are not popped -- if they are popped, their progress should
// be already recorded and Master would use a higher version than minVersion.
TraceEvent("Hfu5ChangeSavedVersion")
.detail("BcakupEpoch", backupEpoch)
.detail("RecruitedEpoch", recruitedEpoch)
.detail("SavedVersion", savedVersion)
.detail("StartVersion", startVersion)
.log();
savedVersion = std::max(minVersion, savedVersion);
}
if (modified)
Expand Down Expand Up @@ -511,21 +524,31 @@ ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool present
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> value = wait(tr.get(backupStartedKey));
std::vector<std::pair<UID, Version>> uidVersions;
bool shouldExit = self->endVersion.present();
state std::vector<std::pair<UID, Version>> uidVersions;
state bool shouldExit = self->endVersion.present();
if (value.present()) {
uidVersions = decodeBackupStartedValue(value.get());
TraceEvent e("BackupWorkerGotStartKey", self->myId);
int i = 1;
state TraceEvent e("BackupWorkerGotStartKey", self->myId);
state int i = 1;
for (auto [uid, version] : uidVersions) {
e.detail(format("BackupID%d", i), uid).detail(format("Version%d", i), version);
i++;
if (shouldExit && version < self->endVersion.get()) {
shouldExit = false;
}
BackupConfig config(uid);
// Optional<Version> taskStarted = wait(config.allWorkerStarted().get(tr)); this does not compile
// transform the Value to Version, what's the best way?
Optional<Value> taskStarted = wait(tr.get(config.allWorkerStarted().key));
if (taskStarted.present()) {
Version v = Tuple::unpack(taskStarted.get()).getInt(0);
TraceEvent("Hfu5TaskPresent").detail("V", v).detail("Saved", self->savedVersion).log();
self->savedVersion = std::max(self->savedVersion, v);
}
}
self->exitEarly = shouldExit;
self->onBackupChanges(uidVersions);
// hfu5: it returns when backupStarted key is set
if (present || !watch)
return true;
} else {
Expand Down Expand Up @@ -561,7 +584,7 @@ ACTOR Future<Void> setBackupKeys(BackupData* self, std::map<UID, Version> savedL

state std::vector<Future<Optional<Version>>> prevVersions;
state std::vector<BackupConfig> versionConfigs;
state std::vector<Future<Optional<bool>>> allWorkersReady;
state std::vector<Future<Optional<Version>>> allWorkersReady;
for (const auto& [uid, version] : savedLogVersions) {
versionConfigs.emplace_back(uid);
prevVersions.push_back(versionConfigs.back().latestBackupWorkerSavedVersion().get(tr));
Expand All @@ -571,9 +594,9 @@ ACTOR Future<Void> setBackupKeys(BackupData* self, std::map<UID, Version> savedL
wait(waitForAll(prevVersions) && waitForAll(allWorkersReady));

for (int i = 0; i < prevVersions.size(); i++) {
if (!allWorkersReady[i].get().present() || !allWorkersReady[i].get().get())
if (!allWorkersReady[i].get().present() || allWorkersReady[i].get().get() <= 0) {
continue;

}
const Version current = savedLogVersions[versionConfigs[i].getUid()];
if (prevVersions[i].get().present()) {
const Version prev = prevVersions[i].get().get();
Expand Down Expand Up @@ -922,6 +945,9 @@ ACTOR Future<Void> uploadData(BackupData* self) {
}

if (popVersion > self->savedVersion && popVersion > self->popVersion) {
// hfu5: saveProgress is called here, but not in NOOP mode.
// Thus there might be race condition when 1 backup runs in NOOP mode popping, ther other backup start and not seeing
// certain versions being popped and try to get it from TLog.
wait(saveProgress(self, popVersion));
TraceEvent("BackupWorkerSavedProgress", self->myId)
.detail("Tag", self->tag.toString())
Expand All @@ -943,12 +969,15 @@ ACTOR Future<Void> uploadData(BackupData* self) {

// Pulls data from TLog servers using LogRouter tag.
ACTOR Future<Void> pullAsyncData(BackupData* self) {
TraceEvent("BackupWorkerPull", self->myId)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", self->popVersion)
.log();
state Future<Void> logSystemChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion));
state Arena prev;

TraceEvent("BackupWorkerPull", self->myId).log();
loop {
while (self->paused.get()) {
wait(self->paused.onChange());
Expand All @@ -968,6 +997,18 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
logSystemChange = self->logSystem.onChange();
}
}

// It's data loss issue if popped() > 0. It means mutations between popped() and tagAt are not available
if (r->popped() > 0) {
TraceEvent(SevWarn, "BackupWorkerPullMissingMutations", self->myId)
.detail("Tag", self->tag)
.detail("Start", self->startVersion)
.detail("Saved", self->savedVersion)
.detail("BackupEpoch", self->backupEpoch)
.detail("Popped", r->popped())
.detail("ExpectedPeekVersion", tagAt);
throw worker_removed();
}
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());

// Note we aggressively peek (uncommitted) messages, but only committed
Expand Down Expand Up @@ -1006,10 +1047,20 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {

ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) {
state Future<Void> pullFinished = Void();

TraceEvent("MonitorBackupActor", self->myId)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", self->popVersion)
.detail("KeyPresent", keyPresent)
.log();
loop {
state Future<bool> present = monitorBackupStartedKeyChanges(self, !keyPresent, /*watch=*/true);
// NOOP mode is quitted once each backup worker found out the `backupStartedKey` is set
// so it is always earlier than the allWorkerStarted
if (keyPresent) {
TraceEvent("Hfu5StartPull", self->myId)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", self->popVersion)
.log();
pullFinished = pullAsyncData(self);
self->pulling = true;
wait(success(present) || pullFinished);
Expand All @@ -1031,6 +1082,10 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)

loop choose {
when(wait(success(present))) {
TraceEvent("Hfu5Present", self->myId)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", self->popVersion)
.log();
break;
}
when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) {
Expand All @@ -1039,9 +1094,12 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
std::max(self->popVersion, std::max(committedVersion.get(), self->savedVersion));
self->minKnownCommittedVersion =
std::max(committedVersion.get(), self->minKnownCommittedVersion);
// wait(saveProgress(self, self->popVersion));
self->savedVersion = std::max(self->popVersion, self->savedVersion);
TraceEvent("BackupWorkerNoopPop", self->myId)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", self->popVersion);
.detail("PopVersion", self->popVersion)
.log();
self->pop(); // Pop while the worker is in this NOOP state.
committedVersion = Never();
} else {
Expand Down Expand Up @@ -1115,20 +1173,29 @@ ACTOR Future<Void> backupWorker(BackupInterface interf,
.detail("StartVersion", req.startVersion)
.detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1)
.detail("LogEpoch", req.recruitedEpoch)
.detail("Saved", self.savedVersion)
.detail("BackupEpoch", req.backupEpoch);
try {
// hfu5: savedVersion is updated but not to the latest updated by noop mode.
// it should be updated in the next lines below
addActor.send(checkRemoved(db, req.recruitedEpoch, &self));
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) {
addActor.send(monitorBackupProgress(&self));
}
addActor.send(monitorWorkerPause(&self));

TraceEvent("BackupWorkerWaitKeyBeforeMonitor", self.myId)
.detail("Saved", self.savedVersion)
.detail("ExitEarly", self.exitEarly);
// Check if backup key is present to avoid race between this check and
// noop pop as well as upload data: pop or skip upload before knowing
// there are backup keys. Set the "exitEarly" flag if needed.
bool present = wait(monitorBackupStartedKeyChanges(&self, true, false));
TraceEvent("BackupWorkerWaitKey", self.myId).detail("Present", present).detail("ExitEarly", self.exitEarly);
TraceEvent("BackupWorkerWaitKey", self.myId)
.detail("Present", present)
.detail("Saved", self.savedVersion)
.detail("ExitEarly", self.exitEarly);

pull = self.exitEarly ? Void() : monitorBackupKeyOrPullData(&self, present);
addActor.send(pull);
Expand Down
6 changes: 6 additions & 0 deletions fdbserver/QuietDatabase.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,12 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
auto check = checker.startIteration(phase);

std::string evtType = "QuietDatabase" + phase;
if (tLogQueueInfo.get().second > maxPoppedVersionLag) {
TraceEvent("Hfu5MaxTLogPopVersionLag")
.detail("Value", tLogQueueInfo.get().second)
.detail("Threshold", maxPoppedVersionLag)
.log();
}
TraceEvent evt(evtType.c_str());
check.add(evt, "DataInFlight", dataInFlight.get(), dataInFlightGate)
.add(evt, "MaxTLogQueueSize", tLogQueueInfo.get().first, maxTLogQueueGate)
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/workloads/ConsistencyCheck.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ struct ConsistencyCheckWorkload : TestWorkload {

try {
wait(timeoutError(quietDatabase(cx, self->dbInfo, "ConsistencyCheckStart", 0, 1e5, 0, 0),
self->quiescentWaitTimeout)); // FIXME: should be zero?
self->quiescentWaitTimeout)); // FIXME: should be zero?
if (g_network->isSimulated()) {
g_simulator->quiesced = true;
TraceEvent("ConsistencyCheckQuiesced").detail("Quiesced", g_simulator->quiesced);
Expand Down