Skip to content

Commit

Permalink
CXXCBC-531: Fix deadlock in cluster destructor (public API) (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
avsej committed Jun 25, 2024
1 parent f365195 commit 24fded9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
2 changes: 1 addition & 1 deletion core/cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,6 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
bucket->close();
});
self->session_manager_->close();
handler();
self->work_.reset();
if (self->tracer_) {
self->tracer_->stop();
Expand All @@ -785,6 +784,7 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
self->meter_->stop();
}
self->meter_.reset();
handler();
}));
}

Expand Down
67 changes: 44 additions & 23 deletions core/impl/cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
#include "query.hxx"
#include "search.hxx"

#include <asio/bind_executor.hpp>
#include <asio/post.hpp>
#include <couchbase/bucket.hxx>
#include <couchbase/cluster.hxx>

#include <asio/bind_executor.hpp>
#include <asio/post.hpp>

namespace couchbase
{
cluster::cluster(std::shared_ptr<cluster_impl> impl)
Expand Down Expand Up @@ -186,13 +187,30 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
{
std::promise<void> barrier;
auto future = barrier.get_future();
close([&barrier] {

// Spawn new thread to avoid joining IO thread from the same thread
// We cannot use close() method here, as it is capturing self as a shared
// pointer to extend lifetime for the user's callback. Here the reference
// counter has reached zero already, so we can only capture `*this`.
std::thread([this, &barrier]() mutable {
if (auto txns = std::move(transactions_); txns != nullptr) {
// blocks until cleanup is finished
txns->close();
}
std::promise<void> core_stopped;
auto f = core_stopped.get_future();
core_.close([&core_stopped]() {
core_stopped.set_value();
});
f.get();
io_.stop();
if (io_thread_.joinable()) {
io_thread_.join();
}
barrier.set_value();
});
future.get();
}).detach();

io_.stop();
io_thread_.join();
future.get();
}

void open(const std::string& connection_string,
Expand Down Expand Up @@ -313,11 +331,24 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>

void close(core::utils::movable_function<void()> handler)
{
if (auto txns = std::move(transactions_); txns != nullptr) {
// blocks until cleanup is finished
txns->close();
}
return core_.close(std::move(handler));
// Spawn new thread to avoid joining IO thread from the same thread
std::thread([self = shared_from_this(), handler = std::move(handler)]() mutable {
if (auto txns = std::move(self->transactions_); txns != nullptr) {
// blocks until cleanup is finished
txns->close();
}
std::promise<void> barrier;
auto future = barrier.get_future();
self->core_.close([&barrier]() {
barrier.set_value();
});
future.get();
self->io_.stop();
if (self->io_thread_.joinable()) {
self->io_thread_.join();
}
handler();
}).detach();
}

[[nodiscard]] auto core() const -> const core::cluster&
Expand Down Expand Up @@ -496,17 +527,7 @@ cluster::close(std::function<void()>&& handler)
if (!impl_) {
return handler();
}
// Spawn new thread to ensure the IO will be safely shutdown before calling
// destructor
std::thread([impl = impl_, handler = std::move(handler)]() {
auto barrier = std::make_shared<std::promise<void>>();
auto future = barrier->get_future();
impl->close([barrier] {
barrier->set_value();
});
future.get();
handler();
}).detach();
impl_->close(std::move(handler));
}

auto
Expand Down
3 changes: 0 additions & 3 deletions test/test_integration_management.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2159,8 +2159,6 @@ TEST_CASE("integration: user management", "[integration]")
REQUIRE(resp.roles.size() > 0);
}

// FIXME(SA, CXXCBC-531) triggers issues in public cluster destructor
#ifndef COUCHBASE_CXX_CLIENT_BUILD_SANITIZED
if (integration.cluster_version().is_enterprise()) {

SECTION("change user password")
Expand Down Expand Up @@ -2218,7 +2216,6 @@ TEST_CASE("integration: user management", "[integration]")
}
}
}
#endif
}

TEST_CASE("integration: user management collections roles", "[integration]")
Expand Down

0 comments on commit 24fded9

Please sign in to comment.