Skip to content

Commit

Permalink
MB-46740: [refactor] CM::getNumItemsForCursor takes Cursor&
Browse files Browse the repository at this point in the history
Change-Id: I1ae0f591cc11706bdfdc3dd9aec04a7771b54236
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/180592
Tested-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
  • Loading branch information
paolococchi committed Oct 4, 2022
1 parent e9eda5d commit 76bccb2
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 69 deletions.
43 changes: 22 additions & 21 deletions engines/ep/src/checkpoint_manager.cc
Expand Up @@ -1190,32 +1190,33 @@ size_t CheckpointManager::getNumOpenChkItems() const {
}

size_t CheckpointManager::getNumItemsForCursor(
const CheckpointCursor* cursor) const {
const CheckpointCursor& cursor) const {
std::lock_guard<std::mutex> lh(queueLock);
return getNumItemsForCursor(lh, cursor);
}

size_t CheckpointManager::getNumItemsForCursor(
const std::lock_guard<std::mutex>& lh,
const CheckpointCursor* cursor) const {
if (cursor && cursor->valid()) {
size_t items = cursor->getRemainingItemsInCurrentCheckpoint();
CheckpointList::const_iterator chkptIterator(cursor->getCheckpoint());
if (chkptIterator != checkpointList.end()) {
++chkptIterator;
}

// Now add the item counts for all the subsequent checkpoints
auto result = std::accumulate(
chkptIterator,
checkpointList.end(),
items,
[](size_t a, const std::unique_ptr<Checkpoint>& b) {
return a + b->getNumItems();
});
return result;
}
return 0;
const CheckpointCursor& cursor) const {
if (!cursor.valid()) {
return 0;
}

// Items from the current checkpoint..
size_t items = cursor.getRemainingItemsInCurrentCheckpoint();
CheckpointList::const_iterator chkptIterator(cursor.getCheckpoint());
if (chkptIterator != checkpointList.end()) {
++chkptIterator;
}
// .. plus the items for all the subsequent checkpoints
auto result =
std::accumulate(chkptIterator,
checkpointList.end(),
items,
[](size_t a, const std::unique_ptr<Checkpoint>& b) {
return a + b->getNumItems();
});
return result;
}

void CheckpointManager::clear(std::optional<uint64_t> seqno) {
Expand Down Expand Up @@ -1494,7 +1495,7 @@ void CheckpointManager::addStats(const AddStatFn& add_stat,
vbucketId.get(),
cursor.second->getName().c_str());
add_casted_stat(buf.data(),
getNumItemsForCursor(lh, cursor.second.get()),
getNumItemsForCursor(lh, *cursor.second),
add_stat,
cookie);
}
Expand Down
14 changes: 8 additions & 6 deletions engines/ep/src/checkpoint_manager.h
Expand Up @@ -345,14 +345,16 @@ class CheckpointManager {
size_t getNumOpenChkItems() const;

/**
* Returns the count of all items (empty item excluded) that the given
* cursor has yet to process (i.e. between the cursor's current position and
* the end of the last checkpoint).
* @param cursor
* @return the count of all items (empty item excluded) that the given
* cursor has yet to process (i.e. between the cursor's current position
* and the end of the last checkpoint).
*/
size_t getNumItemsForCursor(const CheckpointCursor* cursor) const;
size_t getNumItemsForCursor(const CheckpointCursor& cursor) const;

size_t getNumItemsForPersistence() const {
return getNumItemsForCursor(persistenceCursor);
Expects(persistenceCursor);
return getNumItemsForCursor(*persistenceCursor);
}

/**
Expand Down Expand Up @@ -685,7 +687,7 @@ class CheckpointManager {
* @return number of items to be processed
*/
size_t getNumItemsForCursor(const std::lock_guard<std::mutex>& lh,
const CheckpointCursor* cursor) const;
const CheckpointCursor& cursor) const;

/**
* Clears this CM, effectively removing all checkpoints in the list and
Expand Down
4 changes: 2 additions & 2 deletions engines/ep/src/dcp/active_stream.cc
Expand Up @@ -866,7 +866,7 @@ void ActiveStream::addTakeoverStats(const AddStatFn& add_stat,
size_t chk_items = 0;
auto sp = cursor.lock();
if (vb_items > 0 && sp) {
chk_items = vb.checkpointManager->getNumItemsForCursor(sp.get());
chk_items = vb.checkpointManager->getNumItemsForCursor(*sp);
}

size_t del_items = 0;
Expand Down Expand Up @@ -2214,7 +2214,7 @@ size_t ActiveStream::getItemsRemaining() {
// (b) Items pending in our readyQ
size_t ckptItems = 0;
if (auto sp = cursor.lock()) {
ckptItems = vbucket->checkpointManager->getNumItemsForCursor(sp.get());
ckptItems = vbucket->checkpointManager->getNumItemsForCursor(*sp);
}

// Note: concurrent access to readyQ guarded by streamMutex
Expand Down

0 comments on commit 76bccb2

Please sign in to comment.