Skip to content

Commit

Permalink
Adding useful debugging trace events
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jslocum committed Mar 29, 2022
1 parent da0673c commit 7fc6dfa
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
1 change: 1 addition & 0 deletions fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7372,6 +7372,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
fmt::print(
"BG Mapping for [{0} - %{1}) too large!\n", keyRange.begin.printable(), keyRange.end.printable());
}
TraceEvent(SevWarn, "BGMappingTooLarge").detail("Range", range).detail("Max", 1000);
throw unsupported_operation();
}
ASSERT(!blobGranuleMapping.more && blobGranuleMapping.size() < CLIENT_KNOBS->TOO_MANY);
Expand Down
38 changes: 35 additions & 3 deletions fdbserver/BlobManager.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,33 @@ struct SplitEvaluation {
: epoch(epoch), seqno(seqno), inProgress(inProgress) {}
};

// Counters for a blob manager process, aggregated in a CounterCollection and
// periodically flushed to the "BlobManagerMetrics" trace event by the logger future.
struct BlobManagerStats {
	CounterCollection cc;

	// FIXME: pruning stats

	Counter granuleSplits; // incremented once per committed granule split
	Counter granuleWriteHotSplits; // subset of granuleSplits triggered while the granule was write-hot
	Future<Void> logger; // periodic traceCounters actor; must stay alive as long as this struct

	// Current stats maintained for a given blob manager process.
	// NOTE: `workers` is captured as a raw pointer by the WorkerCount special
	// counter's lambda and read on every metrics flush, so the map must outlive
	// this struct (true here since both are members of BlobManagerData).
	explicit BlobManagerStats(UID id, double interval, std::unordered_map<UID, BlobWorkerInterface>* workers)
	  : cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
	    granuleWriteHotSplits("GranuleWriteHotSplits", cc) {
		specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
		logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics");
	}
};

struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
UID id;
Database db;
Optional<Key> dcId;
PromiseStream<Future<Void>> addActor;
Promise<Void> doLockCheck;

BlobManagerStats stats;

Reference<BackupContainerFileSystem> bstore;

std::unordered_map<UID, BlobWorkerInterface> workersById;
Expand Down Expand Up @@ -246,8 +266,9 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
PromiseStream<RangeAssignment> rangesToAssign;

BlobManagerData(UID id, Database db, Optional<Key> dcId)
: id(id), db(db), dcId(dcId), knownBlobRanges(false, normalKeys.end),
restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
: id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById),
knownBlobRanges(false, normalKeys.end), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
recruitingStream(0) {}
};

ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<BlobManagerData> bmData,
Expand Down Expand Up @@ -753,6 +774,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
}

for (KeyRangeRef range : rangesToRemove) {
TraceEvent("ClientBlobRangeRemoved", bmData->id).detail("Range", range);
if (BM_DEBUG) {
fmt::print(
"BM Got range to revoke [{0} - {1})\n", range.begin.printable(), range.end.printable());
Expand All @@ -768,6 +790,7 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
state std::vector<Future<Standalone<VectorRef<KeyRef>>>> splitFutures;
// Divide new ranges up into equal chunks by using SS byte sample
for (KeyRangeRef range : rangesToAdd) {
TraceEvent("ClientBlobRangeAdded", bmData->id).detail("Range", range);
splitFutures.push_back(splitRange(bmData, range, false));
}

Expand Down Expand Up @@ -1096,6 +1119,11 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
splitVersion);
}

++bmData->stats.granuleSplits;
if (writeHot) {
++bmData->stats.granuleWriteHotSplits;
}

// transaction committed, send range assignments
// range could have been moved since split eval started, so just revoke from whoever has it
RangeAssignment raRevoke;
Expand Down Expand Up @@ -1182,6 +1210,8 @@ ACTOR Future<Void> killBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerI
// Remove it from workersById also since otherwise that worker addr will remain excluded
// when we try to recruit new blob workers.

TraceEvent("KillBlobWorker", bmData->id).detail("WorkerId", bwId);

if (registered) {
bmData->deadWorkers.insert(bwId);
bmData->workerStats.erase(bwId);
Expand Down Expand Up @@ -1838,7 +1868,7 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
TraceEvent("BlobManagerRecovered", bmData->id)
.detail("Epoch", bmData->epoch)
.detail("Duration", now() - recoveryStartTime)
.detail("Granules", bmData->workerAssignments.size())
.detail("Granules", bmData->workerAssignments.size()) // TODO this includes un-set ranges, so it is inaccurate
.detail("Assigned", explicitAssignments)
.detail("Revoked", outOfDateAssignments.size());

Expand Down Expand Up @@ -2089,6 +2119,8 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, U
}
}

// FIXME: trace events for pruning

/*
* Deletes all files pertaining to the granule with id granuleId and
* also removes the history entry for this granule from the system keyspace
Expand Down
31 changes: 22 additions & 9 deletions fdbserver/BlobWorker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
if (BW_DEBUG) {
fmt::print("BW {0} found new manager epoch {1}\n", id.toString(), currentManagerEpoch);
}
TraceEvent(SevDebug, "BlobWorkerFoundNewManager", id).detail("Epoch", epoch);
}

return true;
Expand Down Expand Up @@ -735,7 +736,7 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
Future<Void> streamFuture =
tr->getTransaction().getRangeStream(rowsStream, metadata->keyRange, GetRangeLimits(), Snapshot::True);
wait(streamFuture && success(snapshotWriter));
TraceEvent("BlobGranuleSnapshotFile", bwData->id)
TraceEvent(SevDebug, "BlobGranuleSnapshotFile", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", readVersion);
DEBUG_KEY_RANGE("BlobWorkerFDBSnapshot", readVersion, metadata->keyRange, bwData->id);
Expand All @@ -759,7 +760,8 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
wait(tr->onError(e));
retries++;
TEST(true); // Granule initial snapshot failed
TraceEvent(SevWarn, "BlobGranuleInitialSnapshotRetry", bwData->id)
// FIXME: why can't we suppress the error event?
TraceEvent(retries < 10 ? SevDebug : SevWarn, "BlobGranuleInitialSnapshotRetry", bwData->id)
.error(err)
.detail("Granule", metadata->keyRange)
.detail("Count", retries);
Expand Down Expand Up @@ -883,7 +885,7 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
metadata->bytesInNewDeltaFiles);
}

TraceEvent("BlobGranuleSnapshotCheck", bwData->id)
TraceEvent(SevDebug, "BlobGranuleSnapshotCheck", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", reSnapshotVersion);

Expand Down Expand Up @@ -960,7 +962,7 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
metadata->keyRange.end.printable(),
bytesInNewDeltaFiles);
}
TraceEvent("BlobGranuleSnapshotFile", bwData->id)
TraceEvent(SevDebug, "BlobGranuleSnapshotFile", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", metadata->durableDeltaVersion.get());

Expand Down Expand Up @@ -1540,7 +1542,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
bwData->id.toString().substr(0, 5).c_str(),
deltas.version,
rollbackVersion);
TraceEvent(SevWarn, "GranuleRollback", bwData->id)
TraceEvent(SevDebug, "GranuleRollback", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", deltas.version)
.detail("RollbackVersion", rollbackVersion);
Expand Down Expand Up @@ -1654,7 +1656,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
lastDeltaVersion,
oldChangeFeedDataComplete.present() ? ". Finalizing " : "");
}
TraceEvent("BlobGranuleDeltaFile", bwData->id)
TraceEvent(SevDebug, "BlobGranuleDeltaFile", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("Version", lastDeltaVersion);

Expand Down Expand Up @@ -1831,13 +1833,13 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}

if (e.code() == error_code_granule_assignment_conflict) {
TraceEvent(SevInfo, "GranuleAssignmentConflict", bwData->id)
TraceEvent("GranuleAssignmentConflict", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("GranuleID", startState.granuleID);
return Void();
}
if (e.code() == error_code_change_feed_popped) {
TraceEvent(SevInfo, "GranuleGotChangeFeedPopped", bwData->id)
TraceEvent("GranuleChangeFeedPopped", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("GranuleID", startState.granuleID);
return Void();
Expand Down Expand Up @@ -2579,7 +2581,16 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
info.changeFeedStartVersion = tr.getCommittedVersion();
}

TraceEvent("GranuleOpen", bwData->id).detail("Granule", req.keyRange);
TraceEvent openEv("GranuleOpen", bwData->id);
openEv.detail("GranuleID", info.granuleID)
.detail("Granule", req.keyRange)
.detail("Epoch", req.managerEpoch)
.detail("Seqno", req.managerSeqno)
.detail("CFStartVersion", info.changeFeedStartVersion)
.detail("PreviousDurableVersion", info.previousDurableVersion);
if (info.parentGranule.present()) {
openEv.detail("ParentGranuleID", info.parentGranule.get().second);
}

return info;
} catch (Error& e) {
Expand Down Expand Up @@ -2900,6 +2911,7 @@ ACTOR Future<Void> handleRangeRevoke(Reference<BlobWorkerData> bwData, RevokeBlo

ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWorkerInterface interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
TraceEvent("BlobWorkerRegister", bwData->id);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Expand All @@ -2920,6 +2932,7 @@ ACTOR Future<Void> registerBlobWorker(Reference<BlobWorkerData> bwData, BlobWork
if (BW_DEBUG) {
fmt::print("Registered blob worker {}\n", interf.id().toString());
}
TraceEvent("BlobWorkerRegistered", bwData->id);
return Void();
} catch (Error& e) {
if (BW_DEBUG) {
Expand Down

0 comments on commit 7fc6dfa

Please sign in to comment.