Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CXXCBC-368: Subscribe to clustermap notifications to speedup failover #490

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 14 additions & 1 deletion core/bucket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ class bucket_impl
{
std::vector<topology::configuration::node> added{};
std::vector<topology::configuration::node> removed{};
bool sequence_changed = false;
{
std::scoped_lock lock(config_mutex_);
if (!config_) {
Expand All @@ -554,9 +555,21 @@ class bucket_impl
if (config_) {
diff_nodes(config_->nodes, config.nodes, added);
diff_nodes(config.nodes, config_->nodes, removed);
if (added.empty() && removed.empty() && config.nodes.size() == config_->nodes.size()) {
for (std::size_t i = 0; i < config.nodes.size(); ++i) {
if (config.nodes[i] != config_->nodes[i]) {
sequence_changed = true;
break;
}
}
} else {
sequence_changed = true;
}
} else {
sequence_changed = true;
added = config.nodes;
}
config_.reset();
config_ = config;
configured_ = true;

Expand All @@ -567,7 +580,7 @@ class bucket_impl
}
}
}
if (!added.empty() || !removed.empty()) {
if (!added.empty() || !removed.empty() || sequence_changed) {
std::scoped_lock lock(sessions_mutex_);
std::map<size_t, io::mcbp_session> new_sessions{};

Expand Down
2 changes: 1 addition & 1 deletion core/cluster_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct cluster_options {
io::dns::dns_config dns_config{ io::dns::dns_config::system_config() };
bool show_queries{ false };
bool enable_unordered_execution{ true };
bool enable_clustermap_notification{ false };
bool enable_clustermap_notification{ true };
bool enable_compression{ true };
bool enable_tracing{ true };
bool enable_metrics{ true };
Expand Down
19 changes: 16 additions & 3 deletions core/io/mcbp_session.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class mcbp_session_impl
}
if (session_->origin_.options().enable_clustermap_notification) {
hello_req.body().enable_clustermap_change_notification();
hello_req.body().enable_deduplicate_not_my_vbucket_clustermap();
}
if (session_->origin_.options().enable_compression) {
hello_req.body().enable_compression();
Expand Down Expand Up @@ -665,7 +666,18 @@ class mcbp_session_impl
Expects(protocol::is_valid_server_request_opcode(msg.header.opcode));
switch (static_cast<protocol::server_opcode>(msg.header.opcode)) {
case protocol::server_opcode::cluster_map_change_notification: {
protocol::server_request<protocol::cluster_map_change_notification_request_body> req(std::move(msg));
protocol::cmd_info info{ session_->bootstrap_hostname_, session_->bootstrap_port_number_ };
if (session_->origin_.options().dump_configuration) {
std::string_view config_text{ reinterpret_cast<const char*>(msg.body.data()), msg.body.size() };
CB_LOG_TRACE(
"{} configuration from cluster_map_change_notification request (size={}, endpoint=\"{}:{}\"), {}",
session_->log_prefix_,
config_text.size(),
info.endpoint_address,
info.endpoint_port,
config_text);
}
protocol::server_request<protocol::cluster_map_change_notification_request_body> req(std::move(msg), info);
std::optional<topology::configuration> config = req.body().config();
if (session_ && config.has_value()) {
if ((!config->bucket.has_value() && req.body().bucket().empty()) ||
Expand Down Expand Up @@ -1260,11 +1272,12 @@ class mcbp_session_impl
}
}
}
config_.reset();
config_.emplace(std::move(config));
configured_ = true;
for (const auto& listener : config_listeners_) {
asio::post(asio::bind_executor(
ctx_, [listener, config = config_.value()]() mutable { return listener->update_config(std::move(config)); }));
asio::post(
asio::bind_executor(ctx_, [listener, c = config_.value()]() mutable { return listener->update_config(std::move(c)); }));
}
}

Expand Down
5 changes: 5 additions & 0 deletions core/protocol/cmd_hello.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ class hello_request_body
features_.emplace_back(hello_feature::clustermap_change_notification);
}

/**
 * Appends hello_feature::deduplicate_not_my_vbucket_clustermap to the list of features held by this
 * request body.
 */
void enable_deduplicate_not_my_vbucket_clustermap()
{
    features_.push_back(hello_feature::deduplicate_not_my_vbucket_clustermap);
}

void enable_compression()
{
features_.emplace_back(hello_feature::snappy);
Expand Down
6 changes: 6 additions & 0 deletions core/protocol/hello_feature.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ enum class hello_feature : std::uint16_t {
* Indicates support for subdoc lookup operations on replicas
*/
subdoc_replica_read = 0x1c,

/**
* The server will not send the configuration body to connections that have already seen it.
*/
deduplicate_not_my_vbucket_clustermap = 0x1e,
};

constexpr bool
Expand Down Expand Up @@ -191,6 +196,7 @@ is_valid_hello_feature(std::uint16_t code)
case hello_feature::replace_body_with_xattr:
case hello_feature::resource_units:
case hello_feature::subdoc_replica_read:
case hello_feature::deduplicate_not_my_vbucket_clustermap:
return true;
}
return false;
Expand Down
3 changes: 3 additions & 0 deletions core/protocol/hello_feature_fmt.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ struct fmt::formatter<couchbase::core::protocol::hello_feature> {
case couchbase::core::protocol::hello_feature::subdoc_replica_read:
name = "subdoc_replica_read";
break;
case couchbase::core::protocol::hello_feature::deduplicate_not_my_vbucket_clustermap:
name = "deduplicate_not_my_vbucket_clustermap";
break;
}
return format_to(ctx.out(), "{}", name);
}
Expand Down
6 changes: 6 additions & 0 deletions core/topology/configuration.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ struct configuration {
port_map services_tls{};
std::map<std::string, alternate_address> alt{};

/**
 * Reports whether this node differs from @p other.
 *
 * Only the hostname and the key-value ports (plain and TLS) take part in the comparison; every other
 * field of the node is ignored.
 */
bool operator!=(const node& other) const
{
    if (hostname != other.hostname) {
        return true;
    }
    if (services_plain.key_value != other.services_plain.key_value) {
        return true;
    }
    return services_tls.key_value != other.services_tls.key_value;
}

[[nodiscard]] std::uint16_t port_or(service_type type, bool is_tls, std::uint16_t default_value) const;

[[nodiscard]] std::uint16_t port_or(const std::string& network, service_type type, bool is_tls, std::uint16_t default_value) const;
Expand Down
2 changes: 1 addition & 1 deletion couchbase/behavior_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class behavior_options
private:
std::string user_agent_extra_{};
bool show_queries_{ false };
bool enable_clustermap_notification_{ false };
bool enable_clustermap_notification_{ true };
bool enable_mutation_tokens_{ true };
bool enable_unordered_execution_{ true };
bool dump_configuration_{ false };
Expand Down
1 change: 1 addition & 0 deletions tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ add_executable(
cbc.cxx
utils.cxx
analytics.cxx
beam.cxx
get.cxx
pillowfight.cxx
query.cxx
Expand Down
234 changes: 234 additions & 0 deletions tools/beam.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2023-Present Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "beam.hxx"
#include "core/cluster.hxx"
#include "core/operations/management/bucket_describe.hxx"
#include "core/topology/configuration_json.hxx"
#include "core/utils/json.hxx"
#include "utils.hxx"

#include <couchbase/cluster.hxx>
#include <couchbase/fmt/cas.hxx>
#include <couchbase/fmt/retry_reason.hxx>

#include <asio/io_context.hpp>
#include <fmt/chrono.h>

#include <csignal>

namespace cbc
{
namespace
{
// Run flag polled by the workload loop via test_and_set(); sigint_handler() clears it to request a stop.
// NOTE(review): the C++ standard only specifies default construction / ATOMIC_FLAG_INIT for
// std::atomic_flag -- confirm that initializing it with `{ true }` is accepted by every supported toolchain.
std::atomic_flag running{ true };

/**
 * Signal handler registered for SIGINT and SIGTERM: reports the received signal number on stderr and
 * clears the `running` flag.
 *
 * NOTE(review): formatted output is presumably not async-signal-safe -- confirm this is acceptable here.
 */
auto
sigint_handler(int signum) -> void
{
    fmt::print(stderr, "\nrequested stop, signal={}\n", signum);
    running.clear();
}

/**
 * Builds a log prefix of the form "[<local time, %T>.<milliseconds, 3 digits>] " for the current
 * system clock reading.
 */
auto
timestamp()
{
    const auto now = std::chrono::system_clock::now();
    // sub-second part of the current time, in whole milliseconds
    const auto since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
    const auto millis = (since_epoch % 1000).count();
    const auto wall_clock = fmt::localtime(std::chrono::system_clock::to_time_t(now));
    return fmt::format("[{:%T}.{:03}] ", wall_clock, millis);
}

/**
 * "beam" sub-command: picks one document ID per selected vBucket, stores a value under each of them and
 * then keeps fetching those documents until the `running` flag is cleared, reporting failures and
 * recoveries on stderr.
 *
 * The target vBuckets are selected either directly (--vbucket-id) or through the index of their master
 * node in the bucket configuration (--node-index).
 */
class beam_app : public CLI::App
{
  public:
    beam_app()
      : CLI::App{ "Send series of get operations focused on vBucketID or node index.", "beam" }
    {
        auto* focus = add_option_group("focus", "Selector for the target");
        focus->add_option("--vbucket-id", vbucket_ids_, "vBucketIDs to send the operations.");
        focus->add_option("--node-index", node_indexes_, "Node indexes to send the operations.");
        // at least one selector must be given (per CLI11 documentation a maximum of 0 means "no upper limit")
        focus->require_option(1, 0);

        add_flag("--verbose", verbose_, "Include more context and information where it is applicable.");
        add_option("--bucket-name", bucket_name_, "Name of the bucket.")->default_val(default_bucket_name);
        add_option("--scope-name", scope_name_, "Name of the scope.")->default_val(couchbase::scope::default_name);
        add_option("--collection-name", collection_name_, "Name of the collection.")->default_val(couchbase::collection::default_name);

        add_common_options(this, common_options_);
        allow_extras(true);
    }

    /**
     * Runs the workload.
     *
     * Connects to the cluster, fetches the bucket configuration, derives one key per selected vBucket,
     * upserts every key once and then repeatedly gets them until `running` is cleared.
     *
     * @return 0 once the loop has been stopped and the cluster connection has been closed. Errors detected
     * during the preparation are reported through fail(), which is presumably non-returning (the code
     * following each call relies on that -- verify against its declaration).
     */
    [[nodiscard]] int execute()
    {
        apply_logger_options(common_options_.logger);

        auto cluster_options = build_cluster_options(common_options_);

        asio::io_context io;
        auto guard = asio::make_work_guard(io);
        std::thread io_thread([&io]() { io.run(); });
        const auto connection_string = common_options_.connection.connection_string;

        auto [cluster, ec] = couchbase::cluster::connect(io, connection_string, cluster_options).get();
        if (ec) {
            // let the IO thread finish before reporting the failure
            guard.reset();
            io_thread.join();

            fail(fmt::format("Failed to connect to the cluster at {:?}: {}", connection_string, ec.message()));
        }

        auto core = couchbase::core::get_core_cluster(cluster);

        // Fetch the current bucket configuration synchronously and parse it into the topology structure.
        auto config = [core, bucket = bucket_name_]() {
            couchbase::core::operations::management::bucket_describe_request req{ bucket };
            auto barrier = std::make_shared<std::promise<couchbase::core::operations::management::bucket_describe_response>>();
            auto f = barrier->get_future();
            core.execute(req, [barrier](auto resp) { barrier->set_value(std::move(resp)); });
            auto resp = f.get();

            if (resp.ctx.ec) {
                fail(fmt::format("Failed to get bucket config for {:?}: {}", bucket, resp.ctx.ec.message()));
            }
            return couchbase::core::utils::json::parse(resp.ctx.http_body).as<couchbase::core::topology::configuration>();
        }();

        if (!config.vbmap) {
            fail(fmt::format("vBucketMap for bucket {:?} is empty", bucket_name_));
        }
        const auto& vbmap = config.vbmap.value();

        // Single pass over the vBucket map: group the vBuckets by their master node and, at the same time,
        // add the vBuckets of the requested nodes to the selection. The master index is validated before
        // it is converted to an unsigned value and compared with the requested node indexes.
        std::map<std::size_t, std::vector<std::uint16_t>> vbuckets_by_master_index;
        for (std::uint16_t vbucket_id = 0; static_cast<std::size_t>(vbucket_id) < vbmap.size(); ++vbucket_id) {
            auto master_index = vbmap[vbucket_id][0];
            if (master_index < 0) {
                fail(fmt::format("negative value for master node of vBucketID {}", vbucket_id));
            }
            const auto node_index = static_cast<std::size_t>(master_index);
            vbuckets_by_master_index[node_index].push_back(vbucket_id);
            if (std::find(node_indexes_.begin(), node_indexes_.end(), node_index) != node_indexes_.end()) {
                vbucket_ids_.insert(vbucket_id);
            }
        }

        if (verbose_) {
            for (const auto& [master_index, vbuckets] : vbuckets_by_master_index) {
                fmt::print("{}. {:?}: {}\n", master_index, config.nodes[master_index].hostname, fmt::join(vbuckets, ", "));
            }
        }

        // Generate one key per selected vBucket by probing candidate keys until one maps to it.
        std::vector<std::string> ids;
        ids.reserve(vbucket_ids_.size());
        for (const auto& vbucket_id : vbucket_ids_) {
            // A vBucketID that is not part of the map can never be produced by the key mapping
            // (presumably map_key only yields IDs from the map), so the probing loop below would
            // never terminate for it.
            if (static_cast<std::size_t>(vbucket_id) >= vbmap.size()) {
                fail(fmt::format(
                  "vBucketID {} is out of range, bucket {:?} has only {} vBuckets", vbucket_id, bucket_name_, vbmap.size()));
            }
            for (std::size_t index = 0;; ++index) {
                std::string key = fmt::format("vb-{:03}_{:05}", vbucket_id, index);
                auto [vbid, _] = config.map_key(key, 0);
                if (vbid == vbucket_id) {
                    ids.push_back(key);
                    break;
                }
            }
        }

        // Without any key the workload loop below would spin without sending a single operation.
        if (ids.empty()) {
            fail("No vBuckets have been selected for the workload, check the --vbucket-id and --node-index values");
        }

        if (verbose_) {
            fmt::print("{} IDs will be used for the workload:", ids.size());
            for (size_t i = 0; i < ids.size(); ++i) {
                // 16 IDs per output line
                fmt::print("{}{}", i % 16 == 0 ? "\n" : " ", ids[i]);
            }
            fmt::print("\n");
        }
        (void)fflush(stdout);

        auto collection = cluster.bucket(bucket_name_).scope(scope_name_).collection(collection_name_);

        // Populate the keys first
        for (const auto& id : ids) {
            auto [ctx, resp] = collection.upsert(id, "value").get();
            if (ctx.ec()) {
                fail(fmt::format("Failed to store value for key {:?}: {}", id, ctx.ec().message()));
            }
        }

        (void)std::signal(SIGINT, sigint_handler);
        (void)std::signal(SIGTERM, sigint_handler);

        // Tracks whether the most recently reported operation failed, so that only the first success
        // following a failure gets printed.
        bool has_error{ false };

        // test_and_set() returns the previous state: the loop ends on the first check after the handler
        // has cleared the flag.
        while (running.test_and_set()) {
            for (const auto& id : ids) {
                auto [ctx, resp] = collection.get(id, {}).get();

                if (ctx.ec()) {
                    fmt::print(stderr,
                               "{} failed to get value for key {:?}: {}, last_dispatched_to={:?}, retries={} ({})\n",
                               timestamp(),
                               id,
                               ctx.ec().message(),
                               ctx.last_dispatched_to().value_or("-"),
                               ctx.retry_attempts(),
                               fmt::join(ctx.retry_reasons(), ", "));
                    has_error = true;
                } else if (has_error) {
                    fmt::print(stderr,
                               "{} success for key {:?}, last_dispatched_to={:?}, retries={} ({})\n",
                               timestamp(),
                               id,
                               ctx.last_dispatched_to().value_or("-"),
                               ctx.retry_attempts(),
                               fmt::join(ctx.retry_reasons(), ", "));
                    has_error = false;
                }
            }
        }

        cluster.close();
        guard.reset();

        io_thread.join();

        return 0;
    }

  private:
    common_options common_options_{};

    std::string bucket_name_{ default_bucket_name };
    std::string scope_name_{ couchbase::scope::default_name };
    std::string collection_name_{ couchbase::collection::default_name };
    bool verbose_{ false };

    // indexes of the nodes whose vBuckets have to be targeted (--node-index)
    std::vector<std::size_t> node_indexes_{};
    // vBuckets to target: given with --vbucket-id and extended with the vBuckets of the selected nodes
    std::set<std::uint16_t> vbucket_ids_{};
};
} // namespace

/**
 * Creates the "beam" sub-command and hands it out through the generic CLI::App interface.
 */
auto
make_beam_command() -> std::shared_ptr<CLI::App>
{
    std::shared_ptr<CLI::App> command = std::make_shared<beam_app>();
    return command;
}

/**
 * Executes @p app when it is a beam_app instance.
 *
 * @return the result of beam_app::execute(), or 1 when @p app is not a beam_app (including nullptr).
 */
auto
execute_beam_command(CLI::App* app) -> int
{
    auto* beam = dynamic_cast<beam_app*>(app);
    if (beam == nullptr) {
        return 1;
    }
    return beam->execute();
}
} // namespace cbc