Skip to content

Commit

Permalink
CXXCBC-328: fix restarting MCBP sessions
Browse files Browse the repository at this point in the history
Instead of letting session restarting itself, only allow it to removed
itself from the bucket. And in the end, just notify the bucket to check
configuration and establish connections to the endpoints that don't have
it yet.
  • Loading branch information
avsej committed May 11, 2023
1 parent 21802c6 commit 42d9207
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 104 deletions.
142 changes: 62 additions & 80 deletions core/bucket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "core/mcbp/codec.hxx"
#include "dispatcher.hxx"
#include "impl/bootstrap_state_listener.hxx"
#include "mcbp/completion_token.hxx"
#include "mcbp/operation_queue.hxx"
#include "mcbp/queue_request.hxx"
#include "mcbp/queue_response.hxx"
Expand Down Expand Up @@ -303,82 +302,71 @@ class bucket_impl
return config_->map_key(key, node_index);
}

void restart_node(std::size_t index, const std::string& hostname, const std::string& port)
void restart_sessions()
{
if (closed_) {
CB_LOG_DEBUG(R"({} requested to restart session, but the bucket has been closed already. idx={}, address="{}:{}")",
log_prefix_,
index,
hostname,
port);
return;
}
{
std::scoped_lock lock(config_mutex_);
if (!config_->has_node(origin_.options().network, service_type::key_value, origin_.options().enable_tls, hostname, port)) {
CB_LOG_TRACE(
R"({} requested to restart session, but the node has been ejected from current configuration already. idx={}, network={}, address="{}:{}")",
log_prefix_,
index,
origin_.options().network,
hostname,
port);
return;
std::scoped_lock lock(config_mutex_, sessions_mutex_);

for (const auto& node : config_->nodes) {
if (sessions_.find(node.index) != sessions_.end()) {
continue;
}
}
couchbase::core::origin origin(origin_.credentials(), hostname, port, origin_.options());

io::mcbp_session session = origin_.options().enable_tls
? io::mcbp_session(client_id_, ctx_, tls_, origin, state_listener_, name_, known_features_)
: io::mcbp_session(client_id_, ctx_, origin, state_listener_, name_, known_features_);
const auto& hostname = node.hostname_for(origin_.options().network);
auto port = node.port_or(origin_.options().network, service_type::key_value, origin_.options().enable_tls, 0);
if (port == 0) {
continue;
}

std::scoped_lock lock(sessions_mutex_);
if (auto ptr = sessions_.find(index); ptr == sessions_.end()) {
CB_LOG_DEBUG(R"({} requested to restart session idx={}, which does not exist yet, initiate new one id="{}", address="{}:{}")",
log_prefix_,
index,
session.id(),
hostname,
port);
} else {
const auto& old_session = ptr->second;
auto old_id = old_session.id();
sessions_.erase(ptr);
Expects(sessions_.count(index) == 0);
CB_LOG_DEBUG(R"({} restarting session idx={}, id=("{}" -> "{}"), address="{}:{}")",
couchbase::core::origin origin(origin_.credentials(), hostname, port, origin_.options());
io::mcbp_session session = origin_.options().enable_tls
? io::mcbp_session(client_id_, ctx_, tls_, origin, state_listener_, name_, known_features_)
: io::mcbp_session(client_id_, ctx_, origin, state_listener_, name_, known_features_);
CB_LOG_DEBUG(R"({} rev={}, restart idx={}, session="{}", address="{}:{}")",
log_prefix_,
index,
old_id,
node.index,
config_->rev_str(),
session.id(),
hostname,
port);
session.bootstrap(
[self = shared_from_this(), session](std::error_code err, topology::configuration cfg) mutable {
if (err) {
return self->remove_session(session.id());
}
self->update_config(std::move(cfg));
session.on_configuration_update(self);
session.on_stop([id = session.id(), self]() { self->remove_session(id); });
self->drain_deferred_queue();
},
true);
sessions_.insert_or_assign(node.index, std::move(session));
}

session.bootstrap(
[self = shared_from_this(), session, this_index = index, hostname, port](std::error_code ec,
const topology::configuration& config) mutable {
if (self->closed_) {
asio::post(asio::bind_executor(
self->ctx_, [session = std::move(session)]() mutable { return session.stop(retry_reason::do_not_retry); }));
return;
}
if (ec) {
CB_LOG_WARNING(R"({} failed to restart session idx={}, ec={})", session.log_prefix(), this_index, ec.message());
self->restart_node(this_index, hostname, port);
return;
}
session.on_configuration_update(self);
session.on_stop([this_index, hostname, port, self](retry_reason reason) {
if (reason == retry_reason::socket_closed_while_in_flight) {
self->restart_node(this_index, hostname, port);
}
});
return;
}

void remove_session(const std::string& id)
{
bool found{ false };
std::scoped_lock lock(sessions_mutex_);
for (auto ptr = sessions_.cbegin(); ptr != sessions_.cend();) {
if (ptr->second.id() == id) {
CB_LOG_DEBUG(R"({} removed session id="{}", address="{}", bootstrap_address="{}:{}")",
log_prefix_,
ptr->second.id(),
ptr->second.remote_address(),
ptr->second.bootstrap_hostname(),
ptr->second.bootstrap_port());
ptr = sessions_.erase(ptr);
found = true;
} else {
ptr = std::next(ptr);
}
}

self->update_config(config);
self->drain_deferred_queue();
},
true);
sessions_.insert_or_assign(index, std::move(session));
if (found) {
asio::post(asio::bind_executor(ctx_, [self = shared_from_this()]() { return self->restart_sessions(); }));
}
}

void bootstrap(utils::movable_function<void(std::error_code, topology::configuration)>&& handler)
Expand All @@ -396,12 +384,7 @@ class bucket_impl
} else {
const std::size_t this_index = new_session.index();
new_session.on_configuration_update(self);
new_session.on_stop([this_index, hostname = new_session.bootstrap_hostname(), port = new_session.bootstrap_port(), self](
retry_reason reason) {
if (reason == retry_reason::socket_closed_while_in_flight) {
self->restart_node(this_index, hostname, port);
}
});
new_session.on_stop([id = new_session.id(), self]() { self->remove_session(id); });

{
std::scoped_lock lock(self->sessions_mutex_);
Expand Down Expand Up @@ -589,16 +572,15 @@ class bucket_impl
if (!err) {
self->update_config(std::move(cfg));
session.on_configuration_update(self);
session.on_stop(
[index = session.index(), hostname = session.bootstrap_hostname(), port = session.bootstrap_port(), self](
retry_reason reason) {
if (reason == retry_reason::socket_closed_while_in_flight) {
self->restart_node(index, hostname, port);
}
});
session.on_stop([id = session.id(), self]() { self->remove_session(id); });
self->drain_deferred_queue();
} else if (err == errc::common::unambiguous_timeout && forced_config) {
self->restart_node(idx, session.bootstrap_hostname(), session.bootstrap_port());
CB_LOG_WARNING(R"({} failed to bootstrap session idx={}, id="{}", ec={})",
session.log_prefix(),
idx,
session.id(),
err.message());
self->remove_session(session.id());
}
},
true);
Expand Down
7 changes: 5 additions & 2 deletions core/bucket.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ class bucket
return defer_command([self = shared_from_this(), cmd]() { self->map_and_send(cmd); });
}
if (session->is_stopped()) {
CB_LOG_TRACE(
R"({} the session has been found, but it is stopped, retrying id={}, session={})", log_prefix(), cmd->id_, session->id());
CB_LOG_TRACE(R"({} the session has been found for idx={}, but it is stopped, retrying id={}, session={})",
log_prefix(),
index,
cmd->id_,
session->id());
return io::retry_orchestrator::maybe_retry(
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
}
Expand Down
2 changes: 1 addition & 1 deletion core/cluster.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ class cluster : public std::enable_shared_from_this<cluster>
}
self->session_manager_->set_configuration(config, self->origin_.options());
self->session_->on_configuration_update(self->session_manager_);
self->session_->on_stop([self](retry_reason) {
self->session_->on_stop([self]() {
if (self->session_) {
self->session_.reset();
}
Expand Down
24 changes: 6 additions & 18 deletions core/io/mcbp_session.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ class mcbp_session_impl
return stopped_;
}

void on_stop(utils::movable_function<void(retry_reason)> handler)
void on_stop(utils::movable_function<void()> handler)
{
on_stop_handler_ = std::move(handler);
}
Expand Down Expand Up @@ -863,7 +863,7 @@ class mcbp_session_impl
config_listeners_.clear();
state_ = diag::endpoint_state::disconnected;
if (auto on_stop = std::move(on_stop_handler_); on_stop) {
on_stop(reason);
on_stop();
}
}

Expand Down Expand Up @@ -1462,7 +1462,7 @@ class mcbp_session_impl
std::mutex command_handlers_mutex_{};
std::map<std::uint32_t, command_handler> command_handlers_{};
std::vector<std::shared_ptr<config_listener>> config_listeners_{};
utils::movable_function<void(retry_reason)> on_stop_handler_{};
utils::movable_function<void()> on_stop_handler_{};

std::atomic_bool bootstrapped_{ false };
std::atomic_bool stopped_{ false };
Expand Down Expand Up @@ -1582,22 +1582,10 @@ mcbp_session::supports_feature(protocol::hello_feature feature)
return impl_->supports_feature(feature);
}

// const std::string&
// mcbp_session::id() const
// {
// return impl_->id();
// }
std::string
const std::string&
mcbp_session::id() const
{
if (impl_) {
return fmt::format("{}, {}, {}, refcnt={}",
reinterpret_cast<const void*>(this),
reinterpret_cast<const void*>(impl_.get()),
impl_->id(),
impl_.use_count());
}
return fmt::format("{}, nullptr", reinterpret_cast<const void*>(this));
return impl_->id();
}

std::string
Expand Down Expand Up @@ -1637,7 +1625,7 @@ mcbp_session::bootstrap(utils::movable_function<void(std::error_code, topology::
}

void
mcbp_session::on_stop(utils::movable_function<void(retry_reason)> handler)
mcbp_session::on_stop(utils::movable_function<void()> handler)
{
return impl_->on_stop(std::move(handler));
}
Expand Down
5 changes: 2 additions & 3 deletions core/io/mcbp_session.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ class mcbp_session
[[nodiscard]] mcbp_context context() const;
[[nodiscard]] bool supports_feature(protocol::hello_feature feature);
[[nodiscard]] std::vector<protocol::hello_feature> supported_features() const;
//[[nodiscard]] const std::string& id() const;
[[nodiscard]] std::string id() const;
[[nodiscard]] const std::string& id() const;
[[nodiscard]] std::string remote_address() const;
[[nodiscard]] std::string local_address() const;
[[nodiscard]] const std::string& bootstrap_hostname() const;
Expand All @@ -111,7 +110,7 @@ class mcbp_session
void write_and_subscribe(std::uint32_t opaque, std::vector<std::byte>&& data, command_handler&& handler);
void bootstrap(utils::movable_function<void(std::error_code, topology::configuration)>&& handler,
bool retry_on_bucket_not_found = false);
void on_stop(utils::movable_function<void(retry_reason)> handler);
void on_stop(utils::movable_function<void()> handler);
void stop(retry_reason reason);
[[nodiscard]] std::size_t index() const;
[[nodiscard]] bool has_config() const;
Expand Down

0 comments on commit 42d9207

Please sign in to comment.