MB-54516: Generate a ModifyCollection SystemEvent

When processing a Manifest against VB::Manifest, detect whether the history
setting has changed. When it has, update the collection entry with the new
value and generate a ModifyCollection system event.

Once the VB::Manifest write lock is dropped, the VB::Manifest returns
the new history value for all subsequent mutations (which are ordered
after the ModifyCollection event). In a follow-up change, this will
allow the replica to process incoming mutations against the correct
history setting.

With this commit, a ModifyCollection SystemEvent uses a single key per
collection. The first modification is an insert, and all subsequent
modifications generate updates. However, if the CDC feature is
enabled, the complete history (for the retention period) is available
to a DCP client. This could be useful to a DCP client that enables
change_streams and flatbuffer_system_events: it could infer the exact
seqno ranges that have history enabled or disabled.
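
As a rough illustration of that last point, the sketch below shows how a DCP client that has received every Create/ModifyCollection event for one collection might derive the seqno ranges with history enabled or disabled. The `HistoryEvent` and `HistoryRange` types and the `inferHistoryRanges` function are hypothetical names invented for this example; they are not part of kv_engine or of any DCP client library.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical, simplified record of a Create/ModifyCollection event as a
// client might keep it: the event's seqno and the history setting that
// applies from that seqno onwards.
struct HistoryEvent {
    uint64_t seqno;
    bool history;
};

// A closed seqno range [start, end] over which the history setting is fixed.
struct HistoryRange {
    uint64_t start;
    uint64_t end;
    bool history;
};

// Convert a seqno-ordered list of events into ranges. Each range ends one
// seqno before the next event; the final range is open-ended, represented
// here (an assumption of this sketch) by the maximum uint64_t value.
std::vector<HistoryRange> inferHistoryRanges(
        const std::vector<HistoryEvent>& events) {
    std::vector<HistoryRange> ranges;
    for (std::size_t i = 0; i < events.size(); ++i) {
        const uint64_t end = (i + 1 < events.size())
                                     ? events[i + 1].seqno - 1
                                     : std::numeric_limits<uint64_t>::max();
        ranges.push_back({events[i].seqno, end, events[i].history});
    }
    return ranges;
}
```

Given a create at seqno 1 with history=false and a modify at seqno 8 with history=true, this yields one range covering seqnos 1 to 7 with history disabled and an open-ended range from seqno 8 with history enabled.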

In the seqno index, the life of a collection may now look like this:

1: CreateCollection(name=c1, history=false) <- deduplication occurs
5: Mutation(c1, k1)
7: Mutation(c1, k2)
8: ModifyCollection(name=c1, history=true) <- all versions preserved
9: Mutation(c1, k1)
10: Mutation(c1, k1)

Because ModifyCollection uses a single, non-unique key per collection,
backfilling the vbucket history does not allow the reader to infer
anything about the collection's "history" configuration unless the
bucket is configured to retain all versions.
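
The effect of the single key can be sketched with a toy index that keeps only the latest version per key, which is the behaviour assumed here for a bucket that does not retain all versions. The `LatestOnlyIndex` class, the `StoredEvent` type and the `"modify:8"` key string are all hypothetical; the real key encoding is produced by SystemEventFactory and is not reproduced here.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a stored ModifyCollection event.
struct StoredEvent {
    uint64_t seqno;
    bool history;
};

// Hypothetical index that keeps one value per key. Writing an existing key
// replaces the previous value, so earlier versions are no longer visible.
class LatestOnlyIndex {
public:
    // Returns true if the key was newly inserted, false if an existing
    // entry was updated in place.
    bool upsert(const std::string& key, StoredEvent event) {
        auto itr = index.find(key);
        if (itr == index.end()) {
            index.emplace(key, event);
            return true;
        }
        itr->second = event;
        return false;
    }

    // Returns the stored event for the key, or nullptr if there is none.
    const StoredEvent* find(const std::string& key) const {
        auto itr = index.find(key);
        return itr == index.end() ? nullptr : &itr->second;
    }

    std::size_t size() const {
        return index.size();
    }

private:
    std::map<std::string, StoredEvent> index;
};
```

After two modifications of the same collection, a reader of this index sees only the second one, so it cannot tell what the history setting was between the two events.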

This commit only supports the ActiveReplicaManifest from
vbucket_manifest_test.cc; the new SystemEvent does not yet
transfer via DCP and cannot yet be flushed.

Change-Id: I4a1ab90bcb56ca07647840183302724df3c63d7a
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/183249
Well-Formed: Restriction Checker
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
jimwwalker committed Dec 14, 2022
1 parent 47a8114 commit 0eb2b8f
Showing 13 changed files with 272 additions and 32 deletions.
7 changes: 6 additions & 1 deletion engines/ep/src/collections/collections_types.h
@@ -327,7 +327,12 @@ class CollectionSharedMetaData : public RCValue {
const std::string name;
const ScopeID scope;
const cb::ExpiryLimit maxTtl;
const CanDeduplicate canDeduplicate;

// atomic: can be changed by any thread
// mutable: the VB::Manifest "handle" provides a const interface (the map of
// collections cannot be changed), but allows some changes to be made to
// the values in that map.
mutable std::atomic<CanDeduplicate> canDeduplicate;
};
std::ostream& operator<<(std::ostream& os,
const CollectionSharedMetaData& meta);
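
The `mutable std::atomic` member above can be illustrated in isolation. The following is a minimal sketch, assuming a two-value setting; `Dedup`, `SharedMeta`, `setDedup` and `getDedup` are hypothetical names for this example and are not the kv_engine types.

```cpp
#include <atomic>

// Hypothetical two-value setting, standing in for CanDeduplicate.
enum class Dedup { No, Yes };

struct SharedMeta {
    explicit SharedMeta(Dedup d) : dedup(d) {
    }

    // mutable: the value may be stored through a const reference.
    // atomic: a concurrent reader never observes a partially written value.
    mutable std::atomic<Dedup> dedup;
};

// A const reference still permits the store because the member is mutable.
inline void setDedup(const SharedMeta& meta, Dedup value) {
    meta.dedup.store(value);
}

inline Dedup getDedup(const SharedMeta& meta) {
    return meta.dedup.load();
}
```

The trade-off is that constness of the containing object no longer guarantees this one field is unchanged, which is why the comment in the diff spells out the intent.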
5 changes: 5 additions & 0 deletions engines/ep/src/collections/flush_accounting.cc
@@ -27,7 +27,12 @@ static std::pair<bool, std::optional<CollectionID>> getCollectionID(
break;
}
case SystemEvent::Scope:
// system event but not for a collection
return {true, {}};
case SystemEvent::ModifyCollection:
// @todo: separate patch to cover this.
throw std::runtime_error(
"No ModifyCollection support in flush_accounting");
}
} else {
cid = key.getCollectionID();
97 changes: 94 additions & 3 deletions engines/ep/src/collections/vbucket_manifest.cc
@@ -171,6 +171,27 @@ std::optional<Manifest::CollectionCreation> Manifest::applyCreates(
return rv;
}

std::optional<Manifest::CollectionModification> Manifest::applyModifications(
const WriteHandle& wHandle,
::VBucket& vb,
std::vector<CollectionModification>& changes) {
std::optional<CollectionModification> rv;
if (!changes.empty()) {
rv = changes.back();
changes.pop_back();
}
for (const auto& modification : changes) {
modifyCollection(wHandle,
vb,
manifestUid,
modification.cid,
modification.canDeduplicate,
OptionalSeqno{/*no-seqno*/});
}
changes.clear();
return rv;
}

std::optional<ScopeID> Manifest::applyScopeDrops(
const WriteHandle& wHandle,
::VBucket& vb,
@@ -365,6 +386,17 @@ void Manifest::completeUpdate(mutex_type::UpgradeHolder&& upgradeLock,
OptionalSeqno{/*no-seqno*/});
}

auto finalModification =
applyModifications(wHandle, vb, changeset.collectionsToModify);
if (finalModification) {
modifyCollection(wHandle,
vb,
changeset.getUidForChange(manifestUid),
finalModification.value().cid,
finalModification.value().canDeduplicate,
OptionalSeqno{/*no-seqno*/});
}

// This is done last so the scope deletion follows any collection
// deletions
auto finalScopeDrop = applyScopeDrops(wHandle, vb, changeset.scopesToDrop);
@@ -389,6 +421,9 @@ void Manifest::completeUpdate(mutex_type::UpgradeHolder&& upgradeLock,
// property has changed
static bool isImmutablePropertyModified(const CollectionMetaData& newEntry,
const ManifestEntry& existingEntry) {
// Scope cannot change.
// Name cannot change.
// CanDeduplicate (history) can change.
return newEntry.sid != existingEntry.getScopeID() ||
newEntry.name != existingEntry.getName();
}
@@ -582,6 +617,45 @@ void Manifest::dropCollection(WriteHandle& wHandle,
map.erase(itr);
}

void Manifest::modifyCollection(const WriteHandle& wHandle,
::VBucket& vb,
ManifestUid newManUid,
CollectionID cid,
CanDeduplicate canDeduplicate,
OptionalSeqno optionalSeqno) {
auto itr = map.find(cid);
if (itr == map.end()) {
throwException<std::logic_error>(
__FUNCTION__, "did not find collection:" + cid.to_string());
}

// record the uid of the manifest which modified the collection
updateUid(newManUid, optionalSeqno.has_value());

// Now change the value
itr->second.setCanDeduplicate(canDeduplicate);

auto seqno = queueCollectionSystemEvent(wHandle,
vb,
cid,
itr->second.getName(),
itr->second,
SystemEventType::Modify,
optionalSeqno,
{/*no callback*/});

EP_LOG_DEBUG(
"{} modify collection:id:{} from scope:{}, seq:{}, manifest:{:#x}"
", {}{}",
vb.getId(),
cid,
itr->second.getScopeID(),
seqno,
newManUid,
canDeduplicate,
optionalSeqno.has_value() ? ", replica" : "");
}

void Manifest::collectionDropPersisted(CollectionID cid, uint64_t seqno) {
// As soon as we get notification that a dropped collection was flushed
// successfully, mark this flag so subsequent flushes can maintain stats
@@ -750,6 +824,9 @@ Manifest::ManifestChanges Manifest::processManifest(
if (itr == manifest.end()) {
// Not found, so this collection should be dropped.
rv.collectionsToDrop.push_back(cid);
} else if (entry.getCanDeduplicate() != itr->second.canDeduplicate) {
// Found the collection and history was modified
rv.collectionsToModify.push_back({cid, itr->second.canDeduplicate});
}
}

@@ -891,7 +968,9 @@ std::unique_ptr<Item> Manifest::makeCollectionSystemEvent(
flatbuffers::FlatBufferBuilder builder;

switch (type) {
case SystemEventType::Create: {
case SystemEventType::Create:
// Modify carries the current collection metadata (same data as create)
case SystemEventType::Modify: {
auto collection = CreateCollection(
builder,
uid,
@@ -914,8 +993,20 @@ std::unique_ptr<Item> Manifest::makeCollectionSystemEvent(
}
}

auto item = SystemEventFactory::makeCollectionEvent(
cid, {builder.GetBufferPointer(), builder.GetSize()}, seq);
std::unique_ptr<Item> item;
switch (type) {
case SystemEventType::Create:
case SystemEventType::Delete: {
item = SystemEventFactory::makeCollectionEvent(
cid, {builder.GetBufferPointer(), builder.GetSize()}, seq);
break;
}
case SystemEventType::Modify: {
item = SystemEventFactory::makeModifyCollectionEvent(
cid, {builder.GetBufferPointer(), builder.GetSize()}, seq);
break;
}
}

if (type == SystemEventType::Delete) {
item->setDeleted();
45 changes: 37 additions & 8 deletions engines/ep/src/collections/vbucket_manifest.h
@@ -312,11 +312,7 @@ class Manifest {
static DropScopeEventData getDropScopeEventData(
std::string_view flatbufferData);

enum SystemEventType {
Create,
Delete
// @todo Modify
};
enum SystemEventType { Create, Modify, Delete };

/**
* @return an Item that represents a collection create or delete
@@ -341,6 +337,11 @@ class Manifest {
CanDeduplicate canDeduplicate;
};

struct CollectionModification {
CollectionID cid;
CanDeduplicate canDeduplicate;
};

// local struct for managing scope creation
struct ScopeCreation {
ScopeID sid;
@@ -367,6 +368,7 @@ class Manifest {
std::vector<ScopeModified> scopesToModify;
std::vector<CollectionCreation> collectionsToCreate;
std::vector<CollectionID> collectionsToDrop;
std::vector<CollectionModification> collectionsToModify;
// stores any new value (if needed)
std::optional<bool> changeScopeWithDataLimitExists;

@@ -376,7 +378,8 @@ class Manifest {
bool none() const {
return scopesToCreate.empty() && scopesToModify.empty() &&
scopesToDrop.empty() && collectionsToCreate.empty() &&
collectionsToDrop.empty() && !changeScopeWithDataLimitExists;
collectionsToDrop.empty() && collectionsToModify.empty() &&
!changeScopeWithDataLimitExists;
}

/// @return true if there are collections/scopes to create or drop
@@ -414,8 +417,8 @@ class Manifest {

/**
* Sub-functions used by update.
* applyCreates/applyDrops and applyScopeCreates/applyScopeDrops follow
* a similar pattern as follows.
* applyCreates/applyDrops/applyModifications and
* applyScopeCreates/applyScopeDrops follow a similar pattern as follows.
*
* Given a 'changeset' (vector of changes) remove the last entry of the
* changes vector and then call an 'update' function on every remaining
@@ -440,6 +443,11 @@ class Manifest {
::VBucket& vb,
std::vector<CollectionID>& changes);

std::optional<CollectionModification> applyModifications(
const WriteHandle& wHandle,
::VBucket& vb,
std::vector<CollectionModification>& changes);

std::optional<ScopeCreation> applyScopeCreates(
const WriteHandle& wHandle,
::VBucket& vb,
@@ -491,6 +499,27 @@ class Manifest {
CollectionID cid,
OptionalSeqno optionalSeqno);

/**
* Modify a collection in the vbucket
*
* @param wHandle The manifest write handle under which this operation is
* currently locked. Required to ensure we lock correctly around
* VBucket::notifyNewSeqno
* @param vb The vbucket to modify the collection in
* @param newManUid the uid of the manifest which made the change
* @param cid CollectionID to modify
* @param canDeduplicate current deduplicate setting
* @param optionalSeqno Either a seqno to assign to the modification event
* of the collection or none (none means the checkpoint assigns the
* seqno).
*/
void modifyCollection(const WriteHandle& wHandle,
::VBucket& vb,
ManifestUid newManUid,
CollectionID cid,
CanDeduplicate canDeduplicate,
OptionalSeqno optionalSeqno);

/**
* Create a scope in the vbucket.
*
4 changes: 4 additions & 0 deletions engines/ep/src/collections/vbucket_manifest_entry.h
@@ -206,6 +206,10 @@ class ManifestEntry {
return meta->canDeduplicate;
}

void setCanDeduplicate(CanDeduplicate value) {
meta->canDeduplicate.store(value);
}

private:
/**
* Return a string for use in throwException, returns:
24 changes: 24 additions & 0 deletions engines/ep/src/collections/vbucket_manifest_handles.h
@@ -669,6 +669,30 @@ class WriteHandle {
OptionalSeqno{startSeqno});
}

/**
* Modify a collection for a replica VB, this is for receiving
* collection updates via DCP when the collection already has a start
* seqno assigned (assigned by the active).
*
* @param vb The vbucket to modify the collection in
* @param manifestUid the uid of the manifest which made the change
* @param cid The collection id
* @param canDeduplicate New deduplicate value
* @param startSeqno The start-seqno assigned to the collection.
*/
void replicaModifyCollection(::VBucket& vb,
ManifestUid manifestUid,
CollectionID cid,
CanDeduplicate canDeduplicate,
int64_t startSeqno) {
manifest.modifyCollection(*this,
vb,
manifestUid,
cid,
canDeduplicate,
OptionalSeqno{startSeqno});
}

/**
* Drop collection for a replica VB, this is for receiving
* collection updates via DCP and the collection already has an end
1 change: 1 addition & 0 deletions engines/ep/src/dcp/active_stream.cc
@@ -1355,6 +1355,7 @@ bool ActiveStream::shouldProcessItem(const Item& item) {
if (item.getOperation() == queue_op::system_event) {
switch (SystemEvent(item.getFlags())) {
case SystemEvent::Collection:
case SystemEvent::ModifyCollection:
case SystemEvent::Scope:
return true;
}
6 changes: 4 additions & 2 deletions engines/ep/src/dcp/backfill_by_id_disk.cc
@@ -45,8 +45,10 @@ backfill_status_t DCPBackfillByIdDisk::create() {
// 2) the range for the collection itself

// The system event start/end we can make from SystemEventFactory
auto sysStart = SystemEventFactory::makeCollectionEventKey(cid);
auto sysEnd = SystemEventFactory::makeCollectionEventKey(uint32_t{cid} + 1);
auto sysStart = SystemEventFactory::makeCollectionEventKey(
cid, SystemEvent::Collection);
auto sysEnd = SystemEventFactory::makeCollectionEventKey(
uint32_t{cid} + 1, SystemEvent::Collection);

// Create the start and end keys for the collection itself
cb::mcbp::unsigned_leb128<CollectionIDType> start(uint32_t{cid});
4 changes: 2 additions & 2 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc
@@ -2602,8 +2602,8 @@ CompactDBStatus MagmaKVStore::compactDBInternal(

// We've finished processing this collection.
// Create a SystemEvent key for the collection and process it.
auto collectionKey =
SystemEventFactory::makeCollectionEventKey(dc.collectionId);
auto collectionKey = SystemEventFactory::makeCollectionEventKey(
dc.collectionId, SystemEvent::Collection);

keySlice = {reinterpret_cast<const char*>(collectionKey.data()),
collectionKey.size()};