Skip to content

Commit

Permalink
FDBCORE-4490: change pendingDeltaVersion from Version to NotifiedVersion (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-chxu committed Apr 12, 2023
2 parents 0ff56ab + 68143d7 commit 8b8d4c1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 32 deletions.
63 changes: 33 additions & 30 deletions fdbserver/BlobWorker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,7 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
std::deque<std::pair<Version, Version>>& rollbacksInProgress,
std::deque<std::pair<Version, Version>>& rollbacksCompleted) {
Version cfRollbackVersion;
if (metadata->pendingDeltaVersion > rollbackVersion) {
if (metadata->pendingDeltaVersion.get() > rollbackVersion) {
// if we already started writing mutations to a delta or snapshot file with version > rollbackVersion,
// we need to rescind those delta file writes
ASSERT(!inFlightFiles.empty());
Expand Down Expand Up @@ -1877,7 +1877,8 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
inFlightFiles.pop_back();
toPop--;
}
metadata->pendingDeltaVersion = cfRollbackVersion;
metadata->pendingDeltaVersion = NotifiedVersion();
metadata->pendingDeltaVersion.set(cfRollbackVersion);
if (BW_DEBUG) {
fmt::print("[{0} - {1}) rollback discarding all {2} in-memory mutations",
metadata->keyRange.begin.printable(),
Expand Down Expand Up @@ -2099,7 +2100,7 @@ ACTOR Future<Void> forceFlushCleanup(Reference<BlobWorkerData> bwData, Reference
return Void();
}
wait(delay(cleanupDelay));
if (metadata->forceFlushVersion.get() < v && metadata->pendingDeltaVersion < v) {
if (metadata->forceFlushVersion.get() < v && metadata->pendingDeltaVersion.get() < v) {
metadata->forceFlushVersion.set(v);
++bwData->stats.forceFlushCleanups;
if (BW_DEBUG) {
Expand Down Expand Up @@ -2268,7 +2269,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
: success(inFlightFiles.back().future));

metadata->durableDeltaVersion.set(startVersion);
metadata->pendingDeltaVersion = startVersion;
metadata->pendingDeltaVersion.set(startVersion);
metadata->bufferedDeltaVersion = startVersion;
metadata->knownCommittedVersion = startVersion;
metadata->resetReadStats();
Expand Down Expand Up @@ -2523,7 +2524,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// FIXME: add counter for granule rollbacks and rollbacks skipped?
// explicitly check last delta in currentDeltas because lastVersion and
// bufferedDeltaVersion include empties
if (metadata->pendingDeltaVersion <= rollbackVersion &&
if (metadata->pendingDeltaVersion.get() <= rollbackVersion &&
(metadata->currentDeltas.empty() ||
metadata->currentDeltas.back().version <= rollbackVersion)) {
CODE_PROBE(true, "Granule ignoring rollback");
Expand Down Expand Up @@ -2688,7 +2689,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// The force flush contract is that a version cannot be put in forceFlushVersion unless the change feed
// is already whenAtLeast that version
// FIXME: with non-uniform delta file sizes, we could potentially over-shoot target write amp
bool forceFlush = !forceFlushVersions.empty() && forceFlushVersions.back() > metadata->pendingDeltaVersion;
bool forceFlush =
!forceFlushVersions.empty() && forceFlushVersions.back() > metadata->pendingDeltaVersion.get();
bool doEarlyFlush = !metadata->currentDeltas.empty() && metadata->doEarlyReSnapshot();
CODE_PROBE(forceFlush, "Force flushing granule");
if ((processedAnyMutations &&
Expand Down Expand Up @@ -2716,7 +2718,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
if (!metadata->currentDeltas.empty()) {
ASSERT(lastDeltaVersion >= metadata->currentDeltas.back().version);
ASSERT(metadata->pendingDeltaVersion < metadata->currentDeltas.front().version);
ASSERT(metadata->pendingDeltaVersion.get() < metadata->currentDeltas.front().version);
} else {
ASSERT(forceFlush);
ASSERT(!forceFlushVersions.empty());
Expand All @@ -2736,7 +2738,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
// change feed is considered complete.
Optional<std::pair<KeyRange, UID>> oldChangeFeedDataComplete;
if (startState.splitParentGranule.present() &&
metadata->pendingDeltaVersion + 1 < startState.changeFeedStartVersion &&
metadata->pendingDeltaVersion.get() + 1 < startState.changeFeedStartVersion &&
lastDeltaVersion + 1 >= startState.changeFeedStartVersion) {
oldChangeFeedDataComplete = startState.splitParentGranule.get();
}
Expand All @@ -2759,7 +2761,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
startState.granuleID,
metadata->originalEpoch,
metadata->originalSeqno,
metadata->pendingDeltaVersion,
metadata->pendingDeltaVersion.get(),
lastDeltaVersion,
previousFuture,
waitVersionCommitted(bwData, metadata, lastDeltaVersion),
Expand All @@ -2786,8 +2788,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
InFlightFile(dfFuture, lastDeltaVersion, metadata->bufferedDeltaBytes, false, emptyDeltaFile));

// add new pending delta file
ASSERT(metadata->pendingDeltaVersion < lastDeltaVersion);
metadata->pendingDeltaVersion = lastDeltaVersion;
ASSERT(metadata->pendingDeltaVersion.get() < lastDeltaVersion);
metadata->pendingDeltaVersion.set(lastDeltaVersion);
ASSERT(metadata->bufferedDeltaVersion <= lastDeltaVersion);
metadata->bufferedDeltaVersion = lastDeltaVersion; // In case flush was forced at non-mutation version
metadata->bytesInNewDeltaFiles += metadata->bufferedDeltaBytes;
Expand Down Expand Up @@ -2836,7 +2838,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
"waiting for outstanding {5} files to finish\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
metadata->pendingDeltaVersion,
metadata->pendingDeltaVersion.get(),
metadata->bytesInNewDeltaFiles,
metadata->writeAmpTarget.getBytesBeforeCompact(),
inFlightFiles.size());
Expand All @@ -2856,9 +2858,10 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
} else {
previousFuture = Future<BlobFileIndex>(metadata->files.deltaFiles.back());
}
int64_t versionsSinceLastSnapshot = metadata->pendingDeltaVersion - metadata->pendingSnapshotVersion;
int64_t versionsSinceLastSnapshot =
metadata->pendingDeltaVersion.get() - metadata->pendingSnapshotVersion;
Future<BlobFileIndex> inFlightBlobSnapshot;
if (metadata->pendingDeltaVersion >= startState.changeFeedStartVersion) {
if (metadata->pendingDeltaVersion.get() >= startState.changeFeedStartVersion) {
inFlightBlobSnapshot = checkSplitAndReSnapshot(bwData,
bstore,
metadata,
Expand All @@ -2872,10 +2875,10 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->writeAmpTarget.decrease(metadata->bytesInNewDeltaFiles);
}
inFlightFiles.push_back(
InFlightFile(inFlightBlobSnapshot, metadata->pendingDeltaVersion, 0, true, false));
InFlightFile(inFlightBlobSnapshot, metadata->pendingDeltaVersion.get(), 0, true, false));
pendingSnapshots++;

metadata->pendingSnapshotVersion = metadata->pendingDeltaVersion;
metadata->pendingSnapshotVersion = metadata->pendingDeltaVersion.get();

// reset metadata
metadata->bytesInNewDeltaFiles = 0;
Expand Down Expand Up @@ -3606,7 +3609,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
fmt::print("WFV {0}) CF={1}, pendingD={2}, durableD={3}, pendingS={4}, durableS={5}\n",
v,
metadata->activeCFData.get()->getVersion(),
metadata->pendingDeltaVersion,
metadata->pendingDeltaVersion.get(),
metadata->durableDeltaVersion.get(),
metadata->pendingSnapshotVersion,
metadata->durableSnapshotVersion.get());
Expand All @@ -3616,7 +3619,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v

if (v <= metadata->activeCFData.get()->getVersion() &&
(v <= metadata->durableDeltaVersion.get() ||
metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion) &&
metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion.get()) &&
(v <= metadata->durableSnapshotVersion.get() ||
metadata->durableSnapshotVersion.get() == metadata->pendingSnapshotVersion)) {
CODE_PROBE(true, "Granule read not waiting");
Expand All @@ -3631,7 +3634,7 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
}

// wait for any pending delta and snapshot files as of the moment the change feed version caught up.
state Version pendingDeltaV = metadata->pendingDeltaVersion;
state Version pendingDeltaV = metadata->pendingDeltaVersion.get();
state Version pendingSnapshotV = metadata->pendingSnapshotVersion;

// If there are mutations that are no longer buffered but have not been
Expand All @@ -3654,10 +3657,10 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
// kick off another delta file and roll the mutations. In that case, we must return the new delta
// file instead of in memory mutations, so we wait for that delta file to complete

while (v > metadata->durableDeltaVersion.get() && metadata->pendingDeltaVersion > pendingDeltaV) {
while (v > metadata->durableDeltaVersion.get() && metadata->pendingDeltaVersion.get() > pendingDeltaV) {
CODE_PROBE(true, "Granule mutations flushed while waiting for files to complete");
Version waitVersion = std::min(v, metadata->pendingDeltaVersion);
pendingDeltaV = metadata->pendingDeltaVersion;
Version waitVersion = std::min(v, metadata->pendingDeltaVersion.get());
pendingDeltaV = metadata->pendingDeltaVersion.get();
wait(metadata->durableDeltaVersion.whenAtLeast(waitVersion));
}

Expand Down Expand Up @@ -3971,18 +3974,18 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// FIXME: do trivial key bounds here if key range is not fully contained in request key
// range
if (req.readVersion > metadata->durableDeltaVersion.get() && !metadata->currentDeltas.empty()) {
if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion) {
if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion.get()) {
fmt::print(
"real-time read [{0} - {1}) @ {2} doesn't have mutations!! durable={3}, pending={4}\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
req.readVersion,
metadata->durableDeltaVersion.get(),
metadata->pendingDeltaVersion);
metadata->pendingDeltaVersion.get());
}

// prune mutations based on begin version, if possible
ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion);
ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion.get());
MutationsAndVersionRef* mutationIt = metadata->currentDeltas.begin();
if (granuleBeginVersion > metadata->currentDeltas.back().version) {
CODE_PROBE(true, "beginVersion pruning all in-memory mutations");
Expand Down Expand Up @@ -5070,26 +5073,26 @@ ACTOR Future<Void> handleFlushGranuleReq(Reference<BlobWorkerData> self, FlushGr
// if delta file is already written after flush version, but that delta file did not compact,
// we have to write another one to trigger compaction
if (req.compactAfter && metadata->pendingSnapshotVersion < req.flushVersion &&
metadata->pendingDeltaVersion >= req.flushVersion) {
metadata->pendingDeltaVersion.get() >= req.flushVersion) {
CODE_PROBE(true, "Bumping granule force flush version to guarantee re-snapshot");
if (BW_DEBUG) {
fmt::print("BW {0} granule flush version [{1} - {2}) @ {3} increased to {4}\n",
self->id.toString().substr(0, 5),
req.granuleRange.begin.printable(),
req.granuleRange.end.printable(),
req.flushVersion,
metadata->pendingDeltaVersion + 1);
metadata->pendingDeltaVersion.get() + 1);
}

req.flushVersion = metadata->pendingDeltaVersion + 1;
req.flushVersion = metadata->pendingDeltaVersion.get() + 1;
}
if (req.compactAfter) {
metadata->forceCompactVersion = std::max(req.flushVersion, metadata->forceCompactVersion);
}

loop {
// force granule to flush at this version, and wait
if (req.flushVersion > metadata->pendingDeltaVersion) {
if (req.flushVersion > metadata->pendingDeltaVersion.get()) {
// first, wait for granule active
if (!metadata->activeCFData.get().isValid()) {
req.reply.sendError(wrong_shard_server());
Expand Down Expand Up @@ -5139,7 +5142,7 @@ ACTOR Future<Void> handleFlushGranuleReq(Reference<BlobWorkerData> self, FlushGr
}
}

if (req.flushVersion > metadata->pendingDeltaVersion &&
if (req.flushVersion > metadata->pendingDeltaVersion.get() &&
req.flushVersion > metadata->forceFlushVersion.get()) {
if (BW_DEBUG) {
fmt::print("BW {0} flushing granule [{1} - {2}) @ {3}: setting force flush version\n",
Expand Down
5 changes: 3 additions & 2 deletions fdbserver/include/fdbserver/BlobWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "fdbclient/BlobWorkerCommon.h"

#include "fdbclient/Notified.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "fdbserver/Knobs.h"

Expand Down Expand Up @@ -90,7 +91,7 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {

// for client to know when it is safe to read a certain version and from where (check waitForVersion)
Version bufferedDeltaVersion; // largest delta version in currentDeltas (including empty versions)
Version pendingDeltaVersion = 0; // largest version in progress writing to s3/fdb
NotifiedVersion pendingDeltaVersion = NotifiedVersion(0); // largest version in progress writing to s3/fdb
NotifiedVersion durableDeltaVersion; // largest version persisted in s3/fdb
NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots
Version pendingSnapshotVersion = 0;
Expand Down Expand Up @@ -132,7 +133,7 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {

inline bool doEarlyReSnapshot() {
return runRDC.isSet() ||
(forceCompactVersion <= pendingDeltaVersion && forceCompactVersion > pendingSnapshotVersion);
(forceCompactVersion <= pendingDeltaVersion.get() && forceCompactVersion > pendingSnapshotVersion);
}
};

Expand Down

0 comments on commit 8b8d4c1

Please sign in to comment.