Skip to content

Commit

Permalink
fix memory leak reported by address sanitizer
Browse files Browse the repository at this point in the history
The new implementation of the collection cache had a circular dependency that
created a memory leak: memory allocated for the collections component
could not be reclaimed
  • Loading branch information
avsej committed Jun 21, 2024
1 parent 1b489a2 commit 91e8599
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 69 deletions.
7 changes: 4 additions & 3 deletions core/bucket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ class bucket_impl
// self->span_->add_tag(tracing::attributes::orphan, "canceled");
return req->try_callback(resp, ec);
}
backoff_and_retry(
req, reason == retry_reason::do_not_retry ? retry_reason::node_not_available : reason);
backoff_and_retry(std::move(req),
reason == retry_reason::do_not_retry ? retry_reason::node_not_available
: reason);
return;
}
key_value_status_code status{ key_value_status_code::unknown };
Expand Down Expand Up @@ -164,7 +165,7 @@ class bucket_impl
} else {
resp = std::make_shared<mcbp::queue_response>(std::move(packet));
}
resolve_response(req, resp, error, reason, std::move(error_info));
resolve_response(std::move(req), std::move(resp), error, reason, std::move(error_info));
}

auto direct_dispatch(std::shared_ptr<mcbp::queue_request> req) -> std::error_code
Expand Down
5 changes: 3 additions & 2 deletions core/cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1921,8 +1921,9 @@ cluster::execute(impl::lookup_in_replica_request request,
auto
cluster::to_string() const -> std::string
{
return fmt::format(R"(#<cluster:{} impl={}>)",
return fmt::format(R"(#<cluster:{} impl={}, use_count={}>)",
static_cast<const void*>(this),
impl_ ? static_cast<const void*>(impl_.get()) : "(none)");
impl_ ? static_cast<const void*>(impl_.get()) : "(none)",
impl_ ? std::to_string(impl_.use_count()) : "(none)");
}
} // namespace couchbase::core
129 changes: 67 additions & 62 deletions core/collections_component.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,66 +37,35 @@
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>

#include <memory>

namespace couchbase::core
{
/**
 * Builds the cache key for a collection in the form "<scope>.<collection>".
 *
 * @param scope_name name of the scope
 * @param collection_name name of the collection within the scope
 * @return the two names joined with a single '.'
 */
static auto
build_key(std::string_view scope_name, std::string_view collection_name) -> std::string
{
  // A printf-style "%s.%s" pattern must not be used with fmt::format here: fmt only substitutes
  // "{}" fields, so "%s.%s" would be returned verbatim and every collection would share one key.
  // Plain concatenation produces the same text as the "{}.{}" pattern.
  std::string key;
  key.reserve(scope_name.size() + 1 + collection_name.size());
  key.append(scope_name);
  key.push_back('.');
  key.append(collection_name);
  return key;
}

class collection_id_cache_entry_impl
: public std::enable_shared_from_this<collection_id_cache_entry_impl>
, public collection_id_cache_entry
{
public:
collection_id_cache_entry_impl(std::shared_ptr<collections_component_impl> manager,
dispatcher dispatcher,
collection_id_cache_entry_impl(std::weak_ptr<collections_component_impl> manager,
std::string scope_name,
std::string collection_name,
std::size_t max_queue_size,
std::uint32_t id)
: manager_{ std::move(manager) }
, dispatcher_{ std::move(dispatcher) }
, scope_name_{ std::move(scope_name) }
, collection_name_{ std::move(collection_name) }
, max_queue_size_{ max_queue_size }
, id_{ id }
{
}

[[nodiscard]] auto dispatch(std::shared_ptr<mcbp::queue_request> req) -> std::error_code override
{
/*
* if the collection id is unknown then mark the request pending and refresh collection id first
* if it is pending then queue request
* otherwise send the request
*/
switch (std::scoped_lock lock(mutex_); id_) {
case unknown_collection_id:
CB_LOG_DEBUG(
"collection {}.{} unknown. refreshing id", req->scope_name_, req->collection_id_);
id_ = pending_collection_id;

if (auto ec = refresh_collection_id(req); ec) {
id_ = unknown_collection_id;
return ec;
}
return {};

case pending_collection_id:
CB_LOG_DEBUG("collection {}.{} pending. queueing request OP={}",
req->scope_name_,
req->collection_id_,
req->command_);
return queue_->push(req, max_queue_size_);

default:
break;
}

return send_with_collection_id(std::move(req));
}
[[nodiscard]] auto dispatch(std::shared_ptr<mcbp::queue_request> req) -> std::error_code override;

void reset_id() override
{
Expand Down Expand Up @@ -137,24 +106,6 @@ class collection_id_cache_entry_impl
return {};
}

[[nodiscard]] auto send_with_collection_id(std::shared_ptr<mcbp::queue_request> req)
-> std::error_code
{
if (auto ec = assign_collection_id(req); ec) {
CB_LOG_DEBUG("failed to set collection ID \"{}.{}\" on request (OP={}): {}",
req->scope_name_,
req->collection_name_,
req->command_,
ec.message());
return ec;
}

if (auto ec = dispatcher_.direct_dispatch(req); ec) {
return ec;
}
return {};
}

[[nodiscard]] auto refresh_collection_id(std::shared_ptr<mcbp::queue_request> req)
-> std::error_code;

Expand All @@ -167,8 +118,9 @@ class collection_id_cache_entry_impl
}

private:
const std::shared_ptr<collections_component_impl> manager_;
const dispatcher dispatcher_;
// Using std::weak_ptr here as lifetime of the entry is bound to the lifetime
// of the component, and we want to avoid memory leaks due to circular dependencies.
const std::weak_ptr<collections_component_impl> manager_;
const std::string scope_name_;
const std::string collection_name_;
const std::size_t max_queue_size_;
Expand Down Expand Up @@ -201,7 +153,6 @@ class collections_component_impl : public std::enable_shared_from_this<collectio
return it->second;
}
auto entry = std::make_shared<collection_id_cache_entry_impl>(shared_from_this(),
dispatcher_,
std::move(scope_name),
std::move(collection_name),
max_queue_size_,
Expand Down Expand Up @@ -229,7 +180,6 @@ class collections_component_impl : public std::enable_shared_from_this<collectio

cache_.try_emplace(key,
std::make_shared<collection_id_cache_entry_impl>(shared_from_this(),
dispatcher_,
std::move(scope_name),
std::move(collection_name),
max_queue_size_,
Expand Down Expand Up @@ -327,6 +277,17 @@ class collections_component_impl : public std::enable_shared_from_this<collectio
return req;
}

/**
 * Hands a request back to the dispatcher for re-queueing.
 *
 * @param request the request to re-queue (ownership is passed on to the dispatcher call)
 * @param is_retry forwarded unchanged to the dispatcher
 * @return whatever the dispatcher reports
 */
auto direct_re_queue(std::shared_ptr<mcbp::queue_request> request,
                     bool is_retry) -> std::error_code
{
  auto outcome = dispatcher_.direct_re_queue(std::move(request), is_retry);
  return outcome;
}

/**
 * Hands a request to the dispatcher for direct dispatch.
 *
 * @param request the request to send (ownership is passed on to the dispatcher call)
 * @return whatever the dispatcher reports
 */
auto direct_dispatch(std::shared_ptr<mcbp::queue_request> request) -> std::error_code
{
  auto outcome = dispatcher_.direct_dispatch(std::move(request));
  return outcome;
}

auto dispatch(std::shared_ptr<mcbp::queue_request> request)
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
{
Expand Down Expand Up @@ -358,6 +319,50 @@ class collections_component_impl : public std::enable_shared_from_this<collectio
mutable std::mutex cache_mutex_{};
};

/**
 * Routes a request according to the cached state of the collection identifier.
 *
 *  - identifier unknown: mark the entry as pending and start a refresh of the identifier,
 *    restoring the unknown state if the refresh cannot be started;
 *  - identifier pending: park the request in the entry queue (bounded by max_queue_size_);
 *  - otherwise: assign the identifier to the request and pass it to the owning component.
 *
 * @param req the request to dispatch
 * @return an empty error code on success, otherwise the error reported by the refresh, the queue,
 *         the identifier assignment or the component; std::errc::operation_canceled if the owning
 *         component no longer exists
 */
[[nodiscard]] auto
collection_id_cache_entry_impl::dispatch(std::shared_ptr<mcbp::queue_request> req)
  -> std::error_code
{
  // The lock is scoped to the switch statement only: it protects the inspection and update of
  // id_ and is released before the request is actually sent.
  switch (std::scoped_lock lock(mutex_); id_) {
    case unknown_collection_id:
      // Log the collection *name*: the message is "scope.collection", and the identifier is by
      // definition not known on this path.
      CB_LOG_DEBUG(
        "collection {}.{} unknown. refreshing id", req->scope_name_, req->collection_name_);
      id_ = pending_collection_id;

      if (auto ec = refresh_collection_id(req); ec) {
        // The refresh could not be started, so nothing will resolve the pending state.
        id_ = unknown_collection_id;
        return ec;
      }
      return {};

    case pending_collection_id:
      CB_LOG_DEBUG("collection {}.{} pending. queueing request OP={}",
                   req->scope_name_,
                   req->collection_name_,
                   req->command_);
      return queue_->push(req, max_queue_size_);

    default:
      break;
  }

  if (auto ec = assign_collection_id(req); ec) {
    CB_LOG_DEBUG("failed to set collection ID \"{}.{}\" on request (OP={}): {}",
                 req->scope_name_,
                 req->collection_name_,
                 req->command_,
                 ec.message());
    return ec;
  }

  // manager_ is a weak reference, so lock() yields an empty pointer once the component has been
  // destroyed. Report that as an error instead of dereferencing a null pointer.
  // NOTE(review): a project-specific "request canceled" error code may be a better fit here.
  auto manager = manager_.lock();
  if (!manager) {
    return std::make_error_code(std::errc::operation_canceled);
  }
  return manager->direct_dispatch(std::move(req));
}

auto
collection_id_cache_entry_impl::refresh_collection_id(std::shared_ptr<mcbp::queue_request> req)
-> std::error_code
Expand All @@ -367,7 +372,7 @@ collection_id_cache_entry_impl::refresh_collection_id(std::shared_ptr<mcbp::queu
}

CB_LOG_DEBUG("refreshing collection ID for \"{}.{}\"", req->scope_name_, req->collection_name_);
auto op = manager_->get_collection_id(
auto op = manager_.lock()->get_collection_id(
req->scope_name_,
req->collection_name_,
get_collection_id_options{},
Expand All @@ -384,7 +389,7 @@ collection_id_cache_entry_impl::refresh_collection_id(std::shared_ptr<mcbp::queu
req->collection_name_);
self->set_id(unknown_collection_id);
if (self->queue_->remove(req)) {
if (self->manager_->handle_collection_unknown(req)) {
if (self->manager_.lock()->handle_collection_unknown(req)) {
return;
}
} else {
Expand All @@ -401,7 +406,7 @@ collection_id_cache_entry_impl::refresh_collection_id(std::shared_ptr<mcbp::queu
}
// There was an error getting this collection ID so lets remove the cache from the manager
// and try to callback on all the queued requests.
self->manager_->remove(req->scope_name_, req->collection_name_);
self->manager_.lock()->remove(req->scope_name_, req->collection_name_);
auto queue = self->swap_queue();
queue->close();
return queue->drain([ec](auto r) {
Expand All @@ -426,7 +431,7 @@ collection_id_cache_entry_impl::refresh_collection_id(std::shared_ptr<mcbp::queu
ec.message());
return;
}
self->dispatcher_.direct_re_queue(r, false);
self->manager_.lock()->direct_re_queue(r, false);
});
});
if (op) {
Expand Down
2 changes: 1 addition & 1 deletion core/range_scan_orchestrator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class range_scan_orchestrator_impl
vbucket < gsl::narrow_cast<std::uint16_t>(self->vbucket_map_.size());
++vbucket) {
const range_scan_create_options create_options{
self->scope_name_, {},
self->scope_name_, self->collection_name_,
self->scan_type_, self->options_.timeout,
self->collection_id_, self->vbucket_to_snapshot_requirements_[vbucket],
self->options_.ids_only, self->options_.retry_strategy,
Expand Down
2 changes: 1 addition & 1 deletion test/utils/wait_until.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ wait_until_collection_manifest_propagated(const couchbase::core::cluster& cluste
auto deadline = std::chrono::system_clock::now() + total_timeout;
while (std::chrono::system_clock::now() < deadline) {
auto propagated = test::utils::wait_until(
[cluster, bucket_name, current_manifest_uid, round, successful_rounds]() {
[&cluster, bucket_name, current_manifest_uid, round, successful_rounds]() {
couchbase::core::operations::management::collections_manifest_get_request req{
{ bucket_name, "_default", "_default", "" }
};
Expand Down

0 comments on commit 91e8599

Please sign in to comment.