Skip to content

Commit

Permalink
MB-47308: Increase the number of functions using MemoryDomain::Secondary
Browse files Browse the repository at this point in the history
Change a number of functions that returned heap-memory to now return
a slightly different type which knows to destroy memory in the correct
domain.

Change-Id: I0e9fd9967143316c733df6d230b5fd7c7dc18de6
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/157606
Tested-by: Jim Walker <jim@couchbase.com>
Reviewed-by: Ben Huddleston <ben.huddleston@couchbase.com>
  • Loading branch information
jimwwalker committed Sep 7, 2021
1 parent 52fe578 commit 497bfd6
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 87 deletions.
95 changes: 53 additions & 42 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc
Expand Up @@ -28,6 +28,7 @@
#include <executor/executorpool.h>
#include <mcbp/protocol/unsigned_leb128.h>
#include <nlohmann/json.hpp>
#include <platform/cb_arena_malloc.h>
#include <statistics/cbstat_collector.h>
#include <utilities/logtags.h>
#include <algorithm>
Expand Down Expand Up @@ -549,19 +550,21 @@ MagmaKVStore::MagmaKVStore(MagmaKVStoreConfig& configuration)
configuration.getMagmaFragmentationPercentage());
calculateAndSetMagmaThreads();

auto kvstoreList = magma->GetKVStoreList();
for (auto& kvid : kvstoreList) {
// While loading the vbstate cache, set the kvstoreRev.
status = loadVBStateCache(Vbid(kvid), true);
++st.numLoadedVb;
if (!status) {
throw std::logic_error("MagmaKVStore vbstate vbid:" +
std::to_string(kvid) +
" not found."
" Status:" +
status.String());
}
}
magma->executeOnKVStoreList(
[this](const std::vector<magma::Magma::KVStoreID>& kvstores) {
cb::UseArenaMallocPrimaryDomain domainGuard;
for (auto kvid : kvstores) {
auto status = loadVBStateCache(Vbid(kvid), true);
++st.numLoadedVb;
if (!status) {
throw std::logic_error("MagmaKVStore vbstate vbid:" +
std::to_string(kvid) +
" not found."
" Status:" +
status.String());
}
}
});
}

MagmaKVStore::~MagmaKVStore() {
Expand Down Expand Up @@ -1319,35 +1322,43 @@ int MagmaKVStore::saveDocs(MagmaKVStoreTransactionContext& txnCtx,
return status.ErrorCode();
}

MagmaScanContext::MagmaScanContext(
std::unique_ptr<StatusCallback<GetValue>> cb,
std::unique_ptr<StatusCallback<CacheLookup>> cl,
Vbid vb,
std::unique_ptr<KVFileHandle> handle,
int64_t start,
int64_t end,
uint64_t purgeSeqno,
DocumentFilter _docFilter,
ValueFilter _valFilter,
uint64_t _documentCount,
const vbucket_state& vbucketState,
const std::vector<Collections::KVStore::DroppedCollection>&
droppedCollections,
std::unique_ptr<magma::Magma::SeqIterator> itr)
: BySeqnoScanContext(std::move(cb),
std::move(cl),
vb,
std::move(handle),
start,
end,
purgeSeqno,
_docFilter,
_valFilter,
_documentCount,
vbucketState,
droppedCollections),
itr(std::move(itr)) {
}
/**
* MagmaScanContext is BySeqnoScanContext with the magma
* iterator added.
*/
class MagmaScanContext : public BySeqnoScanContext {
public:
MagmaScanContext(std::unique_ptr<StatusCallback<GetValue>> cb,
std::unique_ptr<StatusCallback<CacheLookup>> cl,
Vbid vb,
std::unique_ptr<KVFileHandle> handle,
int64_t start,
int64_t end,
uint64_t purgeSeqno,
DocumentFilter _docFilter,
ValueFilter _valFilter,
uint64_t _documentCount,
const vbucket_state& vbucketState,
const std::vector<Collections::KVStore::DroppedCollection>&
droppedCollections,
DomainAwareUniquePtr<magma::Magma::SeqIterator> itr)
: BySeqnoScanContext(std::move(cb),
std::move(cl),
vb,
std::move(handle),
start,
end,
purgeSeqno,
_docFilter,
_valFilter,
_documentCount,
vbucketState,
droppedCollections),
itr(std::move(itr)) {
}

DomainAwareUniquePtr<magma::Magma::SeqIterator> itr{nullptr};
};

std::unique_ptr<BySeqnoScanContext> MagmaKVStore::initBySeqnoScanContext(
std::unique_ptr<StatusCallback<GetValue>> cb,
Expand Down
24 changes: 0 additions & 24 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h
Expand Up @@ -40,30 +40,6 @@ struct kvstats_ctx;
struct MagmaKVStoreTransactionContext;
struct vbucket_state;

/**
* MagmaScanContext is BySeqnoScanContext with the magma
* iterator added.
*/
class MagmaScanContext : public BySeqnoScanContext {
public:
MagmaScanContext(std::unique_ptr<StatusCallback<GetValue>> cb,
std::unique_ptr<StatusCallback<CacheLookup>> cl,
Vbid vb,
std::unique_ptr<KVFileHandle> handle,
int64_t start,
int64_t end,
uint64_t purgeSeqno,
DocumentFilter _docFilter,
ValueFilter _valFilter,
uint64_t _documentCount,
const vbucket_state& vbucketState,
const std::vector<Collections::KVStore::DroppedCollection>&
droppedCollections,
std::unique_ptr<magma::Magma::SeqIterator> itr);

std::unique_ptr<magma::Magma::SeqIterator> itr{nullptr};
};

/**
* A persistence store based on magma.
*/
Expand Down
49 changes: 32 additions & 17 deletions engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc
Expand Up @@ -24,6 +24,19 @@ DomainAwareFetchBuffer::~DomainAwareFetchBuffer() {
buffer.reset();
}

template <>
void DomainAwareDelete<magma::UserStats>::operator()(magma::UserStats* p) {
cb::UseArenaMallocSecondaryDomain domainGuard;
delete p;
}

template <>
void DomainAwareDelete<magma::Magma::SeqIterator>::operator()(
magma::Magma::SeqIterator* p) {
cb::UseArenaMallocSecondaryDomain domainGuard;
delete p;
}

MagmaMemoryTrackingProxy::MagmaMemoryTrackingProxy(
magma::Magma::Config& config) {
cb::UseArenaMallocSecondaryDomain domainGuard;
Expand Down Expand Up @@ -88,11 +101,11 @@ magma::Status MagmaMemoryTrackingProxy::GetDocs(
return magma->GetDocs(kvID, getOps, cb);
}

std::vector<magma::Magma::KVStoreID>
MagmaMemoryTrackingProxy::GetKVStoreList() {
// cb::UseArenaMallocSecondaryDomain domainGuard;
// @todo: Return a type which can be destroyed in the correct domain
return magma->GetKVStoreList();
void MagmaMemoryTrackingProxy::executeOnKVStoreList(
std::function<void(const std::vector<magma::Magma::KVStoreID>&)>
callback) {
cb::UseArenaMallocSecondaryDomain domainGuard;
callback(magma->GetKVStoreList());
}

std::tuple<magma::Status, magma::Magma::KVStoreRevision>
Expand All @@ -108,18 +121,20 @@ MagmaMemoryTrackingProxy::GetKVStoreStats(const magma::Magma::KVStoreID kvid) {
return magma->GetKVStoreStats(kvid);
}

std::unique_ptr<magma::UserStats> MagmaMemoryTrackingProxy::GetKVStoreUserStats(
DomainAwareUniquePtr<magma::UserStats>
MagmaMemoryTrackingProxy::GetKVStoreUserStats(
const magma::Magma::KVStoreID kvid) {
// cb::UseArenaMallocSecondaryDomain domainGuard;
// @todo: Return a type which can be destroyed in the correct domain
return magma->GetKVStoreUserStats(kvid);
cb::UseArenaMallocSecondaryDomain domainGuard;
return DomainAwareUniquePtr<magma::UserStats>{
magma->GetKVStoreUserStats(kvid).release()};
}

std::unique_ptr<magma::UserStats> MagmaMemoryTrackingProxy::GetKVStoreUserStats(
DomainAwareUniquePtr<magma::UserStats>
MagmaMemoryTrackingProxy::GetKVStoreUserStats(
magma::Magma::Snapshot& snapshot) {
// cb::UseArenaMallocSecondaryDomain domainGuard;
// @todo: Return a type which can be destroyed in the correct domain
return magma->GetKVStoreUserStats(snapshot);
cb::UseArenaMallocSecondaryDomain domainGuard;
return DomainAwareUniquePtr<magma::UserStats>{
magma->GetKVStoreUserStats(snapshot).release()};
}

magma::Status MagmaMemoryTrackingProxy::GetLocal(
Expand Down Expand Up @@ -169,11 +184,11 @@ void MagmaMemoryTrackingProxy::GetStats(
magma->GetStats(magmaStats, cacheDuration);
}

std::unique_ptr<magma::Magma::SeqIterator>
DomainAwareUniquePtr<magma::Magma::SeqIterator>
MagmaMemoryTrackingProxy::NewSeqIterator(magma::Magma::Snapshot& snapshot) {
// cb::UseArenaMallocSecondaryDomain domainGuard;
// @todo: Return a type which can be destroyed in the correct domain
return magma->NewSeqIterator(snapshot);
cb::UseArenaMallocSecondaryDomain domainGuard;
return DomainAwareUniquePtr<magma::Magma::SeqIterator>{
magma->NewSeqIterator(snapshot).release()};
}

magma::Status MagmaMemoryTrackingProxy::Open() {
Expand Down
31 changes: 27 additions & 4 deletions engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.h
Expand Up @@ -40,6 +40,18 @@ class DomainAwareFetchBuffer {
std::unique_ptr<magma::Magma::FetchBuffer> buffer;
};

/**
* Helper/Deleter for std::unique_ptr types, will delete in
* MemoryDomain::Secondary
*/
template <class T>
struct DomainAwareDelete {
void operator()(T* p);
};

template <class T>
using DomainAwareUniquePtr = std::unique_ptr<T, DomainAwareDelete<T>>;

class MagmaMemoryTrackingProxy {
public:
/**
Expand Down Expand Up @@ -75,15 +87,26 @@ class MagmaMemoryTrackingProxy {
magma::Status GetDocs(const magma::Magma::KVStoreID kvID,
magma::Operations<magma::Magma::GetOperation>& getOps,
magma::Magma::GetDocCallback cb);
std::vector<magma::Magma::KVStoreID> GetKVStoreList();

/**
* Invokes the given callback with the result of calling
* magma::GetKVStoreList, this allows the caller to switch domains and work
* with the returned vector
*
* @param callback function to invoke for each KVStore
*/
void executeOnKVStoreList(
std::function<void(const std::vector<magma::Magma::KVStoreID>&)>
callback);

std::tuple<magma::Status, magma::Magma::KVStoreRevision> GetKVStoreRevision(
const magma::Magma::KVStoreID kvID);
std::tuple<magma::Status, magma::KVStoreStats> GetKVStoreStats(
const magma::Magma::KVStoreID kvid);

std::unique_ptr<magma::UserStats> GetKVStoreUserStats(
DomainAwareUniquePtr<magma::UserStats> GetKVStoreUserStats(
const magma::Magma::KVStoreID kvid);
std::unique_ptr<magma::UserStats> GetKVStoreUserStats(
DomainAwareUniquePtr<magma::UserStats> GetKVStoreUserStats(
magma::Magma::Snapshot& snapshot);

magma::Status GetLocal(const magma::Magma::KVStoreID kvID,
Expand All @@ -109,7 +132,7 @@ class MagmaMemoryTrackingProxy {
void GetStats(
magma::Magma::MagmaStats& magmaStats,
std::chrono::milliseconds cacheDuration = std::chrono::seconds(0));
std::unique_ptr<magma::Magma::SeqIterator> NewSeqIterator(
DomainAwareUniquePtr<magma::Magma::SeqIterator> NewSeqIterator(
magma::Magma::Snapshot& snapshot);
magma::Status Open();
magma::Status Rollback(const magma::Magma::KVStoreID kvID,
Expand Down

0 comments on commit 497bfd6

Please sign in to comment.