Skip to content

Commit

Permalink
CXXCBC-328: fix restarting MCBP sessions (#406)
Browse files Browse the repository at this point in the history
Fix socket reconnection during rebalance process. There are several improvements has been implemented to make the library resilient to rapid topology changes (in particular in Cloud environment) when both DNS-SRV bootstrap is being used along with alternative addresses. The changes include:
  * take into account alternative hostname and ports during detection of added/removed nodes on configuration update
  * replace node index tracking with hostname/port matching to when restarting the connections, this way we will be     sure that no duplicate connections will be left, or live connections replaced by restarted session.
  * improved logging of critial events during rebalance: restarting, preservation and removing connections.

Tools
-----

* Added `--verbose` switch to `cbc-get` and `cbc-pillowfight` that will report error contexts of every failed operation   to standard error stream in JSON format. It might be useful to collect/diagnose errors of the long running workloads.

* Added batching switches `--batch-size` and `--batch-wait` to `cbc-pillowfight`. `--batch-size` allows to change number   of operations to be scheduled at once before waiting for completion. `--batch-wait` allows to insert delays between   batches in the workload thread.

* Added `--number-of-keys-to-populate` switch, which allow to preload set of keys before running the workload. The   switch sets number of the keys per thread.
  • Loading branch information
avsej committed May 23, 2023
1 parent cb25284 commit adc416d
Show file tree
Hide file tree
Showing 23 changed files with 692 additions and 324 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ set(couchbase_cxx_client_FILES
core/impl/internal_search_row_locations.cxx
core/impl/internal_term_facet_result.cxx
core/impl/key_value_error_category.cxx
core/impl/key_value_error_context.cxx
core/impl/lookup_in.cxx
core/impl/management_error_category.cxx
core/impl/match_all_query.cxx
Expand All @@ -322,6 +323,7 @@ set(couchbase_cxx_client_FILES
core/impl/prepend.cxx
core/impl/query.cxx
core/impl/query_error_category.cxx
core/impl/query_error_context.cxx
core/impl/query_string_query.cxx
core/impl/regexp_query.cxx
core/impl/remove.cxx
Expand Down
335 changes: 183 additions & 152 deletions core/bucket.cxx

Large diffs are not rendered by default.

21 changes: 17 additions & 4 deletions core/bucket.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class bucket
auto cmd = std::make_shared<operations::mcbp_command<bucket, Request>>(ctx_, shared_from_this(), request, default_timeout());
cmd->start([cmd, handler = std::forward<Handler>(handler)](std::error_code ec, std::optional<io::mcbp_message>&& msg) mutable {
using encoded_response_type = typename Request::encoded_response_type;
std::uint16_t status_code = msg ? msg->header.status() : 0U;
std::uint16_t status_code = msg ? msg->header.status() : 0xffffU;
auto resp = msg ? encoded_response_type(std::move(*msg)) : encoded_response_type{};
auto ctx = make_key_value_error_context(ec, status_code, cmd, resp);
handler(cmd->request.make_response(std::move(ctx), std::move(resp)));
Expand All @@ -107,7 +107,7 @@ class bucket
auto [partition, server] = map_id(cmd->request.id);
if (!server.has_value()) {
CB_LOG_TRACE(
R"({} unable to map key=\"{}\" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition);
R"({} unable to map key="{}" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition);
return io::retry_orchestrator::maybe_retry(
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
}
Expand All @@ -116,19 +116,32 @@ class bucket
}
auto session = find_session_by_index(index);
if (!session || !session->has_config()) {
CB_LOG_TRACE(R"({} defer operation id={}, session={}, has_config={})",
CB_LOG_TRACE(R"({} defer operation id={}, key="{}", partition={}, index={}, session={}, address="{}", has_config={})",
log_prefix(),
cmd->id_,
cmd->request.id,
cmd->request.partition,
index,
session.has_value(),
session.has_value() ? session->bootstrap_address() : "",
session.has_value() && session->has_config());
return defer_command([self = shared_from_this(), cmd]() { self->map_and_send(cmd); });
}
if (session->is_stopped()) {
CB_LOG_TRACE(
R"({} the session has been found, but it is stopped, retrying id={}, session={})", log_prefix(), cmd->id_, session->id());
R"({} the session has been found for idx={}, but it is stopped, retrying id={}, key="{}", partition={}, session={}, address="{}")",
log_prefix(),
index,
cmd->id_,
cmd->request.id,
cmd->request.partition,
session->id(),
session->bootstrap_address());
return io::retry_orchestrator::maybe_retry(
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
}
cmd->last_dispatched_from_ = session->local_address();
cmd->last_dispatched_to_ = session->bootstrap_address();
cmd->send_to(session.value());
}

Expand Down
2 changes: 1 addition & 1 deletion core/cluster.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ class cluster : public std::enable_shared_from_this<cluster>
}
self->session_manager_->set_configuration(config, self->origin_.options());
self->session_->on_configuration_update(self->session_manager_);
self->session_->on_stop([self](retry_reason) {
self->session_->on_stop([self]() {
if (self->session_) {
self->session_.reset();
}
Expand Down
3 changes: 2 additions & 1 deletion core/error_context/key_value.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ key_value_error_context
make_key_value_error_context(std::error_code ec, const document_id& id)
{
return {
ec, {}, {}, 0, {}, id.key(), id.bucket(), id.scope(), id.collection(), 0, {}, {}, {}, {},
{}, ec, {}, {}, 0, {}, id.key(), id.bucket(), id.scope(), id.collection(), 0, {}, {}, {}, {},
};
}

Expand All @@ -36,6 +36,7 @@ make_subdocument_error_context(const key_value_error_context& ctx,
bool deleted)
{
return {
ctx.operation_id(),
ec,
ctx.last_dispatched_to(),
ctx.last_dispatched_from(),
Expand Down
22 changes: 10 additions & 12 deletions core/error_context/key_value.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,21 @@ make_key_value_error_context(std::error_code ec, std::uint16_t status_code, cons
const auto& scope = command->request.id.scope();
const auto& bucket = command->request.id.bucket();
std::uint32_t opaque = (ec && response.opaque() == 0) ? command->request.opaque : response.opaque();
auto status = response.status();
auto retry_attempts = command->request.retries.retry_attempts();
auto retry_reasons = command->request.retries.retry_reasons();
std::optional<std::string> last_dispatched_from{};
std::optional<std::string> last_dispatched_to{};
std::optional<key_value_status_code> status{};
std::optional<key_value_error_map_info> error_map_info{};
if (command->session_) {
last_dispatched_from = command->session_->local_address();
last_dispatched_to = command->session_->remote_address();
if (status_code) {
if (status_code != 0xffffU) {
status = response.status();
if (command->session_ && status_code > 0) {
error_map_info = command->session_->decode_error_code(status_code);
}
}
auto retry_attempts = command->request.retries.retry_attempts();
auto retry_reasons = command->request.retries.retry_reasons();

return { ec,
std::move(last_dispatched_from),
std::move(last_dispatched_to),
return { command->id_,
ec,
command->last_dispatched_to_,
command->last_dispatched_from_,
retry_attempts,
std::move(retry_reasons),
key,
Expand Down
98 changes: 98 additions & 0 deletions core/impl/key_value_error_context.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2020-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <couchbase/key_value_error_context.hxx>

#include <couchbase/fmt/cas.hxx>
#include <couchbase/fmt/key_value_error_map_attribute.hxx>
#include <couchbase/fmt/key_value_status_code.hxx>
#include <couchbase/fmt/retry_reason.hxx>

#include <tao/json/to_string.hpp>

namespace couchbase
{
auto
key_value_error_context::to_json() const -> std::string
{
tao::json::value json = {
{
"ec",
tao::json::value{
{ "value", ec().value() },
{ "message", ec().message() },
},
},
{ "operation_id", operation_id() },
{ "id", id_ },
{ "bucket", bucket_ },
{ "scope", scope_ },
{ "collection", collection_ },
};

if (auto val = retry_attempts(); val > 0) {
json["retry_attempts"] = val;
}
if (opaque_ > 0) {
json["opaque"] = opaque_;
}

if (!cas_.empty()) {
json["cas"] = fmt::format("{}", cas_);
}

if (const auto& reasons = retry_reasons(); !reasons.empty()) {
tao::json::value reasons_json = tao::json::empty_array;
for (const auto& reason : reasons) {
reasons_json.emplace_back(fmt::format("{}", reason));
}
json["retry_reasons"] = reasons_json;
}
if (const auto& val = last_dispatched_from(); val.has_value()) {
json["last_dispatched_from"] = val.value();
}
if (const auto& val = last_dispatched_to(); val.has_value()) {
json["last_dispatched_to"] = val.value();
}
if (const auto& val = status_code_; val.has_value()) {
json["status_code"] = fmt::format("{}", val.value());
}
if (const auto& val = extended_error_info_; val.has_value()) {
json["extended_error_info"] = tao::json::value{
{ "context", val->context() },
{ "reference", val->reference() },
};
}
if (const auto& val = error_map_info_; val.has_value()) {
tao::json::value info{
{ "code", val->code() },
{ "name", val->name() },
{ "description", val->description() },
};
if (const auto& attributes = val->attributes(); !attributes.empty()) {
tao::json::value attrs_json = tao::json::empty_array;
for (const auto& attr : attributes) {
attrs_json.emplace_back(fmt::format("{}", attr));
}
info["attributes"] = attrs_json;
}
json["error_map_info"] = info;
}

return tao::json::to_string(json, 2);
}
} // namespace couchbase
75 changes: 75 additions & 0 deletions core/impl/query_error_context.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2020-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <couchbase/query_error_context.hxx>

#include <couchbase/fmt/retry_reason.hxx>

#include <tao/json/to_string.hpp>

namespace couchbase
{
auto
query_error_context::to_json() const -> std::string
{
tao::json::value json = {
{
"ec",
tao::json::value{
{ "value", ec().value() },
{ "message", ec().message() },
},
},
{ "operation_id", operation_id() },
{ "retry_attempts", retry_attempts() },
{ "client_context_id", client_context_id_ },
{ "statement", statement_ },
{ "method", statement_ },
{ "path", statement_ },
{ "http_status", http_status_ },
{ "http_body", http_body_ },
{ "hostname", hostname_ },
{ "port", port_ },
};

if (const auto& val = parameters_; val.has_value()) {
json["parameters"] = val.value();
}
if (first_error_code_ > 0) {
json["first_error_code"] = first_error_code_;
}
if (!first_error_message_.empty()) {
json["first_error_message"] = first_error_message_;
}

if (const auto& reasons = retry_reasons(); !reasons.empty()) {
tao::json::value reasons_json = tao::json::empty_array;
for (const auto& reason : reasons) {
reasons_json.emplace_back(fmt::format("{}", reason));
}
json["retry_reasons"] = reasons_json;
}
if (const auto& val = last_dispatched_from(); val.has_value()) {
json["last_dispatched_from"] = val.value();
}
if (const auto& val = last_dispatched_to(); val.has_value()) {
json["last_dispatched_to"] = val.value();
}

return tao::json::to_string(json, 2);
}
} // namespace couchbase
11 changes: 9 additions & 2 deletions core/io/mcbp_command.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
mcbp_command_handler handler_{};
std::shared_ptr<Manager> manager_{};
std::chrono::milliseconds timeout_{};
std::string id_{ uuid::to_string(uuid::random()) };
std::string id_{
fmt::format("{:02x}/{}", static_cast<std::uint8_t>(encoded_request_type::body_type::opcode), uuid::to_string(uuid::random()))
};
std::shared_ptr<couchbase::tracing::request_span> span_{ nullptr };
std::shared_ptr<couchbase::tracing::request_span> parent_span{ nullptr };
std::optional<std::string> last_dispatched_from_{};
std::optional<std::string> last_dispatched_to_{};

mcbp_command(asio::io_context& ctx, std::shared_ptr<Manager> manager, Request req, std::chrono::milliseconds default_timeout)
: deadline(ctx)
Expand Down Expand Up @@ -109,7 +113,10 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
handler_ = nullptr;
}
}
invoke_handler(request.retries.idempotent() ? errc::common::unambiguous_timeout : errc::common::ambiguous_timeout);
invoke_handler(request.retries.idempotent() || !opaque_.has_value()
? errc::common::unambiguous_timeout // safe to retry or has not been sent to the server
: errc::common::ambiguous_timeout // non-idempotent and has been sent to the server
);
}

void invoke_handler(std::error_code ec, std::optional<io::mcbp_message>&& msg = {})
Expand Down

0 comments on commit adc416d

Please sign in to comment.