Skip to content

Commit

Permalink
MB-32670 [3/5]: Enhance BGFetcher to support fetching compressed values
Browse files Browse the repository at this point in the history
Update the BGFetcher and KVStore::getMulti code to request that an
item is fetched in compressed form if possible, by expanding the
current boolean GetMetaOnly parameter to a ValueFilter ternary:
KEYS_ONLY, VALUE_COMPRESSED or VALUE_DECOMPRESSED.

Encapsulate the vb_bgfetch_item_ctx_t class so the logic of selecting
the filter for a given key is handled by the class itself - also
remove unnecessary logic which was setting the value / status of a
bgFetch through the `value` pointer multiple times.

Note this functionality is currently unused in the front-end -
bgFetches for value currently only still uses VALUES_DECOMPRESSED.

Change-Id: I3d7b3ab1b09e1caab407051f7265ead47e20d245
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/141352
Reviewed-by: Trond Norbye <trond.norbye@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
daverigby committed Jan 8, 2021
1 parent e4d07a2 commit 88d935e
Show file tree
Hide file tree
Showing 15 changed files with 280 additions and 124 deletions.
2 changes: 1 addition & 1 deletion engines/ep/src/bgfetcher.cc
Expand Up @@ -93,7 +93,7 @@ size_t BgFetcher::doFetch(Vbid vbId, vb_bgfetch_queue_t& itemsToFetch) {
auto& key = fetch.first;
const vb_bgfetch_item_ctx_t& bg_item_ctx = fetch.second;

for (const auto& itm : bg_item_ctx.bgfetched_list) {
for (const auto& itm : bg_item_ctx.getRequests()) {
// We don't want to transfer ownership of itm here as we clean it
// up at the end of this method in clearItems()
fetchedItems.push_back(std::make_pair(key, itm.get()));
Expand Down
14 changes: 5 additions & 9 deletions engines/ep/src/couch-kvstore/couch-kvstore.cc
Expand Up @@ -3138,24 +3138,20 @@ static int getMultiCallback(Db* db, DocInfo* docinfo, void* ctx) {
}

vb_bgfetch_item_ctx_t& bg_itm_ctx = (*qitr).second;
const auto filter = bg_itm_ctx.isMetaOnly == GetMetaOnly::Yes
? ValueFilter::KEYS_ONLY
: ValueFilter::VALUES_DECOMPRESSED;

const auto valueFilter = bg_itm_ctx.getValueFilter();
couchstore_error_t errCode = cbCtx->cks.fetchDoc(
db, docinfo, bg_itm_ctx.value, cbCtx->vbId, filter);
if (errCode != COUCHSTORE_SUCCESS && (filter != ValueFilter::KEYS_ONLY)) {
db, docinfo, bg_itm_ctx.value, cbCtx->vbId, valueFilter);
if (errCode != COUCHSTORE_SUCCESS &&
(valueFilter != ValueFilter::KEYS_ONLY)) {
st.numGetFailure++;
}

bg_itm_ctx.value.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));

bool return_val_ownership_transferred = false;
for (auto& fetch : bg_itm_ctx.bgfetched_list) {
for (auto& fetch : bg_itm_ctx.getRequests()) {
return_val_ownership_transferred = true;
// populate return value for remaining fetch items with the
// same seqid
fetch->value = &bg_itm_ctx.value;
st.readTimeHisto.add(
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - fetch->initTime));
Expand Down
35 changes: 14 additions & 21 deletions engines/ep/src/ep_vb.cc
Expand Up @@ -118,7 +118,8 @@ ENGINE_ERROR_CODE EPVBucket::completeBGFetchForSingleItem(
getState() == vbucket_state_replica ? ForGetReplicaOp::Yes
: ForGetReplicaOp::No);
auto* v = res.storedValue;
if (fetched_item.metaDataOnly()) {
switch (fetched_item.filter) {
case ValueFilter::KEYS_ONLY:
if (status == ENGINE_SUCCESS) {
if (v && v->isTempInitialItem()) {
ht.unlocked_restoreMeta(
Expand All @@ -140,7 +141,10 @@ ENGINE_ERROR_CODE EPVBucket::completeBGFetchForSingleItem(
status = ENGINE_SUCCESS;
}
}
} else {
++stats.bg_meta_fetched;
break;
case ValueFilter::VALUES_DECOMPRESSED:
case ValueFilter::VALUES_COMPRESSED: {
bool restore = false;
if (v && v->isResident()) {
status = ENGINE_SUCCESS;
Expand Down Expand Up @@ -194,15 +198,12 @@ ENGINE_ERROR_CODE EPVBucket::completeBGFetchForSingleItem(
status = ENGINE_TMPFAIL;
}
}
++stats.bg_fetched;
break;
}
}
} // locked scope ends

if (fetched_item.metaDataOnly()) {
++stats.bg_meta_fetched;
} else {
++stats.bg_fetched;
}

const auto fetchEnd = std::chrono::steady_clock::now();
updateBGStats(fetched_item.initTime, startTime, fetchEnd);

Expand Down Expand Up @@ -345,7 +346,7 @@ void EPVBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) {
size_t num_of_deleted_pending_fetches = 0;
for (auto& bgf : pendingBGFetches) {
vb_bgfetch_item_ctx_t& bg_itm_ctx = bgf.second;
for (auto& bgitem : bg_itm_ctx.bgfetched_list) {
for (auto& bgitem : bg_itm_ctx.getRequests()) {
bgitem->abort(e, ENGINE_NOT_MY_VBUCKET, toNotify);
++num_of_deleted_pending_fetches;
}
Expand Down Expand Up @@ -602,17 +603,7 @@ size_t EPVBucket::queueBGFetchItem(const DocKey& key,
DiskDocKey diskKey{key, /*pending*/ false};
LockHolder lh(pendingBGFetchesLock);
vb_bgfetch_item_ctx_t& bgfetch_itm_ctx = pendingBGFetches[diskKey];

if (bgfetch_itm_ctx.bgfetched_list.empty()) {
bgfetch_itm_ctx.isMetaOnly = GetMetaOnly::Yes;
}

if (!fetch->metaDataOnly()) {
bgfetch_itm_ctx.isMetaOnly = GetMetaOnly::No;
}

fetch->value = &bgfetch_itm_ctx.value;
bgfetch_itm_ctx.bgfetched_list.push_back(std::move(fetch));
bgfetch_itm_ctx.addBgFetch(std::move(fetch));

bgFetcher.addPendingVB(getId());
return pendingBGFetches.size();
Expand Down Expand Up @@ -749,9 +740,11 @@ void EPVBucket::bgFetch(const DocKey& key,
// reference if that's the case
// schedule to the current batch of background fetch of the given
// vbucket
const auto filter =
isMeta ? ValueFilter::KEYS_ONLY : ValueFilter::VALUES_DECOMPRESSED;
size_t bgfetch_size = queueBGFetchItem(
key,
std::make_unique<FrontEndBGFetchItem>(cookie, isMeta),
std::make_unique<FrontEndBGFetchItem>(cookie, filter),
getBgFetcher());
EP_LOG_DEBUG("Queued a background fetch, now at {}",
uint64_t(bgfetch_size));
Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/kvstore.h
Expand Up @@ -57,7 +57,7 @@ namespace Collections::VB {
struct PersistedStats;
} // namespace Collections::VB

struct vb_bgfetch_item_ctx_t;
class vb_bgfetch_item_ctx_t;
struct TransactionContext;

using vb_bgfetch_queue_t =
Expand Down
15 changes: 6 additions & 9 deletions engines/ep/src/magma-kvstore/magma-kvstore.cc
Expand Up @@ -834,14 +834,14 @@ void MagmaKVStore::getMulti(Vbid vbid, vb_bgfetch_queue_t& itms) {
reinterpret_cast<vb_bgfetch_item_ctx_t*>(op.UserContext);
bg_itm_ctx->value.setStatus(errCode);
if (found) {
const auto filter = bg_itm_ctx->isMetaOnly == GetMetaOnly::Yes
? ValueFilter::KEYS_ONLY
: ValueFilter::VALUES_DECOMPRESSED;
bg_itm_ctx->value =
makeGetValue(vbid, op.Key, metaSlice, valueSlice, filter);
bg_itm_ctx->value = makeGetValue(vbid,
op.Key,
metaSlice,
valueSlice,
bg_itm_ctx->getValueFilter());
GetValue* rv = &bg_itm_ctx->value;

for (auto& fetch : bg_itm_ctx->bgfetched_list) {
for (auto& fetch : bg_itm_ctx->getRequests()) {
fetch->value = rv;
st.readTimeHisto.add(
std::chrono::duration_cast<std::chrono::microseconds>(
Expand All @@ -862,9 +862,6 @@ void MagmaKVStore::getMulti(Vbid vbid, vb_bgfetch_queue_t& itms) {
status.String());
st.numGetFailure++;
}
for (auto& fetch : bg_itm_ctx->bgfetched_list) {
fetch->value->setStatus(errCode);
}
}
};

Expand Down
13 changes: 5 additions & 8 deletions engines/ep/src/rocksdb-kvstore/rocksdb-kvstore.cc
Expand Up @@ -715,17 +715,14 @@ void RocksDBKVStore::getMulti(Vbid vb, vb_bgfetch_queue_t& itms) {
&value);
if (s.ok()) {
it.second.value = makeGetValue(
vb, key, value, it.second.isMetaOnly == GetMetaOnly::No);
GetValue* rv = &it.second.value;
for (auto& fetch : it.second.bgfetched_list) {
fetch->value = rv;
}
vb,
key,
value,
it.second.getValueFilter() != ValueFilter::KEYS_ONLY);
++st.io_bg_fetch_docs_read;
st.io_bgfetch_doc_bytes += keySlice.size() + value.size();
} else {
for (auto& fetch : it.second.bgfetched_list) {
fetch->value->setStatus(ENGINE_KEY_ENOENT);
}
it.second.value.setStatus(ENGINE_KEY_ENOENT);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/vbucket.h
Expand Up @@ -58,7 +58,7 @@ class RollbackResult;
class FrontEndBGFetchItem;
struct VBQueueItemCtx;
struct vbucket_transition_state;
struct vb_bgfetch_item_ctx_t;
class vb_bgfetch_item_ctx_t;
using vb_bgfetch_queue_t =
std::unordered_map<DiskDocKey, vb_bgfetch_item_ctx_t>;

Expand Down
33 changes: 30 additions & 3 deletions engines/ep/src/vbucket_bgfetch_item.cc
Expand Up @@ -19,15 +19,15 @@

#include "ep_engine.h"
#include "ep_vb.h"
#include "kvstore.h"
#include "vbucket.h"
#include "vbucket_fwd.h"

FrontEndBGFetchItem::FrontEndBGFetchItem(
GetValue* value,
std::chrono::steady_clock::time_point initTime,
bool metaOnly,
ValueFilter filter,
const void* cookie)
: BGFetchItem(value, initTime), cookie(cookie), metaOnly(metaOnly) {
: BGFetchItem(initTime), cookie(cookie), filter(filter) {
auto* traceable = cookie2traceable(cookie);
if (traceable && traceable->isTracingEnabled()) {
NonBucketAllocationGuard guard;
Expand Down Expand Up @@ -72,3 +72,30 @@ void CompactionBGFetchItem::abort(
// expire simply won't be expired. The next op/compaction can expire the
// item if still required.
}

ValueFilter CompactionBGFetchItem::getValueFilter() const {
// Don't care about values here
return ValueFilter::KEYS_ONLY;
}

void vb_bgfetch_item_ctx_t::addBgFetch(
std::unique_ptr<BGFetchItem> itemToFetch) {
itemToFetch->value = &value;
bgfetched_list.push_back(std::move(itemToFetch));
}

ValueFilter vb_bgfetch_item_ctx_t::getValueFilter() const {
// Want to fetch the minimum amount of data:
// 1. If all requests against this key are meta only; fetch just metadata
// 2. If all requests are for compressed data, fetch compressed.
// 3. Otherwise fetch uncompressed value.
static_assert(ValueFilter::KEYS_ONLY < ValueFilter::VALUES_COMPRESSED);
static_assert(ValueFilter::VALUES_COMPRESSED <
ValueFilter::VALUES_DECOMPRESSED);

auto overallFilter = ValueFilter::KEYS_ONLY;
for (const auto& request : bgfetched_list) {
overallFilter = std::max(overallFilter, request->getValueFilter());
}
return overallFilter;
}
75 changes: 50 additions & 25 deletions engines/ep/src/vbucket_bgfetch_item.h
Expand Up @@ -20,22 +20,21 @@
#include "callbacks.h"
#include "diskdockey.h"
#include "item.h"
#include "objectregistry.h"
#include "trace_helpers.h"
#include "vbucket_fwd.h"

#include <list>
#include <unordered_map>

enum class GetMetaOnly;
enum class ValueFilter;

/**
* Base BGFetch context class
*/
class BGFetchItem {
public:
BGFetchItem(GetValue* value, std::chrono::steady_clock::time_point initTime)
: value(value), initTime(initTime) {
BGFetchItem(std::chrono::steady_clock::time_point initTime)
: initTime(initTime) {
}

virtual ~BGFetchItem() = default;
Expand Down Expand Up @@ -65,12 +64,14 @@ class BGFetchItem {
ENGINE_ERROR_CODE status,
std::map<const void*, ENGINE_ERROR_CODE>& toNotify) const = 0;

/// @returns The ValueFilter for this request.
virtual ValueFilter getValueFilter() const = 0;

/**
* @return Should we BGFetch just the metadata?
* Pointer to the result of the BGFetch. Set by the vb_bgfetch_item_ctx_t
* when this BGFetchItem is added to the set of outstanding items.
*/
virtual bool metaDataOnly() const = 0;

GetValue* value;
GetValue* value{nullptr};
const std::chrono::steady_clock::time_point initTime;
};

Expand All @@ -80,14 +81,13 @@ class BGFetchItem {
*/
class FrontEndBGFetchItem : public BGFetchItem {
public:
FrontEndBGFetchItem(const void* cookie, bool metaOnly)
FrontEndBGFetchItem(const void* cookie, ValueFilter filter)
: FrontEndBGFetchItem(
nullptr, std::chrono::steady_clock::now(), metaOnly, cookie) {
std::chrono::steady_clock::now(), filter, cookie) {
}

FrontEndBGFetchItem(GetValue* value,
std::chrono::steady_clock::time_point initTime,
bool metaOnly,
FrontEndBGFetchItem(std::chrono::steady_clock::time_point initTime,
ValueFilter filter,
const void* cookie);

void complete(EventuallyPersistentEngine& engine,
Expand All @@ -100,13 +100,13 @@ class FrontEndBGFetchItem : public BGFetchItem {
ENGINE_ERROR_CODE status,
std::map<const void*, ENGINE_ERROR_CODE>& toNotify) const override;

bool metaDataOnly() const override {
return metaOnly;
ValueFilter getValueFilter() const override {
return filter;
}

const void* cookie;
cb::tracing::SpanId traceSpanId;
bool metaOnly;
ValueFilter filter;
};

/**
Expand All @@ -116,8 +116,7 @@ class FrontEndBGFetchItem : public BGFetchItem {
class CompactionBGFetchItem : public BGFetchItem {
public:
explicit CompactionBGFetchItem(const Item& item)
: BGFetchItem(nullptr /*GetValue*/, std::chrono::steady_clock::now()),
compactionItem(item) {
: BGFetchItem(std::chrono::steady_clock::now()), compactionItem(item) {
}

void complete(EventuallyPersistentEngine& engine,
Expand All @@ -130,10 +129,7 @@ class CompactionBGFetchItem : public BGFetchItem {
ENGINE_ERROR_CODE status,
std::map<const void*, ENGINE_ERROR_CODE>& toNotify) const override;

bool metaDataOnly() const override {
// Don't care about values here
return true;
}
ValueFilter getValueFilter() const override;

/**
* We copy the entire Item here because if we need to expire (delete) the
Expand All @@ -142,8 +138,37 @@ class CompactionBGFetchItem : public BGFetchItem {
Item compactionItem;
};

struct vb_bgfetch_item_ctx_t {
std::list<std::unique_ptr<BGFetchItem>> bgfetched_list;
GetMetaOnly isMetaOnly;
/**
* Tracks all outstanding requests for a single document to be fetched from
* disk.
* Multiple requests (from different clients) can be enqueued against a single
* key; the document will only be fetched from disk once; then the result
* will be returned to each client which requested it.
*/
class vb_bgfetch_item_ctx_t {
public:
using FetchList = std::list<std::unique_ptr<BGFetchItem>>;
/**
* Add a request to fetch an item from disk to the set of outstanding
* BGfetches.
*/
void addBgFetch(std::unique_ptr<BGFetchItem> itemToFetch);

/// @returns The list of outstanding fetch requests.
const FetchList& getRequests() const {
return bgfetched_list;
}

/**
* @returns the ValueFilter to be used when fetching the item for the set
* of outstanding fetch requests.
*/
ValueFilter getValueFilter() const;

/// Result the fetch.
GetValue value;

private:
/// List of outstanding requests for a given key.
FetchList bgfetched_list;
};

0 comments on commit 88d935e

Please sign in to comment.