Skip to content

Commit

Permalink
Merge "Merge branch 'couchbase/neo' into 'couchbase/master'"
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit Code Review committed Jan 27, 2023
2 parents 16f4dbb + bb30ae1 commit 45f6681
Show file tree
Hide file tree
Showing 19 changed files with 276 additions and 142 deletions.
7 changes: 0 additions & 7 deletions engines/ep/src/collections/flush_accounting.cc
Expand Up @@ -15,13 +15,6 @@

namespace Collections::VB {

// Returns two optionals
// First is only initialised if the key is a collection system event.
// Second is only initialised if there is a collection id present.
// Any mutation will return nullopt,cid
// A collection create/drop/modify will return event, cid
// A scope event will return nullopt, nullopt

/**
* From a DocKey extract CollectionID and SystemEvent data.
* The function returns two optionals because depending on the key there could
Expand Down
17 changes: 14 additions & 3 deletions engines/ep/src/collections/scan_context.cc
Expand Up @@ -12,6 +12,7 @@
#include "collections/scan_context.h"

#include "collections/kvstore.h"
#include "systemevent_factory.h"

namespace Collections::VB {

Expand All @@ -29,13 +30,23 @@ ScanContext::ScanContext(
}

bool ScanContext::isLogicallyDeleted(const DocKey& key, uint64_t seqno) const {
if (dropped.empty() || key.isInSystemCollection()) {
if (dropped.empty()) {
return false;
}

CollectionID cid;
if (key.isInSystemCollection()) {
auto modify = SystemEventFactory::isModifyCollection(key);
if (!modify) {
return false;
}
cid = modify.value();
} else {
cid = key.getCollectionID();
}

// Is the key in a range which contains dropped collections and in the set?
return (seqno >= startSeqno && seqno <= endSeqno) &&
dropped.count(key.getCollectionID()) > 0;
return (seqno >= startSeqno && seqno <= endSeqno) && dropped.count(cid) > 0;
}

std::ostream& operator<<(std::ostream& os, const ScanContext& scanContext) {
Expand Down
3 changes: 2 additions & 1 deletion engines/ep/src/ep_bucket.cc
Expand Up @@ -1268,7 +1268,8 @@ void EPBucket::dropKey(VBucket& vb,

auto docKey = diskKey.getDocKey();
if (docKey.isInSystemCollection()) {
throw std::logic_error("EPBucket::dropKey called for a system key");
// SystemEvents aren't in memory so return.
return;
}

if (diskKey.isPrepared() && bySeqno > highCompletedSeqno) {
Expand Down
71 changes: 71 additions & 0 deletions engines/ep/src/ep_types.h
Expand Up @@ -310,6 +310,77 @@ using VBucketStateLockRef = cb::SharedLockRef<internal::VBucketStateLockTag>;
template <typename Lock>
using VBucketStateLockMap = folly::F14FastMap<Vbid, Lock>;

/**
* Properties of the storage layer.
*/
class StorageProperties {
public:
enum class ByIdScan : bool { Yes, No };

/**
* Will the KVStore de-dupe items such that only the highest seqno for any
* given key in a single flush batch is persisted?
*/
enum class AutomaticDeduplication : bool { Yes, No };

/**
* Will the KVStore count items in the prepare namespace (and update the
* values appropriately in the vbstate)
*/
enum class PrepareCounting : bool { Yes, No };

/**
* Will the KVStore make callbacks with stale (superseded) items during
* compaction?
*/
enum class CompactionStaleItemCallbacks : bool { Yes, No };

/**
* Does the KVStore support history retention (suitable for change streams)
*/
enum class HistoryRetentionAvailable : bool { Yes, No };

StorageProperties(ByIdScan byIdScan,
AutomaticDeduplication automaticDeduplication,
PrepareCounting prepareCounting,
CompactionStaleItemCallbacks compactionStaleItemCallbacks,
HistoryRetentionAvailable historyRetentionAvailable)
: byIdScan(byIdScan),
automaticDeduplication(automaticDeduplication),
prepareCounting(prepareCounting),
compactionStaleItemCallbacks(compactionStaleItemCallbacks),
historyRetentionAvailable(historyRetentionAvailable) {
}

bool hasByIdScan() const {
return byIdScan == ByIdScan::Yes;
}

bool hasAutomaticDeduplication() const {
return automaticDeduplication == AutomaticDeduplication::Yes;
}

bool hasPrepareCounting() const {
return prepareCounting == PrepareCounting::Yes;
}

bool hasCompactionStaleItemCallbacks() const {
return compactionStaleItemCallbacks ==
CompactionStaleItemCallbacks::Yes;
}

bool canRetainHistory() const {
return historyRetentionAvailable == HistoryRetentionAvailable::Yes;
}

private:
ByIdScan byIdScan;
AutomaticDeduplication automaticDeduplication;
PrepareCounting prepareCounting;
CompactionStaleItemCallbacks compactionStaleItemCallbacks;
HistoryRetentionAvailable historyRetentionAvailable;
};

namespace cb {

/**
Expand Down
15 changes: 8 additions & 7 deletions engines/ep/src/ephemeral_vb.cc
Expand Up @@ -318,10 +318,16 @@ std::optional<SequenceList::RangeIterator> EphemeralVBucket::makeRangeIterator(

bool EphemeralVBucket::isKeyLogicallyDeleted(const DocKey& key,
int64_t bySeqno) const {
if (key.isInSystemCollection()) {
if (key.isInSystemCollection() &&
!SystemEventFactory::isModifyCollection(key)) {
return false;
// else Modify events can be dropped
}
auto cHandle = lockCollections(key);

// ReadLock the manifest and permit system keys - this ensures a Modify
// can lookup the correct collection
auto cHandle =
manifest->lock(key, Collections::VB::Manifest::AllowSystemKeys{});
return !cHandle.valid() || cHandle.isLogicallyDeleted(bySeqno);
}

Expand Down Expand Up @@ -939,11 +945,6 @@ size_t EphemeralVBucket::getNumPersistedDeletes() const {
}

void EphemeralVBucket::dropKey(const DocKey& key, int64_t bySeqno) {
// The system event doesn't get dropped here (tombstone purger will deal)
if (key.isInSystemCollection()) {
return;
}

// if there is a pending item which will need removing from the DM,
// store its seqno here for use after the HT lock has been released.
int64_t prepareSeqno = 0;
Expand Down
21 changes: 11 additions & 10 deletions engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc
Expand Up @@ -941,7 +941,9 @@ static int time_purge_hook(Db& d,

// Track nothing for the individual collection as the stats doc has
// already been deleted.
} else {
} else if (!docKey.isInSystemCollection()) {
// item in a collection was dropped, track how many for item count
// adjustment.
if (!info->deleted) {
ctx.stats.collectionsItemsPurged++;
} else {
Expand Down Expand Up @@ -2823,18 +2825,17 @@ static int bySeqnoScanCallback(Db* db, DocInfo* docinfo, void* ctx) {

auto diskKey = makeDiskDocKey(docinfo->id);

// Determine if the key is logically deleted, if it is we skip the key
// Note that system event keys (like create scope) are never skipped here
// Determine if the key is logically deleted
auto docKey = diskKey.getDocKey();
if (!docKey.isInSystemCollection()) {
if (sctx->docFilter !=
DocumentFilter::ALL_ITEMS_AND_DROPPED_COLLECTIONS) {
if (sctx->collectionsContext.isLogicallyDeleted(docKey, byseqno)) {
sctx->lastReadSeqno = byseqno;
return COUCHSTORE_SUCCESS;
}
if (sctx->docFilter != DocumentFilter::ALL_ITEMS_AND_DROPPED_COLLECTIONS) {
if (sctx->collectionsContext.isLogicallyDeleted(docKey, byseqno)) {
sctx->lastReadSeqno = byseqno;
return COUCHSTORE_SUCCESS;
}
}

// Only do cache lookup for non-system events
if (!docKey.isInSystemCollection()) {
CacheLookup lookup(diskKey, byseqno, vbucketId);

cl.callback(lookup);
Expand Down
8 changes: 4 additions & 4 deletions engines/ep/src/kvstore/kvstore.cc
Expand Up @@ -54,7 +54,7 @@ ScanContext::ScanContext(
std::unique_ptr<StatusCallback<CacheLookup>> cl,
const std::vector<Collections::KVStore::DroppedCollection>&
droppedCollections,
int64_t maxSeqno)
uint64_t maxSeqno)
: vbid(vbid),
handle(std::move(handle)),
docFilter(docFilter),
Expand All @@ -73,8 +73,8 @@ BySeqnoScanContext::BySeqnoScanContext(
std::unique_ptr<StatusCallback<CacheLookup>> cl,
Vbid vb,
std::unique_ptr<KVFileHandle> handle,
int64_t start,
int64_t end,
uint64_t start,
uint64_t end,
uint64_t purgeSeqno,
DocumentFilter _docFilter,
ValueFilter _valFilter,
Expand Down Expand Up @@ -110,7 +110,7 @@ ByIdScanContext::ByIdScanContext(
ValueFilter _valFilter,
const std::vector<Collections::KVStore::DroppedCollection>&
droppedCollections,
int64_t maxSeqno)
uint64_t maxSeqno)
: ScanContext(vb,
std::move(handle),
_docFilter,
Expand Down
91 changes: 7 additions & 84 deletions engines/ep/src/kvstore/kvstore.h
Expand Up @@ -388,7 +388,7 @@ class ScanContext {
std::unique_ptr<StatusCallback<CacheLookup>> cl,
const std::vector<Collections::KVStore::DroppedCollection>&
droppedCollections,
int64_t maxSeqno);
uint64_t maxSeqno);

virtual ~ScanContext() = default;

Expand All @@ -409,13 +409,13 @@ class ScanContext {
}

const Vbid vbid;
int64_t lastReadSeqno{0};
uint64_t lastReadSeqno{0};
std::unique_ptr<KVFileHandle> handle;
const DocumentFilter docFilter;
const ValueFilter valFilter;
BucketLogger* logger;
const Collections::VB::ScanContext collectionsContext;
int64_t maxSeqno;
uint64_t maxSeqno;

/**
* Cumulative count of bytes read from disk during this scan. Counts
Expand All @@ -441,8 +441,8 @@ class BySeqnoScanContext : public ScanContext {
std::unique_ptr<StatusCallback<CacheLookup>> cl,
Vbid vb,
std::unique_ptr<KVFileHandle> handle,
int64_t start,
int64_t end,
uint64_t start,
uint64_t end,
uint64_t purgeSeqno,
DocumentFilter _docFilter,
ValueFilter _valFilter,
Expand All @@ -452,7 +452,7 @@ class BySeqnoScanContext : public ScanContext {
droppedCollections,
std::optional<uint64_t> timestamp = {});

const int64_t startSeqno;
const uint64_t startSeqno;
const uint64_t purgeSeqno;
const uint64_t documentCount;

Expand Down Expand Up @@ -512,7 +512,7 @@ class ByIdScanContext : public ScanContext {
ValueFilter _valFilter,
const std::vector<Collections::KVStore::DroppedCollection>&
droppedCollections,
int64_t maxSeqno);
uint64_t maxSeqno);
std::vector<ByIdRange> ranges;
// Key should be set by KVStore when a scan must be paused, this is where
// a scan can resume from
Expand Down Expand Up @@ -650,83 +650,6 @@ class KVStoreStats {
}
};

/**
* Properties of the storage layer.
*
* If concurrent filesystem access is possible, maxConcurrency() will
* be greater than one. One will need to determine whether more than
* one writer is possible as well as whether more than one reader is
* possible.
*/
class StorageProperties {
public:
enum class ByIdScan : bool { Yes, No };

/**
* Will the KVStore de-dupe items such that only the highest seqno for any
* given key in a single flush batch is persisted?
*/
enum class AutomaticDeduplication : bool { Yes, No };

/**
* Will the KVStore count items in the prepare namespace (and update the
* values appropriately in the vbstate)
*/
enum class PrepareCounting : bool { Yes, No };

/**
* Will the KVStore make callbacks with stale (superseded) items during
* compaction?
*/
enum class CompactionStaleItemCallbacks : bool { Yes, No };

/**
* Does the KVStore support history retention (suitable for change streams)
*/
enum class HistoryRetentionAvailable : bool { Yes, No };

StorageProperties(ByIdScan byIdScan,
AutomaticDeduplication automaticDeduplication,
PrepareCounting prepareCounting,
CompactionStaleItemCallbacks compactionStaleItemCallbacks,
HistoryRetentionAvailable historyRetentionAvailable)
: byIdScan(byIdScan),
automaticDeduplication(automaticDeduplication),
prepareCounting(prepareCounting),
compactionStaleItemCallbacks(compactionStaleItemCallbacks),
historyRetentionAvailable(historyRetentionAvailable) {
}

bool hasByIdScan() const {
return byIdScan == ByIdScan::Yes;
}

bool hasAutomaticDeduplication() const {
return automaticDeduplication == AutomaticDeduplication::Yes;
}

bool hasPrepareCounting() const {
return prepareCounting == PrepareCounting::Yes;
}

bool hasCompactionStaleItemCallbacks() const {
return compactionStaleItemCallbacks ==
CompactionStaleItemCallbacks::Yes;
}

bool canRetainHistory() const {
return historyRetentionAvailable == HistoryRetentionAvailable::Yes;
}

private:
ByIdScan byIdScan;
AutomaticDeduplication automaticDeduplication;
PrepareCounting prepareCounting;
CompactionStaleItemCallbacks compactionStaleItemCallbacks;
HistoryRetentionAvailable historyRetentionAvailable;
};


/**
* Base class for some KVStores that implements common functionality.
*/
Expand Down

0 comments on commit 45f6681

Please sign in to comment.