Skip to content

Commit

Permalink
MB-54516: Support flush of a ModifyCollection system event
Browse files Browse the repository at this point in the history
Update the schema in kvstore.fbs to include a new bool field for storing
the collections history setting.

The KVStore derived VB::Manifest now captures the history setting from
CreateCollection events.

With that, check for the flush of a ModifyCollection event and use the
settings from that event to make any adjustments to the KVStore
collection data.

VB::Manifest now reloads the correct state from warmup and flush
support allows testing of Modify from backfill.

A limitation of this commit is that drop collection does not yet
support the ModifyCollection event (it is not dropped when it
should be).

Change-Id: I523a4a0ea2c2b3de70d7f6cc621936495a6f8cb1
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/183251
Well-Formed: Restriction Checker
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Jim Walker <jim@couchbase.com>
  • Loading branch information
jimwwalker committed Dec 16, 2022
1 parent d16fd06 commit f04959f
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 62 deletions.
56 changes: 52 additions & 4 deletions engines/ep/src/collections/flush.cc
Expand Up @@ -176,6 +176,10 @@ void Flush::recordSystemEvent(const Item& item) {
}
break;
}
case SystemEvent::ModifyCollection: {
recordModifyCollection(item);
break;
}
case SystemEvent::Scope: {
if (item.isDeleted()) {
recordDropScope(item);
Expand Down Expand Up @@ -214,6 +218,24 @@ void Flush::recordCreateCollection(const Item& item) {
setManifestUid(createEvent.manifestUid);
}

void Flush::recordModifyCollection(const Item& item) {
// Modify and Create carry the same data - all of the collection meta, hence
// call to getCreateEventData
auto createEvent = Collections::VB::Manifest::getCreateEventData(
{item.getData(), item.getNBytes()});
KVStore::OpenCollection collection{uint64_t(item.getBySeqno()),
createEvent.metaData};
auto [itr, emplaced] =
collectionMods.try_emplace(collection.metaData.cid, collection);
if (!emplaced) {
// Collection already in the map, only keep this event if >
if (uint64_t(item.getBySeqno()) > itr->second.startSeqno) {
itr->second = collection;
}
}
setManifestUid(createEvent.manifestUid);
}

void Flush::recordDropCollection(const Item& item) {
auto dropEvent = Collections::VB::Manifest::getDropEventData(
{item.getData(), item.getNBytes()});
Expand Down Expand Up @@ -334,8 +356,14 @@ flatbuffers::DetachedBuffer Flush::encodeOpenCollections(
}
}

// generate
const auto& meta = span.high.metaData;

// This will set the value based on any modification which may have
// been flushed
auto history = getHistorySetting(
meta.cid, span.high.startSeqno, meta.canDeduplicate);

// generate
exclusiveInsertCollection(
meta.cid,
Collections::KVStore::CreateCollection(
Expand All @@ -347,7 +375,8 @@ flatbuffers::DetachedBuffer Flush::encodeOpenCollections(
meta.maxTtl.value_or(std::chrono::seconds::zero())
.count(),
builder.CreateString(meta.name.data(),
meta.name.size())));
meta.name.size()),
history));
}

// And 'merge' with the data we read
Expand All @@ -372,7 +401,12 @@ flatbuffers::DetachedBuffer Flush::encodeOpenCollections(
entry->collectionId(),
entry->ttlValid(),
entry->maxTtl(),
builder.CreateString(entry->name())));
builder.CreateString(entry->name()),
// getHistorySetting checks for modifications
getHistorySetting(entry->collectionId(),
entry->startSeqno(),
getCanDeduplicateFromHistory(
entry->history()))));
} else {
// Here we maintain the startSeqno of the dropped collection
result->second.startSeqno = entry->startSeqno();
Expand All @@ -392,7 +426,8 @@ flatbuffers::DetachedBuffer Flush::encodeOpenCollections(
0,
builder.CreateString(
Collections::DefaultCollectionIdentifier
.data())));
.data()),
false /* default to no history*/));
}

auto collectionsVector = builder.CreateVector(finalisedOpenCollection);
Expand Down Expand Up @@ -669,4 +704,17 @@ VB::Manifest& Flush::getManifest() const {
return manifest;
}

/**
 * Determine the history value to persist for the collection @p cid that was
 * created at @p seqno.
 *
 * If the flush batch recorded a modification of the collection with a seqno
 * greater than @p seqno, the modification's CanDeduplicate value is used;
 * otherwise (no modification, or one that is not after the create) the
 * create's own setting is used.
 *
 * @param cid The collection to look up in collectionMods
 * @param seqno The seqno of the collection's create
 * @param createSetting The CanDeduplicate value from the create
 * @return the result of getHistoryFromCanDeduplicate for the chosen setting
 */
bool Flush::getHistorySetting(CollectionID cid,
                              uint64_t seqno,
                              CanDeduplicate createSetting) {
    const auto found = collectionMods.find(cid);
    const bool modifiedAfterCreate = found != collectionMods.end() &&
                                     found->second.startSeqno > seqno;
    if (!modifiedAfterCreate) {
        // Nothing in this batch overrides the create - use the input value.
        return getHistoryFromCanDeduplicate(createSetting);
    }
    return getHistoryFromCanDeduplicate(found->second.metaData.canDeduplicate);
}

} // namespace Collections::VB
29 changes: 28 additions & 1 deletion engines/ep/src/collections/flush.h
Expand Up @@ -184,7 +184,8 @@ class Flush {

// @return if the set of open collections is changing
bool isOpenCollectionsChanged() const {
return !collections.empty() || isDroppedCollectionsChanged();
return !collections.empty() || !collectionMods.empty() ||
isDroppedCollectionsChanged();
}

// @return if the set of dropped collections is changing
Expand Down Expand Up @@ -226,6 +227,11 @@ class Flush {
*/
void recordCreateCollection(const Item& item);

/**
* Record that a collection was modified in a commit batch. If the batch
* contains more than one modification of the same collection, only the one
* with the greatest by-seqno is retained.
*
* @param item The flushed ModifyCollection system event
*/
void recordModifyCollection(const Item& item);

/**
* Record that a drop collection was present in a commit batch
*/
Expand Down Expand Up @@ -340,6 +346,21 @@ class Flush {
*/
void checkAndTriggerPurge(Vbid vbid, EPBucket& bucket) const;

/**
* Return the history setting to use for the collection created @ seqno.
* The function will return a value derived from either the createSetting
* parameter or from a modify event which may be in the flush batch (and is
* ordered after the create, i.e. has a greater seqno).
*
* @param cid The collection created (and maybe modified)
* @param seqno The sequence number of the collection create
* @param createSetting The CanDeduplicate value from the create event
* @return The history bool based on the CanDeduplicate setting
*/
bool getHistorySetting(CollectionID cid,
uint64_t seqno,
CanDeduplicate createSetting);

/**
* For each collection created in the batch, we record meta data of the
* first and last (high/low by-seqno). If the collection was created once,
Expand All @@ -351,6 +372,12 @@ class Flush {
};
std::unordered_map<CollectionID, CollectionSpan> collections;

/**
* For each collection modified in the batch we record the meta data for the
* modification with the greatest by-seqno. Note that the startSeqno of the
* stored OpenCollection is the seqno of the modify event itself.
*/
std::unordered_map<CollectionID, KVStore::OpenCollection> collectionMods;

/**
* For each scope created in the batch, we record meta data for the greatest
* by-seqno.
Expand Down
119 changes: 82 additions & 37 deletions engines/ep/src/collections/flush_accounting.cc
Expand Up @@ -15,29 +15,43 @@

namespace Collections::VB {

static std::pair<bool, std::optional<CollectionID>> getCollectionID(
const DocKey& key) {
bool isSystemEvent = key.isInSystemCollection();
CollectionID cid;
if (isSystemEvent) {
// Returns two optionals
// First is only initialised if the key is a collection system event.
// Second is only initialised if there is a collection id present.
// Any mutation will return nullopt,cid
// A collection create/drop/modify will return event, cid
// A scope event will return nullopt, nullopt

/**
* From a DocKey extract CollectionID and SystemEvent data.
* The function returns two optionals because depending on the key there could
* be no SystemEvent (regular mutation) or no CollectionID (Scope event) - or we
* could have both (e.g. Collection SystemEvent).
*
* @param key a DocKey from a flushed Item
* @return a pair of optional. first (SystemEvent) is initialised if the key
* belongs to the SystemCollection (collection 1) - the value is the
* type of event that the key represents. The second (CollectionID) is
* initialised with a CollectionID where relevant. E.g. CreateScope
* leaves this as std::nullopt but CreateCollection will set it to the
* collection's ID. All non system keys will also initialise second with
* the CollectionID.
*/
static std::pair<std::optional<SystemEvent>, std::optional<CollectionID>>
getCollectionEventAndCollectionID(const DocKey& key) {
if (key.isInSystemCollection()) {
auto [event, id] = SystemEventFactory::getTypeAndID(key);
switch (event) {
case SystemEvent::ModifyCollection:
case SystemEvent::Collection: {
cid = CollectionID(id);
break;
return {event, CollectionID(id)};
}
case SystemEvent::Scope:
// system event but not for a collection
return {true, {}};
case SystemEvent::ModifyCollection:
// @todo: separate patch to cover this.
throw std::runtime_error(
"No ModifyCollection support in flush_accounting");
return {std::nullopt, std::nullopt};
}
} else {
cid = key.getCollectionID();
}
return {isSystemEvent, cid};
// No event, but we do have a collection id
return {std::nullopt, key.getCollectionID()};
}

FlushAccounting::StatisticsUpdate&
Expand Down Expand Up @@ -117,7 +131,7 @@ void FlushAccounting::StatisticsUpdate::insert(
return;
}

// else inserting a collection-start/prepare/tombstone/abort:
// else inserting a collection{start,modify}/prepare/tombstone/abort:
// no item increment but account for the disk size change
updateDiskSize(diskSizeDelta);
}
Expand All @@ -134,8 +148,8 @@ void FlushAccounting::StatisticsUpdate::remove(
IsDeleted isDelete,
IsCommitted isCommitted,
CompactionCallbacks compactionCallbacks,
size_t oldSize,
size_t newSize) {
size_t oldSize,
size_t newSize) {
if (isSystem == IsSystem::No && isCommitted == IsCommitted::Yes) {
decrementItemCount();
}
Expand Down Expand Up @@ -171,9 +185,25 @@ void FlushAccounting::presetStats(CollectionID cid,
}
}

bool FlushAccounting::processSystemEvent(CollectionID cid,
IsDeleted isDelete,
IsCompaction isCompaction) {
bool FlushAccounting::checkAndMaybeProcessSystemEvent(
SystemEvent event,
CollectionID cid,
IsDeleted isDelete,
IsCompaction isCompaction) {
switch (event) {
case SystemEvent::ModifyCollection:
// Modify event - no processing
return false;
case SystemEvent::Collection:
// Collection create/drop - break and process
break;
default:
// E.g. Scope
throw std::logic_error(
"checkAndMaybeProcessSystemEvent unexpected event" +
std::to_string(int(event)));
}

// If the update comes from compaction (where replay is copying data) then
// unconditionally remove the collection from the stats map. A create or
// or drop collection is the start or end of the collection - in both cases
Expand All @@ -184,6 +214,8 @@ bool FlushAccounting::processSystemEvent(CollectionID cid,
stats.erase(cid);
}

// caller stops processing if this function returns true - DropCollection
// stops the processing.
return isDelete == IsDeleted::Yes;
}

Expand All @@ -194,20 +226,27 @@ void FlushAccounting::updateStats(const DocKey& key,
size_t size,
IsCompaction isCompaction,
CompactionCallbacks compactionCallbacks) {
auto [isSystemEvent, cid] = getCollectionID(key);
const auto [event, cid] = getCollectionEventAndCollectionID(key);

if (!cid) {
// The key is not for a collection (e.g. a scope event).
// If the key is not associated with a Collection then abandon
// processing. This must mean the key is a Scope event and as such does
// not change collection stats.
return;
}

// System events have extra handling and may terminate the stat update
if (isSystemEvent && processSystemEvent(*cid, isDelete, isCompaction)) {
// This was a drop collection event, we don't update the collection
// stats for this case.
if (event &&
checkAndMaybeProcessSystemEvent(*event, *cid, isDelete, isCompaction)) {
// This case is reached for a DropCollection event. No collection stats
// are to be changed.
return;
}

// At this point the key is a normal mutation/delete so will for example
// increment or decrement the collection item count.
// Or the key is a SystemEvent Create/Modify collection, which needs to
// change the collection disk size and high-seqno.

// Track high-seqno for the item
auto& collsFlushStats = getStatsAndMaybeSetPersistedHighSeqno(
cid.value(), seqno, compactionCallbacks);
Expand All @@ -219,7 +258,7 @@ void FlushAccounting::updateStats(const DocKey& key,
// seqno. Empty collection detection relies on start-seqno == high-seqno.
if (!isLogicallyDeleted(cid.value(), seqno) ||
compactionCallbacks == CompactionCallbacks::AnyRevision) {
collsFlushStats.insert(isSystemEvent ? IsSystem::Yes : IsSystem::No,
collsFlushStats.insert(event ? IsSystem::Yes : IsSystem::No,
isDelete,
isCommitted,
compactionCallbacks,
Expand All @@ -240,18 +279,21 @@ bool FlushAccounting::updateStats(const DocKey& key,
bool logicalInsert = false;

// Same logic (and comments) apply as per the above updateStats function.
auto [systemEvent, cid] = getCollectionID(key);
auto isSystemEvent = systemEvent ? IsSystem::Yes : IsSystem::No;
const auto [event, cid] = getCollectionEventAndCollectionID(key);
if (!cid) {
return false;
}

// System events have extra handling and may terminate the stat update
if (isSystemEvent == IsSystem::Yes &&
processSystemEvent(*cid, isDelete, isCompaction)) {
if (event &&
checkAndMaybeProcessSystemEvent(*event, *cid, isDelete, isCompaction)) {
return false;
}

// At this point the key is a normal mutation/delete so will for example
// increment or decrement the collection item count.
// Or the key is a SystemEvent Create/Modify collection, which needs to
// change the collection disk size and high-seqno.

auto& collsFlushStats = getStatsAndMaybeSetPersistedHighSeqno(
cid.value(), seqno, compactionCallbacks);

Expand All @@ -261,6 +303,7 @@ bool FlushAccounting::updateStats(const DocKey& key,
return false;
}

const auto isSystemEvent = event ? IsSystem::Yes : IsSystem::No;
if (isSystemEvent == IsSystem::No &&
compactionCallbacks == CompactionCallbacks::AnyRevision &&
(isLogicallyDeleted(cid.value(), oldSeqno) ||
Expand Down Expand Up @@ -361,16 +404,18 @@ bool FlushAccounting::updateStats(const DocKey& key,
void FlushAccounting::maybeUpdatePersistedHighSeqno(const DocKey& key,
uint64_t seqno,
bool isDelete) {
// Same logic and comment as updateStats.
auto [isSystemEvent, cid] = getCollectionID(key);
const auto [event, cid] = getCollectionEventAndCollectionID(key);

if (!cid) {
// A scope event - no high-seqno update.
return;
}

if (isDelete && isSystemEvent) {
if (isDelete && event) {
// A system event, but deleted (drop collection) - no high-seqno update
return;
}

// don't care for the return value, just update the persisted high seqno
getStatsAndMaybeSetPersistedHighSeqno(cid.value(), seqno);
}
Expand Down

0 comments on commit f04959f

Please sign in to comment.