Skip to content

Commit

Permalink
MB-44539: Don't read dropped collections metadata unless it exists
Browse files Browse the repository at this point in the history
Profiling the flusher shows that "getDroppedCollections" called
from saveDocs accounted for 2.2% to 2.5% of the flusher threads
profile, and this is for a workload where the document didn't
actually exist, couchstore still does some work to read and
decompress data to find that the document is not present.

The dropped collection data is only needed by the flusher for
stats accounting when it exists and we already have some state
in the VB::Manifest which indicates if this document exists.

This commit makes that VB::Manifest 'bool' readable from
CouchKVStore::saveDocs so that the call to getDroppedCollections
can be skipped unless needed.

This also requires some work in the compaction code as compaction
is the entity which may finally delete the document (once compaction
has ran and purged all dropped collections).

Change-Id: Id7274161a07bf6f59b2bcc6b7757945fa1546a79
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/147911
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Richard de Mellow <richard.demellow@couchbase.com>
Reviewed-by: Ben Huddleston <ben.huddleston@couchbase.com>
  • Loading branch information
jimwwalker committed Mar 11, 2021
1 parent e32b8c5 commit cfc9e99
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 42 deletions.
19 changes: 19 additions & 0 deletions engines/ep/src/collections/eraser_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,35 @@ class EraserContext : public ScanContext {
* @param se The flags...SystemEvent of the value
*/
void processSystemEvent(const DocKey& key, SystemEvent se);

/**
* @return true if the on-disk collection meta-data should be updated
*/
bool needToUpdateCollectionsMetadata() const;

/**
* @return true if data associated with dropped collections exists on disk
*/
bool doesOnDiskDroppedDataExist() const {
return onDiskDroppedDataExists;
}

/**
* Eraser (compaction) may not always remove *all* dropped data, compaction
* sets the status with this method.
*/
void setOnDiskDroppedDataExists(bool value) {
onDiskDroppedDataExists = value;
}

private:
friend std::ostream& operator<<(std::ostream&, const EraserContext&);

/// set to true if a 'drop' system event was seen
bool seenEndOfCollection = false;

/// set to true when 'erase' completes and dropped collection data exists
bool onDiskDroppedDataExists = false;
};

std::ostream& operator<<(std::ostream&, const EraserContext&);
Expand Down
3 changes: 3 additions & 0 deletions engines/ep/src/collections/flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -805,4 +805,7 @@ flatbuffers::DetachedBuffer Flush::encodeOpenScopes(
return builder.Release();
}

bool Flush::droppedCollectionsExists() const {
return manifest.isDropInProgress();
}
} // namespace Collections::VB
5 changes: 5 additions & 0 deletions engines/ep/src/collections/flush.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ class Flush {
*/
flatbuffers::DetachedBuffer encodeOpenScopes(cb::const_byte_buffer scopes);

/**
* @return if dropped collections exist in storage
*/
bool droppedCollectionsExists() const;

private:
/**
* Set the ManifestUid from the create/drop events (but only the greatest
Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/collections/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ void Collections::Manager::warmupCompleted(EPBucket& bucket) const {
Vbid vbid = Vbid(i);
auto vb = bucket.getVBuckets().getBucket(vbid);
if (vb) {
if (vb->lockCollections().isDropInProgress()) {
if (vb->getManifest().isDropInProgress()) {
Collections::VB::Flush::triggerPurge(vbid, bucket);
}

Expand Down
10 changes: 5 additions & 5 deletions engines/ep/src/collections/vbucket_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,12 @@ void Manifest::dropCollection(WriteHandle& wHandle,
map.erase(itr);
}

// Method is cleaning up for insertions made in Manifest::dropCollection
void Manifest::collectionDropPersisted(CollectionID cid, uint64_t seqno) {
// As soon as we get notification that a dropped collection was flushed
// successfully, mark this flag so subsequent flushes can maintain stats
// correctly.
dropInProgress.store(true);

droppedCollections.wlock()->remove(cid, seqno);
}

Expand Down Expand Up @@ -847,10 +851,6 @@ uint64_t Manifest::queueCollectionSystemEvent(
return rv;
}

bool Manifest::isDropInProgress() const {
return dropInProgress;
}

size_t Manifest::getSystemEventItemCount() const {
// Every 'live' collection has 1 'live' item, except for the default
// collection which has no creation event.
Expand Down
32 changes: 24 additions & 8 deletions engines/ep/src/collections/vbucket_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,21 @@ class Manifest {
*/
void collectionDropPersisted(CollectionID cid, uint64_t seqno);

/**
* @return true if a collection drop is in-progress, at least 1 collection
* is in the state isDeleting
*/
bool isDropInProgress() const {
return dropInProgress.load();
}

/**
* Set the value of 'drop in progress'.
*/
void setDropInProgress(bool value) {
dropInProgress.store(value);
}

/**
* Get the system event collection create data from a SystemEvent
* Item's value.
Expand Down Expand Up @@ -817,12 +832,6 @@ class Manifest {
OptionalSeqno seq,
std::function<void(int64_t)> assignedSeqnoCallback) const;

/**
* @return true if a collection drop is in-progress, at least 1 collection
* is in the state isDeleting
*/
bool isDropInProgress() const;

/**
* @return the number of system events that exist (as items)
*/
Expand Down Expand Up @@ -944,8 +953,15 @@ class Manifest {
/// Manager of collections
const std::shared_ptr<Manager> manager;

/// Does this vbucket need collection purging triggering
bool dropInProgress{false};
/**
* Flag value which indicates if dropped collections exist, that includes
* documents and metadata. This is atomic because we don't require the
* 'manifest' rwlock and this value is written by compaction and read by
* the flusher.
*
* Value is true if dropped collection data exists
*/
std::atomic<bool> dropInProgress{false};

friend std::ostream& operator<<(std::ostream& os, const Manifest& manifest);
friend std::ostream& operator<<(std::ostream&,
Expand Down
7 changes: 0 additions & 7 deletions engines/ep/src/collections/vbucket_manifest_handles.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,6 @@ class ReadHandle {
Summary& summary) const {
manifest->accumulateStats(collections, summary);
}
/**
* @return true if a collection drop is in-progress, at least 1
* collection is in the state isDeleting
*/
bool isDropInProgress() const {
return manifest->isDropInProgress();
}

// @returns all of the statistics that need to be updated during flushing
StatsForFlush getStatsForFlush(CollectionID collection,
Expand Down
33 changes: 21 additions & 12 deletions engines/ep/src/couch-kvstore/couch-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1185,10 +1185,12 @@ CouchKVStore::replayPrecommitProcessDroppedCollections(
// 3) If the function returned data, write it, else the document is
// deleted.
if (fbData.data()) {
hook_ctx.eraserContext->setOnDiskDroppedDataExists(true);
localDocQueue.emplace_back(Collections::droppedCollectionsName,
fbData);
} else {
// Need to ensure the 'dropped' list on disk is now gone
hook_ctx.eraserContext->setOnDiskDroppedDataExists(false);
localDocQueue.emplace_back(Collections::droppedCollectionsName,
CouchLocalDocRequest::IsDeleted{});
}
Expand Down Expand Up @@ -1484,6 +1486,8 @@ CouchKVStore::CompactDBInternalStatus CouchKVStore::compactDBInternal(
localDocQueue.emplace_back(
Collections::droppedCollectionsName,
CouchLocalDocRequest::IsDeleted{});
hook_ctx->eraserContext->setOnDiskDroppedDataExists(
false);
}
auto ret = maybePatchOnDiskPrepares(
compacted, hook_ctx->stats, localDocQueue, vbid);
Expand Down Expand Up @@ -3004,19 +3008,24 @@ couchstore_error_t CouchKVStore::saveDocs(Vbid vbid,
docsLogicalBytes += calcLogicalDataSize(docs[idx], *docinfos[idx]);
}

auto [getDroppedStatus, droppedCollections] =
getDroppedCollections(*db);
if (getDroppedStatus != COUCHSTORE_SUCCESS) {
logger.warn(
"CouchKVStore::saveDocs: getDroppedConnections"
"error:{}, {}",
couchstore_strerror(getDroppedStatus),
vbid);
return getDroppedStatus;
}
// If dropped collections exists, read the dropped collections metadata
// so the flusher can do the correct stat calculations. This is
// conditional as (trying) to read this data slows down saveDocs.
if (kvctx.commitData.collections.droppedCollectionsExists()) {
auto [getDroppedStatus, droppedCollections] =
getDroppedCollections(*db);
if (getDroppedStatus != COUCHSTORE_SUCCESS) {
logger.warn(
"CouchKVStore::saveDocs: getDroppedConnections"
"error:{}, {}",
couchstore_strerror(getDroppedStatus),
vbid);
return getDroppedStatus;
}

kvctx.commitData.collections.setDroppedCollectionsForStore(
droppedCollections);
kvctx.commitData.collections.setDroppedCollectionsForStore(
droppedCollections);
}

auto cs_begin = std::chrono::steady_clock::now();

Expand Down
19 changes: 14 additions & 5 deletions engines/ep/src/ep_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ void EPBucket::compactionCompletionCallback(CompactionContext& ctx) {
auto preNumTotalItems = vb->getNumTotalItems();

auto preCollectionSizes = ctx.stats.collectionSizeUpdates;
bool preDropInProgess = vb->getManifest().isDropInProgress();
for (auto& [cid, newSize] : preCollectionSizes) {
auto handle = vb->getManifest().lock(cid);

Expand All @@ -1230,34 +1231,42 @@ void EPBucket::compactionCompletionCallback(CompactionContext& ctx) {
vb->setPurgeSeqno(ctx.max_purged_seq);
vb->decrNumTotalItems(ctx.stats.collectionsItemsPurged);

applyPostCompactionCollectionStats(*vb,
ctx.stats.collectionSizeUpdates);
updateCollectionStatePostCompaction(
*vb,
ctx.stats.collectionSizeUpdates,
ctx.eraserContext->doesOnDiskDroppedDataExist());

if (postCompactionCompletionStatsUpdateHook) {
postCompactionCompletionStatsUpdateHook();
}

} catch (std::exception& e) {
// Re-apply our pre-compaction stats snapshot
vb->setPurgeSeqno(prePurgeSeqno);
vb->setNumTotalItems(preNumTotalItems);

applyPostCompactionCollectionStats(*vb, preCollectionSizes);
updateCollectionStatePostCompaction(
*vb, preCollectionSizes, preDropInProgess);

// And re-throw to "undo" the on disk compaction
throw e;
}
}

void EPBucket::applyPostCompactionCollectionStats(
void EPBucket::updateCollectionStatePostCompaction(
VBucket& vb,
CompactionStats::CollectionSizeUpdates& collectionSizeUpdates) {
CompactionStats::CollectionSizeUpdates& collectionSizeUpdates,
bool onDiskDroppedCollectionDataExists) {
for (const auto& [cid, newSize] : collectionSizeUpdates) {
auto handle = vb.getManifest().lock(cid);

if (handle.valid()) {
handle.setDiskSize(newSize);
}
}

// Set the dropInProgress flag to true if ondisk dropped data exists
vb.getManifest().setDropInProgress(onDiskDroppedCollectionDataExists);
}

void EPBucket::compactInternal(LockedVBucketPtr& vb, CompactionConfig& config) {
Expand Down
10 changes: 7 additions & 3 deletions engines/ep/src/ep_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,17 @@ class EPBucket : public KVBucket {
void compactionCompletionCallback(CompactionContext& ctx);

/**
* Apply the collection size updates post-compaction
* Update collection state (VB::Manifest) after compaction has completed.
*
* @param vb VBucket ref
* @param stats Map of cid to new size value (new value not delta)
* @param onDiskDroppedCollectionDataExists true if the compacted file
* has dropped collections (documents and/or metadata).
*/
void applyPostCompactionCollectionStats(
VBucket& vb, CompactionStats::CollectionSizeUpdates& stats);
void updateCollectionStatePostCompaction(
VBucket& vb,
CompactionStats::CollectionSizeUpdates& stats,
bool onDiskDroppedCollectionDataExists);

void stopWarmup();

Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/vb_commit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace VB {
Commit::Commit(Collections::VB::Manifest& manifest,
vbucket_state vbs,
std::function<bool(const std::system_error&)> sysErrorCallback)
SysErrorCallback sysErrorCallback)
: collections(manifest),
proposedVBState(std::move(vbs)),
sysErrorCallback(std::move(sysErrorCallback)) {
Expand Down

0 comments on commit cfc9e99

Please sign in to comment.