Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CXXCBC-328: fix restarting MCBP sessions #406

Merged
merged 11 commits into from
May 23, 2023
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ set(couchbase_cxx_client_FILES
core/impl/internal_search_row_locations.cxx
core/impl/internal_term_facet_result.cxx
core/impl/key_value_error_category.cxx
core/impl/key_value_error_context.cxx
core/impl/lookup_in.cxx
core/impl/management_error_category.cxx
core/impl/match_all_query.cxx
Expand All @@ -322,6 +323,7 @@ set(couchbase_cxx_client_FILES
core/impl/prepend.cxx
core/impl/query.cxx
core/impl/query_error_category.cxx
core/impl/query_error_context.cxx
core/impl/query_string_query.cxx
core/impl/regexp_query.cxx
core/impl/remove.cxx
Expand Down
335 changes: 183 additions & 152 deletions core/bucket.cxx

Large diffs are not rendered by default.

21 changes: 17 additions & 4 deletions core/bucket.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class bucket
auto cmd = std::make_shared<operations::mcbp_command<bucket, Request>>(ctx_, shared_from_this(), request, default_timeout());
cmd->start([cmd, handler = std::forward<Handler>(handler)](std::error_code ec, std::optional<io::mcbp_message>&& msg) mutable {
using encoded_response_type = typename Request::encoded_response_type;
std::uint16_t status_code = msg ? msg->header.status() : 0U;
std::uint16_t status_code = msg ? msg->header.status() : 0xffffU;
auto resp = msg ? encoded_response_type(std::move(*msg)) : encoded_response_type{};
auto ctx = make_key_value_error_context(ec, status_code, cmd, resp);
handler(cmd->request.make_response(std::move(ctx), std::move(resp)));
Expand All @@ -107,7 +107,7 @@ class bucket
auto [partition, server] = map_id(cmd->request.id);
if (!server.has_value()) {
CB_LOG_TRACE(
R"({} unable to map key=\"{}\" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition);
R"({} unable to map key="{}" to the node, id={}, partition={})", log_prefix(), cmd->request.id, cmd->id_, partition);
return io::retry_orchestrator::maybe_retry(
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
}
Expand All @@ -116,19 +116,32 @@ class bucket
}
auto session = find_session_by_index(index);
if (!session || !session->has_config()) {
CB_LOG_TRACE(R"({} defer operation id={}, session={}, has_config={})",
CB_LOG_TRACE(R"({} defer operation id={}, key="{}", partition={}, index={}, session={}, address="{}", has_config={})",
log_prefix(),
cmd->id_,
cmd->request.id,
cmd->request.partition,
index,
session.has_value(),
session.has_value() ? session->bootstrap_address() : "",
session.has_value() && session->has_config());
return defer_command([self = shared_from_this(), cmd]() { self->map_and_send(cmd); });
}
if (session->is_stopped()) {
CB_LOG_TRACE(
R"({} the session has been found, but it is stopped, retrying id={}, session={})", log_prefix(), cmd->id_, session->id());
R"({} the session has been found for idx={}, but it is stopped, retrying id={}, key="{}", partition={}, session={}, address="{}")",
log_prefix(),
index,
cmd->id_,
cmd->request.id,
cmd->request.partition,
session->id(),
session->bootstrap_address());
return io::retry_orchestrator::maybe_retry(
cmd->manager_, cmd, retry_reason::node_not_available, errc::common::request_canceled);
}
cmd->last_dispatched_from_ = session->local_address();
cmd->last_dispatched_to_ = session->bootstrap_address();
cmd->send_to(session.value());
}

Expand Down
2 changes: 1 addition & 1 deletion core/cluster.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ class cluster : public std::enable_shared_from_this<cluster>
}
self->session_manager_->set_configuration(config, self->origin_.options());
self->session_->on_configuration_update(self->session_manager_);
self->session_->on_stop([self](retry_reason) {
self->session_->on_stop([self]() {
if (self->session_) {
self->session_.reset();
}
Expand Down
3 changes: 2 additions & 1 deletion core/error_context/key_value.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ key_value_error_context
make_key_value_error_context(std::error_code ec, const document_id& id)
{
return {
ec, {}, {}, 0, {}, id.key(), id.bucket(), id.scope(), id.collection(), 0, {}, {}, {}, {},
{}, ec, {}, {}, 0, {}, id.key(), id.bucket(), id.scope(), id.collection(), 0, {}, {}, {}, {},
};
}

Expand All @@ -36,6 +36,7 @@ make_subdocument_error_context(const key_value_error_context& ctx,
bool deleted)
{
return {
ctx.operation_id(),
ec,
ctx.last_dispatched_to(),
ctx.last_dispatched_from(),
Expand Down
22 changes: 10 additions & 12 deletions core/error_context/key_value.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,21 @@ make_key_value_error_context(std::error_code ec, std::uint16_t status_code, cons
const auto& scope = command->request.id.scope();
const auto& bucket = command->request.id.bucket();
std::uint32_t opaque = (ec && response.opaque() == 0) ? command->request.opaque : response.opaque();
auto status = response.status();
auto retry_attempts = command->request.retries.retry_attempts();
auto retry_reasons = command->request.retries.retry_reasons();
std::optional<std::string> last_dispatched_from{};
std::optional<std::string> last_dispatched_to{};
std::optional<key_value_status_code> status{};
std::optional<key_value_error_map_info> error_map_info{};
if (command->session_) {
last_dispatched_from = command->session_->local_address();
last_dispatched_to = command->session_->remote_address();
if (status_code) {
if (status_code != 0xffffU) {
status = response.status();
if (command->session_ && status_code > 0) {
error_map_info = command->session_->decode_error_code(status_code);
}
}
auto retry_attempts = command->request.retries.retry_attempts();
auto retry_reasons = command->request.retries.retry_reasons();

return { ec,
std::move(last_dispatched_from),
std::move(last_dispatched_to),
return { command->id_,
ec,
command->last_dispatched_to_,
command->last_dispatched_from_,
retry_attempts,
std::move(retry_reasons),
key,
Expand Down
98 changes: 98 additions & 0 deletions core/impl/key_value_error_context.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2020-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <couchbase/key_value_error_context.hxx>

#include <couchbase/fmt/cas.hxx>
#include <couchbase/fmt/key_value_error_map_attribute.hxx>
#include <couchbase/fmt/key_value_status_code.hxx>
#include <couchbase/fmt/retry_reason.hxx>

#include <tao/json/to_string.hpp>

namespace couchbase
{
auto
key_value_error_context::to_json() const -> std::string
{
tao::json::value json = {
{
"ec",
tao::json::value{
{ "value", ec().value() },
{ "message", ec().message() },
},
},
{ "operation_id", operation_id() },
{ "id", id_ },
{ "bucket", bucket_ },
{ "scope", scope_ },
{ "collection", collection_ },
};

if (auto val = retry_attempts(); val > 0) {
json["retry_attempts"] = val;
}
if (opaque_ > 0) {
json["opaque"] = opaque_;
}

if (!cas_.empty()) {
json["cas"] = fmt::format("{}", cas_);
}

if (const auto& reasons = retry_reasons(); !reasons.empty()) {
tao::json::value reasons_json = tao::json::empty_array;
for (const auto& reason : reasons) {
reasons_json.emplace_back(fmt::format("{}", reason));
}
json["retry_reasons"] = reasons_json;
}
if (const auto& val = last_dispatched_from(); val.has_value()) {
json["last_dispatched_from"] = val.value();
}
if (const auto& val = last_dispatched_to(); val.has_value()) {
json["last_dispatched_to"] = val.value();
}
if (const auto& val = status_code_; val.has_value()) {
json["status_code"] = fmt::format("{}", val.value());
}
if (const auto& val = extended_error_info_; val.has_value()) {
json["extended_error_info"] = tao::json::value{
{ "context", val->context() },
{ "reference", val->reference() },
};
}
if (const auto& val = error_map_info_; val.has_value()) {
tao::json::value info{
{ "code", val->code() },
{ "name", val->name() },
{ "description", val->description() },
};
if (const auto& attributes = val->attributes(); !attributes.empty()) {
tao::json::value attrs_json = tao::json::empty_array;
for (const auto& attr : attributes) {
attrs_json.emplace_back(fmt::format("{}", attr));
}
info["attributes"] = attrs_json;
}
json["error_map_info"] = info;
}

return tao::json::to_string(json, 2);
}
} // namespace couchbase
75 changes: 75 additions & 0 deletions core/impl/query_error_context.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2020-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <couchbase/query_error_context.hxx>

#include <couchbase/fmt/retry_reason.hxx>

#include <tao/json/to_string.hpp>

namespace couchbase
{
auto
query_error_context::to_json() const -> std::string
{
tao::json::value json = {
{
"ec",
tao::json::value{
{ "value", ec().value() },
{ "message", ec().message() },
},
},
{ "operation_id", operation_id() },
{ "retry_attempts", retry_attempts() },
{ "client_context_id", client_context_id_ },
{ "statement", statement_ },
{ "method", statement_ },
{ "path", statement_ },
{ "http_status", http_status_ },
{ "http_body", http_body_ },
{ "hostname", hostname_ },
{ "port", port_ },
};

if (const auto& val = parameters_; val.has_value()) {
json["parameters"] = val.value();
}
if (first_error_code_ > 0) {
json["first_error_code"] = first_error_code_;
}
if (!first_error_message_.empty()) {
json["first_error_message"] = first_error_message_;
}

if (const auto& reasons = retry_reasons(); !reasons.empty()) {
tao::json::value reasons_json = tao::json::empty_array;
for (const auto& reason : reasons) {
reasons_json.emplace_back(fmt::format("{}", reason));
}
json["retry_reasons"] = reasons_json;
}
if (const auto& val = last_dispatched_from(); val.has_value()) {
json["last_dispatched_from"] = val.value();
}
if (const auto& val = last_dispatched_to(); val.has_value()) {
json["last_dispatched_to"] = val.value();
}

return tao::json::to_string(json, 2);
}
} // namespace couchbase
11 changes: 9 additions & 2 deletions core/io/mcbp_command.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
mcbp_command_handler handler_{};
std::shared_ptr<Manager> manager_{};
std::chrono::milliseconds timeout_{};
std::string id_{ uuid::to_string(uuid::random()) };
std::string id_{
fmt::format("{:02x}/{}", static_cast<std::uint8_t>(encoded_request_type::body_type::opcode), uuid::to_string(uuid::random()))
};
std::shared_ptr<couchbase::tracing::request_span> span_{ nullptr };
std::shared_ptr<couchbase::tracing::request_span> parent_span{ nullptr };
std::optional<std::string> last_dispatched_from_{};
std::optional<std::string> last_dispatched_to_{};

mcbp_command(asio::io_context& ctx, std::shared_ptr<Manager> manager, Request req, std::chrono::milliseconds default_timeout)
: deadline(ctx)
Expand Down Expand Up @@ -109,7 +113,10 @@ struct mcbp_command : public std::enable_shared_from_this<mcbp_command<Manager,
handler_ = nullptr;
}
}
invoke_handler(request.retries.idempotent() ? errc::common::unambiguous_timeout : errc::common::ambiguous_timeout);
invoke_handler(request.retries.idempotent() || !opaque_.has_value()
? errc::common::unambiguous_timeout // safe to retry or has not been sent to the server
: errc::common::ambiguous_timeout // non-idempotent and has been sent to the server
);
}

void invoke_handler(std::error_code ec, std::optional<io::mcbp_message>&& msg = {})
Expand Down