Skip to content

Commit

Permalink
CXXCBC-368: Subscribe to clustermap notifications to speedup failover
Browse files Browse the repository at this point in the history
Implement only deduplication flag from the RFC, and enable clustermap
change notifications. I this patch, we only want to use full
notifications, and do not turn off polling. So the SDK will be able to
receive new configuration sooner, but still be chatty as older versions.
  • Loading branch information
avsej committed Dec 11, 2023
1 parent c3c8001 commit b285e11
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 5 deletions.
2 changes: 1 addition & 1 deletion core/cluster_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct cluster_options {
io::dns::dns_config dns_config{ io::dns::dns_config::system_config() };
bool show_queries{ false };
bool enable_unordered_execution{ true };
bool enable_clustermap_notification{ false };
bool enable_clustermap_notification{ true };
bool enable_compression{ true };
bool enable_tracing{ true };
bool enable_metrics{ true };
Expand Down
18 changes: 15 additions & 3 deletions core/io/mcbp_session.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class mcbp_session_impl
}
if (session_->origin_.options().enable_clustermap_notification) {
hello_req.body().enable_clustermap_change_notification();
hello_req.body().enable_deduplicate_not_my_vbucket_clustermap();
}
if (session_->origin_.options().enable_compression) {
hello_req.body().enable_compression();
Expand Down Expand Up @@ -665,7 +666,18 @@ class mcbp_session_impl
Expects(protocol::is_valid_server_request_opcode(msg.header.opcode));
switch (static_cast<protocol::server_opcode>(msg.header.opcode)) {
case protocol::server_opcode::cluster_map_change_notification: {
protocol::server_request<protocol::cluster_map_change_notification_request_body> req(std::move(msg));
protocol::cmd_info info{ session_->bootstrap_hostname_, session_->bootstrap_port_number_ };
if (session_->origin_.options().dump_configuration) {
std::string_view config_text{ reinterpret_cast<const char*>(msg.body.data()), msg.body.size() };
CB_LOG_TRACE(
"{} configuration from cluster_map_change_notification request (size={}, endpoint=\"{}:{}\"), {}",
session_->log_prefix_,
config_text.size(),
info.endpoint_address,
info.endpoint_port,
config_text);
}
protocol::server_request<protocol::cluster_map_change_notification_request_body> req(std::move(msg), info);
std::optional<topology::configuration> config = req.body().config();
if (session_ && config.has_value()) {
if ((!config->bucket.has_value() && req.body().bucket().empty()) ||
Expand Down Expand Up @@ -1263,8 +1275,8 @@ class mcbp_session_impl
config_.emplace(std::move(config));
configured_ = true;
for (const auto& listener : config_listeners_) {
asio::post(asio::bind_executor(
ctx_, [listener, config = config_.value()]() mutable { return listener->update_config(std::move(config)); }));
asio::post(
asio::bind_executor(ctx_, [listener, c = config_.value()]() mutable { return listener->update_config(std::move(c)); }));
}
}

Expand Down
5 changes: 5 additions & 0 deletions core/protocol/cmd_hello.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ class hello_request_body
features_.emplace_back(hello_feature::clustermap_change_notification);
}

void enable_deduplicate_not_my_vbucket_clustermap()
{
features_.emplace_back(hello_feature::deduplicate_not_my_vbucket_clustermap);
}

void enable_compression()
{
features_.emplace_back(hello_feature::snappy);
Expand Down
6 changes: 6 additions & 0 deletions core/protocol/hello_feature.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ enum class hello_feature : std::uint16_t {
* Indicates support for subdoc lookup operations on replicas
*/
subdoc_replica_read = 0x1c,

/**
* The server will not send configuration body to the connections, that already has seen it.
*/
deduplicate_not_my_vbucket_clustermap = 0x1e,
};

constexpr bool
Expand Down Expand Up @@ -191,6 +196,7 @@ is_valid_hello_feature(std::uint16_t code)
case hello_feature::replace_body_with_xattr:
case hello_feature::resource_units:
case hello_feature::subdoc_replica_read:
case hello_feature::deduplicate_not_my_vbucket_clustermap:
return true;
}
return false;
Expand Down
3 changes: 3 additions & 0 deletions core/protocol/hello_feature_fmt.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ struct fmt::formatter<couchbase::core::protocol::hello_feature> {
case couchbase::core::protocol::hello_feature::subdoc_replica_read:
name = "subdoc_replica_read";
break;
case couchbase::core::protocol::hello_feature::deduplicate_not_my_vbucket_clustermap:
name = "deduplicate_not_my_vbucket_clustermap";
break;
}
return format_to(ctx.out(), "{}", name);
}
Expand Down
2 changes: 1 addition & 1 deletion couchbase/behavior_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class behavior_options
private:
std::string user_agent_extra_{};
bool show_queries_{ false };
bool enable_clustermap_notification_{ false };
bool enable_clustermap_notification_{ true };
bool enable_mutation_tokens_{ true };
bool enable_unordered_execution_{ true };
bool dump_configuration_{ false };
Expand Down

0 comments on commit b285e11

Please sign in to comment.