Skip to content

Commit

Permalink
MB-50598: Fix various errors in NexusKVStore scanning
Browse files Browse the repository at this point in the history
Originally motivated by a segfault that we hit when we find during
a NexusKVStore scan that all of the callbacks made by the primary
have been purged from the secondary KVStore I wrote some additional
tests that pause after every item. These extra tests found various
similar errors during scanning where we fail to deal with cases
where items are below the purge seqno during a scan and we pause at
interesting points in time. In particular I observed errors where the
lastReadSeqnos of scans were different, and failures in callbacks when
we could not find items.

Change-Id: I69c3b9041fe9a39b5e1393115ef02db00fc259dc
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/169642
Reviewed-by: Richard de Mellow <richard.demellow@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
BenHuddleston committed Feb 3, 2022
1 parent 439ae07 commit c76dd75
Show file tree
Hide file tree
Showing 2 changed files with 306 additions and 58 deletions.
200 changes: 155 additions & 45 deletions engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1962,16 +1962,41 @@ class NexusSecondaryScanCallback : public StatusCallback<GetValue> {
}

void callback(GetValue& val) override {
// We should have an invocation for the primary
Expects(!primaryCallbacks.empty());
if (primaryCallbacks.empty() &&
static_cast<uint64_t>(val.item->getBySeqno()) <=
kvstore.getPurgeSeqno(vbid)) {
// primaryCallbacks could be empty if we're below the purge seqno
// and paused in an inconvenient place. Just return no_memory
// because we don't want the secondary scanning farther than the
// primary did and getting out of sync
setStatus(cb::engine_errc::no_memory);
kvstore.skippedChecksDueToPurging++;
return;
}

while (static_cast<uint64_t>(
// Pop anything the primary visited that is under the purge seqno if
// it's not our item as the secondary may have purged something
while (!primaryCallbacks.empty() &&
static_cast<uint64_t>(
primaryCallbacks.front().first.getBySeqno()) <=
kvstore.getPurgeSeqno(vbid) &&
!kvstore.compareItem(primaryCallbacks.front().first,
*val.item)) {
kvstore.skippedChecksDueToPurging++;
primaryCallbacks.pop_front();
kvstore.skippedChecksDueToPurging++;
}

if (primaryCallbacks.empty()) {
// Shouldn't happen provided that the KVStores are in sync, but we
// don't want to use primaryCallbacks.front() below if it is empty
// or we'd segfault.
auto msg = fmt::format(
"NexusSecondaryScanCallback::callback: {} key:{} scanned "
"item:{} but did not find primaryCallback",
vbid,
cb::UserData(val.item->getKey().to_string()),
*val.item);
kvstore.handleError(msg, vbid);
}

auto& [primaryVal, primaryStatus] = primaryCallbacks.front();
Expand Down Expand Up @@ -2061,22 +2086,88 @@ class NexusSecondaryCacheLookup : public StatusCallback<CacheLookup> {
}

void callback(CacheLookup& val) override {
// We should have an invocation for the primary
// primaryCallbacks could be empty if we're below the purge seqno
// and paused in an inconvenient place. Just return no_memory
// because we don't want the secondary scanning farther than the
// primary did and getting out of sync
if (primaryCallbacks.empty() &&
static_cast<uint64_t>(val.getBySeqno()) <=
kvstore.getPurgeSeqno(vbid)) {
setStatus(cb::engine_errc::no_memory);
kvstore.skippedChecksDueToPurging++;
return;
}

// We should have at least one invocation because we filter out when we
// forward on the callback so we should see every invocation here
// regardless of status.
Expects(!primaryCallbacks.empty());

auto [primaryVal, primaryStatus] = primaryCallbacks.front();
// Special case where every item seen by the primary scan has been
// purged by the secondary already. We need to stop now to keep the scan
// in-sync.
for (auto itr = primaryCallbacks.rbegin();
itr != primaryCallbacks.rend();
itr++) {
if (itr->second == cb::engine_errc::success) {
if (static_cast<uint64_t>(itr->first.getBySeqno()) <=
kvstore.getPurgeSeqno(vbid) &&
itr->first.getBySeqno() < val.getBySeqno()) {
// We want to stop the scan now, but we need to set
// the last read seqno to resume from the same point as the
// primary
primaryCallbacks.clear();
setStatus(cb::engine_errc::no_memory);
return;
} else {
break;
};
}
}

// We can't compare anything below the purge seqno (may have been purged
// from the secondary) so remove all the invocations below the purge
// seqno before starting.
// Remove anything under the purge seqno from the start of
// primaryCallbacks if it's not the item that the secondary has made
// this callback for
auto [primaryVal, primaryStatus] = primaryCallbacks.front();
while (static_cast<uint64_t>(primaryVal.getBySeqno()) <=
kvstore.getPurgeSeqno(vbid) &&
primaryVal != val) {
primaryCallbacks.pop_front();

// Nothing to read here...
if (primaryCallbacks.empty()) {
break;
}

std::tie(primaryVal, primaryStatus) = primaryCallbacks.front();
kvstore.skippedChecksDueToPurging++;
}

if (primaryCallbacks.empty()) {
// If primary has purged more than secondary then we may see that
// primaryCallbacks is now empty. Pause the scan now rather than
// continue so that we can pick up more primary items and deal with
// this later. If we continued now we'd skip keys for the secondary
// that the primary may see.
if (static_cast<uint64_t>(val.getBySeqno()) <=
kvstore.getPurgeSeqno(vbid)) {
kvstore.skippedChecksDueToPurging++;
setStatus(cb::engine_errc::no_memory);
return;
}

// Shouldn't happen provided that the KVStores are in sync, but we
// don't want to use primaryCallbacks.front() below if it is empty
// or we'd segfault.
auto msg = fmt::format(
"NexusSecondaryCacheCallback::callback: {} key:{} scanned "
"item seqno:{} but did not find primaryCallback",
vbid,
cb::UserData(val.getKey().to_string()),
val.getBySeqno());
kvstore.handleError(msg, vbid);
}

if (primaryVal != val) {
if (static_cast<uint64_t>(val.getBySeqno()) <=
kvstore.getPurgeSeqno(vbid)) {
Expand Down Expand Up @@ -2387,13 +2478,20 @@ scan_error_t NexusKVStore::scan(BySeqnoScanContext& ctx) const {
}

if (primaryCtx.lastReadSeqno != secondaryCtx.lastReadSeqno) {
auto msg = fmt::format(
"NexusKVStore::scan: {}: last read seqno not "
"equal primary:{} secondary:{}",
ctx.vbid,
primaryCtx.lastReadSeqno,
secondaryCtx.lastReadSeqno);
handleError(msg, ctx.vbid);
if (static_cast<uint64_t>(primaryCtx.lastReadSeqno) >
getPurgeSeqno(ctx.vbid) &&
static_cast<uint64_t>(secondaryCtx.lastReadSeqno) >
getPurgeSeqno(ctx.vbid)) {
auto msg = fmt::format(
"NexusKVStore::scan: {}: last read seqno not "
"equal primary:{} secondary:{}",
ctx.vbid,
primaryCtx.lastReadSeqno,
secondaryCtx.lastReadSeqno);
handleError(msg, ctx.vbid);
} else {
skippedChecksDueToPurging++;
}
}

auto& primaryScanCallback = dynamic_cast<NexusPrimaryScanCallback&>(
Expand All @@ -2402,40 +2500,52 @@ scan_error_t NexusKVStore::scan(BySeqnoScanContext& ctx) const {
primaryCtx.getCacheCallback());

if (!primaryScanCallback.callbacks.empty()) {
auto msg = fmt::format(
"NexusKVStore::scan: {}: {} primary scan "
"callbacks were not matched by secondary scan "
"callbacks ",
ctx.vbid,
primaryScanCallback.callbacks.size());
for (auto& [item, status] : primaryScanCallback.callbacks) {
fmt::format_to(std::back_inserter(msg),
"[key:'{}',seqno:{},deleted:{},status:{}],",
cb::UserData(item.getKey().to_string()),
item.getBySeqno(),
item.isDeleted(),
status);
auto firstItem = primaryScanCallback.callbacks.back().first;
if (static_cast<uint64_t>(firstItem.getBySeqno()) >
getPurgeSeqno(ctx.vbid)) {
auto msg = fmt::format(
"NexusKVStore::scan: {}: {} primary scan "
"callbacks were not matched by secondary scan "
"callbacks ",
ctx.vbid,
primaryScanCallback.callbacks.size());
for (auto& [item, status] : primaryScanCallback.callbacks) {
fmt::format_to(std::back_inserter(msg),
"[key:'{}',seqno:{},deleted:{},status:{}],",
cb::UserData(item.getKey().to_string()),
item.getBySeqno(),
item.isDeleted(),
status);
}
msg.pop_back();
handleError(msg, ctx.vbid);
} else {
skippedChecksDueToPurging++;
}
msg.pop_back();
handleError(msg, ctx.vbid);
}

if (!primaryCacheLookup.callbacks.empty()) {
auto msg = fmt::format(
"NexusKVStore::scan: {}: {} primary cache "
"lookups were not matched by secondary cache "
"lookups ",
ctx.vbid,
primaryCacheLookup.callbacks.size());
for (auto& [cl, status] : primaryCacheLookup.callbacks) {
fmt::format_to(std::back_inserter(msg),
"[key:'{}',seqno:{},status:{}],",
cb::UserData(cl.getKey().to_string()),
cl.getBySeqno(),
status);
auto firstItem = primaryCacheLookup.callbacks.back().first;
if (static_cast<uint64_t>(firstItem.getBySeqno()) >
getPurgeSeqno(ctx.vbid)) {
auto msg = fmt::format(
"NexusKVStore::scan: {}: {} primary cache "
"lookups were not matched by secondary cache "
"lookups ",
ctx.vbid,
primaryCacheLookup.callbacks.size());
for (auto& [cl, status] : primaryCacheLookup.callbacks) {
fmt::format_to(std::back_inserter(msg),
"[key:'{}',seqno:{},status:{}],",
cb::UserData(cl.getKey().to_string()),
cl.getBySeqno(),
status);
}
msg.pop_back();
handleError(msg, ctx.vbid);
} else {
skippedChecksDueToPurging++;
}
msg.pop_back();
handleError(msg, ctx.vbid);
}

// lastReadSeqno gets checked by backfill so we need to set it in the
Expand Down
Loading

0 comments on commit c76dd75

Please sign in to comment.