Skip to content

Commit

Permalink
Add WeakPtrBag
Browse files Browse the repository at this point in the history
An unordered collection which is implemented as a thread-safe vector of
weak_ptrs.

The motivation for this is to have a method like 'getAllExistingTasks',
such that we can orchestrate the execution of tasks relevant to unified
caching across all buckets without making the core aware of the
individual tasks' existence.

Change-Id: Id91a8153611909c300c2daeafea1fd8bcee6dec4
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/181794
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
veselink1 authored and daverigby committed Oct 27, 2022
1 parent 6f97685 commit 6fea6cd
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 0 deletions.
1 change: 1 addition & 0 deletions engines/ep/tests/CMakeLists.txt
Expand Up @@ -117,6 +117,7 @@ cb_add_test_executable(ep-engine_ep_unit_tests
module_tests/tagged_ptr_test.cc
module_tests/task_concurrency_test.cc
module_tests/test_helpers.cc
module_tests/utilities/weak_ptr_bag_test.cc
module_tests/vbucket_test.cc
module_tests/vbucket_durability_test.cc
module_tests/vb_ready_queue_test.cc
Expand Down
76 changes: 76 additions & 0 deletions engines/ep/tests/module_tests/utilities/weak_ptr_bag_test.cc
@@ -0,0 +1,76 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2022-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/

#include "utilities/weak_ptr_bag.h"

#include <folly/portability/GTest.h>

/**
* Check that storing in the bag does not strongly reference objects.
*/
TEST(WeakPtrBagTest, BagReferencesWeakly) {
WeakPtrBag<int> bag;
auto n1 = std::make_shared<int>();

bag.push(n1);

ASSERT_TRUE(n1.unique());
ASSERT_EQ(1, bag.getNonExpired().size());
}

TEST(WeakPtrBagTest, CompactRemovesExpiredItems) {
WeakPtrBag<int> bag;
auto n1 = std::make_shared<int>();
bag.push(n1);
auto n2 = std::make_shared<int>();
bag.push(n2);

n1.reset();

auto removedCount = bag.compact();
ASSERT_EQ(1, removedCount);
}

/**
* Add two items and check that we can get them back
*/
TEST(WeakPtrBagTest, Push) {
WeakPtrBag<int> bag;
auto n1 = std::make_shared<int>();
bag.push(n1);
auto n2 = std::make_shared<int>();
bag.push(n2);

auto nonExpired = bag.getNonExpired();
ASSERT_EQ(2, nonExpired.size());
ASSERT_NE(std::find(nonExpired.begin(), nonExpired.end(), n1),
nonExpired.end());
ASSERT_NE(std::find(nonExpired.begin(), nonExpired.end(), n2),
nonExpired.end());
}

/**
* Add two items, expire one and check that we only get the other one back.
*/
TEST(WeakPtrBagTest, PushAndExpire) {
WeakPtrBag<int> bag;
auto n1 = std::make_shared<int>();
bag.push(n1);
auto n2 = std::make_shared<int>();
bag.push(n2);

// Expire one of the items
n1.reset();

auto nonExpired = bag.getNonExpired();
ASSERT_EQ(1, nonExpired.size());
ASSERT_EQ(nonExpired[0], n2);
}
94 changes: 94 additions & 0 deletions utilities/weak_ptr_bag.h
@@ -0,0 +1,94 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2022-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/

#pragma once

#include <folly/Synchronized.h>
#include <algorithm>
#include <memory>
#include <mutex>
#include <vector>

/**
* A thread-safe unordered collection of weakly-referenced objects.
*
* The contained list of weak pointers is compacted automatically.
*/
template <typename T, typename Mutex = std::mutex>
class WeakPtrBag {
public:
/**
* Adds an item into the bag. The item will be weakly-referenced, so
* it might not be returned by a call to getItems if its use count reached
* zero before that.
*/
void push(std::shared_ptr<T> ptr) {
items.withLock([ptr = std::move(ptr)](auto& locked) {
compact(locked);
locked.push_back(ptr);
});
}

/**
* Returns strong references to the items in the bag which have a non-zero
* use count.
*/
std::vector<std::shared_ptr<T>> getNonExpired() const {
// Compact the list of tasks and return the once which are alive.
return items.withLock([](auto& locked) {
compact(locked);

std::vector<std::shared_ptr<T>> alive;
alive.reserve(locked.size());

for (auto&& weak : locked) {
if (auto strong = weak.lock(); strong) {
alive.push_back(std::move(strong));
}
}

return alive;
});
}

/**
* Compacts the list of weak pointers by removing the expired weak pointers
* from it.
*
* Similarly to vector::shrink_to_fit, this method might release the unused
* capacity.
*
* @returns The number of removed weak pointers.
*/
size_t compact() {
return items.withLock([](auto& locked) {
auto initialSize = locked.size();
compact(locked);
// Manual compact() -- there might be a reason why it was called
// manually. Ask the implementation to consider reducing the
// capacity of the vector.
locked.shrink_to_fit();
return initialSize - locked.size();
});
}

private:
static void compact(std::vector<std::weak_ptr<T>>& ptrs) {
ptrs.erase(std::partition(ptrs.begin(),
ptrs.end(),
[](auto& ptr) { return !ptr.expired(); }),
ptrs.end());
}

// The list of weak_ptrs. Mutable because we want to compact the list on
// access.
mutable folly::Synchronized<std::vector<std::weak_ptr<T>>, Mutex> items;
};

0 comments on commit 6fea6cd

Please sign in to comment.