Skip to content

Commit

Permalink
CXXCBC-525: Columnar - open cluster connection in background (#621)
Browse files Browse the repository at this point in the history
Motivation
==========
Columnar SDKs take a different approach to connecting.
So that wrapper SDKs can still utilize the majority of the I/O
functionality in the C++ client, the Columnar connect changes should be
added.

Changes
=======
* Add path to allow SDKs to open and bootstrap a cluster in the
  background
* Add "infinite" retries when bootstrapping
* Add config_tracker
* Add functionality to track bootstrap failure/success
* Add deferred queue for HTTP requests
  • Loading branch information
thejcfactor committed Jul 10, 2024
1 parent f0c530d commit befbced
Show file tree
Hide file tree
Showing 21 changed files with 2,040 additions and 96 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/columnar.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Builds and tests the client with the Columnar additions enabled
# (bin/build-tests turns a non-empty CB_COLUMNAR into -DCOUCHBASE_CXX_CLIENT_COLUMNAR=ON).
name: columnar

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  build:
    runs-on: ubuntu-22.04

    steps:
      - name: Install build environment
        run: |
          sudo apt-get update -y
          sudo apt-get install -y libssl-dev cmake gcc g++ curl gdb
      # checkout@v2 targets a retired Node.js runtime; v4 keeps the same `submodules` input.
      - uses: actions/checkout@v4
        with:
          submodules: recursive
      - name: ccache
        uses: hendrikmuhs/ccache-action@v1.2
        with:
          key: ${{ github.job }}
      - name: Build
        timeout-minutes: 40
        env:
          # Quoted so the value is always the string "ON", never a YAML boolean.
          CB_COLUMNAR: "ON"
        run: ./bin/build-tests
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ if(NOT DEFINED COUCHBASE_CXX_CLIENT_MASTER_PROJECT)
endif()
endif()

# Opt-in switch for the Columnar-specific additions; disabled unless requested explicitly,
# e.g. with -DCOUCHBASE_CXX_CLIENT_COLUMNAR=ON on the cmake command line.
option(COUCHBASE_CXX_CLIENT_COLUMNAR "Build with Columnar additions" FALSE)
if(COUCHBASE_CXX_CLIENT_COLUMNAR)
# Make the non-default configuration visible in the configure log.
message(STATUS "COUCHBASE_CXX_CLIENT_COLUMNAR=${COUCHBASE_CXX_CLIENT_COLUMNAR} building with Columnar additions.")
endif()

project(
couchbase_cxx_client
VERSION "1.0.0"
Expand Down Expand Up @@ -226,6 +231,7 @@ set(couchbase_cxx_client_FILES
core/io/mcbp_message.cxx
core/io/mcbp_parser.cxx
core/io/mcbp_session.cxx
core/io/config_tracker.cxx
core/key_value_config.cxx
core/management/analytics_link_azure_blob_external.cxx
core/management/analytics_link_couchbase_remote.cxx
Expand Down
6 changes: 6 additions & 0 deletions bin/build-tests
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ case "${CB_SANITIZER}" in
;;
esac

# Columnar builds are opt-in: any non-empty CB_COLUMNAR value switches them on.
CB_COLUMNAR="${CB_COLUMNAR:-}"
if [ -n "${CB_COLUMNAR}" ]; then
  CB_CMAKE_EXTRAS="${CB_CMAKE_EXTRAS} -DCOUCHBASE_CXX_CLIENT_COLUMNAR=ON"
fi
echo "CB_COLUMNAR=${CB_COLUMNAR}"

set -exuo pipefail

BUILD_DIR="${PROJECT_ROOT}/cmake-build-tests"
Expand Down
1 change: 1 addition & 0 deletions cmake/build_config.hxx.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
#cmakedefine COUCHBASE_CXX_CLIENT_MOZILLA_CA_BUNDLE_SHA256 "@COUCHBASE_CXX_CLIENT_MOZILLA_CA_BUNDLE_SHA256@"
#cmakedefine COUCHBASE_CXX_CLIENT_BORINGSSL_SHA "@COUCHBASE_CXX_CLIENT_BORINGSSL_SHA@"
#cmakedefine COUCHBASE_CXX_CLIENT_STATIC_BORINGSSL
/* Defined when the build is configured with the COUCHBASE_CXX_CLIENT_COLUMNAR CMake option;
 * sources test it with #ifdef to compile the Columnar-only code paths. */
#cmakedefine COUCHBASE_CXX_CLIENT_COLUMNAR
265 changes: 264 additions & 1 deletion core/cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

#include "couchbase/build_config.hxx"
#include <couchbase/build_config.hxx>

#include "cluster.hxx"

Expand All @@ -24,6 +24,9 @@
#include "core/impl/get_replica.hxx"
#include "core/impl/lookup_in_replica.hxx"
#include "core/impl/observe_seqno.hxx"
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
#include "core/io/config_tracker.hxx"
#endif
#include "core/io/http_command.hxx"
#include "core/io/http_session_manager.hxx"
#include "core/io/mcbp_command.hxx"
Expand Down Expand Up @@ -151,12 +154,22 @@ is_feature_supported(const operations::management::search_index_upsert_request&
class cluster_impl : public std::enable_shared_from_this<cluster_impl>
{
public:
/**
 * Binds the cluster implementation to the given I/O context.
 *
 * A work guard is taken on the context and the HTTP session manager is created right away.
 * Columnar builds additionally construct the timer used for retry backoff.
 */
explicit cluster_impl(asio::io_context& ctx)
  : ctx_(ctx)
  , work_(asio::make_work_guard(ctx_))
  , session_manager_(std::make_shared<io::http_session_manager>(id_, ctx_, tls_))
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  , retry_backoff_(ctx_)
#endif
{
}

auto io_context() -> asio::io_context&
{
Expand Down Expand Up @@ -276,6 +289,64 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
do_open(std::move(handler));
}

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
/**
 * Starts bootstrapping the cluster connection without waiting for it to finish.
 *
 * The handler is always invoked before this function returns:
 *  - errc::network::cluster_closed if the cluster has already been stopped;
 *  - success if a background open was already requested earlier (nothing is restarted);
 *  - errc::common::invalid_argument if the origin has no nodes (the cluster is also marked
 *    as stopped and its work guard is released);
 *  - success once the background bootstrap has been kicked off.
 *
 * A successful completion therefore only means "started"; connecting itself is retried in
 * the background.
 *
 * @param origin connection origin (nodes, credentials, options) to bootstrap from
 * @param handler completion callback, see above
 */
void open_in_background(couchbase::core::origin origin,
                        utils::movable_function<void(std::error_code)>&& handler)
{
  if (stopped_) {
    return handler(errc::network::cluster_closed);
  }
  // Claim the "started" flag atomically: checking it here and setting it only after the
  // bootstrap was launched would let two concurrent callers both start a bootstrap.
  if (background_open_started_.exchange(true)) {
    CB_LOG_DEBUG("Background open already started for cluster, id: \"{}\"", id_);
    return handler({});
  }
  if (origin.get_nodes().empty()) {
    stopped_ = true;
    work_.reset();
    return handler(errc::common::invalid_argument);
  }

  origin_ = std::move(origin);
  CB_LOG_DEBUG(R"(open cluster, id: "{}", core version: "{}", {})",
               id_,
               couchbase::core::meta::sdk_semver(),
               origin_.to_json());
  // ignore the enable_tracing flag if a tracer was passed in
  if (nullptr != origin_.options().tracer) {
    tracer_ = origin_.options().tracer;
  } else {
    if (origin_.options().enable_tracing) {
      tracer_ = std::make_shared<tracing::threshold_logging_tracer>(
        ctx_, origin_.options().tracing_options);
    } else {
      tracer_ = std::make_shared<tracing::noop_tracer>();
    }
  }
  tracer_->start();
  // ignore the metrics options if a meter was passed in.
  if (nullptr != origin_.options().meter) {
    meter_ = origin_.options().meter;
  } else {
    if (origin_.options().enable_metrics) {
      meter_ = std::make_shared<metrics::logging_meter>(ctx_, origin_.options().metrics_options);
    } else {
      meter_ = std::make_shared<metrics::noop_meter>();
    }
  }
  meter_->start();
  session_manager_->set_tracer(tracer_);
  session_manager_->set_dispatch_timeout(origin_.options().dispatch_timeout);
  // at this point we will infinitely try to connect
  if (origin_.options().enable_dns_srv) {
    do_background_dns_srv_open();
  } else {
    do_background_open();
  }
  return handler({});
}
#endif

void open_bucket(const std::string& bucket_name,
utils::movable_function<void(std::error_code)>&& handler)
{
Expand Down Expand Up @@ -654,6 +725,171 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
});
}

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
/**
 * Prepares the TLS trust store and then starts creating the cluster sessions.
 *
 * Exactly one trust source is applied, selected by the first matching security option:
 * trust_only_capella, trust_only_pem_file / trust_only_pem_string, trust_only_platform, or a
 * non-empty trust_only_certificates list. A CA that cannot be loaded is logged and otherwise
 * ignored, with one exception: when load_verify_file() fails, this function schedules itself
 * to run again after a 500ms backoff and returns without creating any sessions.
 *
 * Once the trust store is set up, a cluster_config_tracker is created, session_manager_ is
 * subscribed to its bootstrap notifications and create_cluster_sessions() is called.
 */
void do_background_open()
{
  // TODO: retries on more failures? Right now only retry if load_verify_file() fails...

  // Disables TLS v1.2 which should be okay cloud/columnar default.
  configure_tls_options(true);
  if (origin_.options().security_options.trust_only_capella) {
    std::error_code ec{};
    CB_LOG_DEBUG(R"([{}]: use Capella CA for TLS verify)", id_);
    tls_.add_certificate_authority(
      asio::const_buffer(couchbase::core::default_ca::capellaCaCert,
                         strlen(couchbase::core::default_ca::capellaCaCert)),
      ec);
    if (ec) {
      CB_LOG_WARNING("[{}]: unable to load Capella CAs: {}", id_, ec.message());
      // we don't consider this fatal and try to continue without it
    }
  } else if (origin_.options().security_options.trust_only_pem_file ||
             origin_.options().security_options.trust_only_pem_string) {
    // Warn when neither a CA certificate path nor a certificate value was given, unless the
    // user disabled TLS verification altogether.
    const bool no_trust_source = origin_.options().trust_certificate.empty() &&
                                 origin_.options().trust_certificate_value.empty();
    if (no_trust_source && origin_.options().tls_verify != tls_verify_mode::none) {
      CB_LOG_WARNING("[{}] When TLS is enabled, the cluster options must specify certificate(s) "
                     "to trust or ensure that they are "
                     "available in system CA store.",
                     id_);
    }
    std::error_code ec{};
    // load only the explicit certificate
    // system and default capella certificates are not loaded
    if (!origin_.options().trust_certificate_value.empty()) {
      CB_LOG_DEBUG(R"([{}]: use TLS certificate passed through via options object)", id_);
      tls_.add_certificate_authority(
        asio::const_buffer(origin_.options().trust_certificate_value.data(),
                           origin_.options().trust_certificate_value.size()),
        ec);
      if (ec) {
        CB_LOG_WARNING(
          "[{}]: unable to load CA passed via options object: {}", id_, ec.message());
      }
    }
    if (!origin_.options().trust_certificate.empty()) {
      CB_LOG_DEBUG(
        R"([{}]: use TLS verify file: "{}")", id_, origin_.options().trust_certificate);
      tls_.load_verify_file(origin_.options().trust_certificate, ec);
      if (ec) {
        CB_LOG_ERROR("[{}]: unable to load verify file \"{}\": {}",
                     id_,
                     origin_.options().trust_certificate,
                     ec.message());
        auto backoff = std::chrono::milliseconds(500);
        CB_LOG_DEBUG(
          "[{}] waiting for {}ms before retrying TLS verify file.", id_, backoff.count());
        backoff_then_retry(backoff, [self = shared_from_this()]() {
          self->do_background_open();
        });
        // The retry re-runs this whole function. Falling through here would create the
        // config tracker and sessions now, and then a second time when the retry fires.
        return;
      }
    }
  } else if (origin_.options().security_options.trust_only_platform) {
    // TODO: CXXCBC-548: security_options updates (use Mozilla certs?)
    CB_LOG_DEBUG(R"([{}]: use default CA for TLS verify)", id_);
    std::error_code ec{};
    // load system certificates
    tls_.set_default_verify_paths(ec);
    if (ec) {
      CB_LOG_WARNING(R"([{}]: failed to load system CAs: {})", id_, ec.message());
    }
  } else if (!origin_.options().security_options.trust_only_certificates.empty()) {
    std::error_code ec{};
    CB_LOG_DEBUG("[{}]: loading {} user provided CA certificates.",
                 id_,
                 origin_.options().security_options.trust_only_certificates.size());
    for (const auto& cert : origin_.options().security_options.trust_only_certificates) {
      tls_.add_certificate_authority(asio::const_buffer(cert.data(), cert.size()), ec);
      if (ec) {
        CB_LOG_WARNING("[{}]: unable to load CA: {}", id_, ec.message());
      }
    }
  }
  // TODO: CXXCBC-548: security_options updates (support cipher suites)
  // if (!origin_.options().security_options.cipher_suites.empty()) {
  // }
  config_tracker_ = std::make_shared<couchbase::core::io::cluster_config_tracker>(
    id_, origin_, ctx_, tls_, dns_srv_tracker_);
  config_tracker_->register_bootstrap_notification_subscriber(session_manager_);
  create_cluster_sessions();
}

void backoff_then_retry(std::chrono::milliseconds backoff,
utils::movable_function<void()> callback)
{
retry_backoff_.expires_after(backoff);
retry_backoff_.async_wait(
[self = shared_from_this(), cb = std::move(callback)](std::error_code ec) {
if (ec == asio::error::operation_aborted || self->stopped_) {
return;
}
if (ec) {
CB_LOG_WARNING("[{}] Retry callback received error ec={}.", self->id_, ec.message());
}
cb();
});
return;
}

/**
 * Resolves the bootstrap nodes through DNS SRV and then continues with do_background_open().
 *
 * A DNS SRV tracker is created for the next address of the origin and queried on the I/O
 * context. If the query fails, the session manager is notified of the bootstrap error and
 * the whole lookup is retried after a 500ms backoff. If it succeeds with a non-empty node
 * list, that list replaces the nodes of the origin.
 */
void do_background_dns_srv_open()
{
  std::string hostname;
  std::string port; // only needed as a target for the unpacking below
  std::tie(hostname, port) = origin_.next_address();

  dns_srv_tracker_ = std::make_shared<impl::dns_srv_tracker>(
    ctx_, hostname, origin_.options().dns_config, origin_.options().enable_tls);

  asio::post(asio::bind_executor(
    ctx_, [self = shared_from_this(), hostname = std::move(hostname)]() mutable {
      self->dns_srv_tracker_->get_srv_nodes(
        [self, hostname = std::move(hostname)](origin::node_list nodes,
                                               std::error_code ec) mutable {
          if (!ec) {
            if (!nodes.empty()) {
              self->origin_.set_nodes(std::move(nodes));
              CB_LOG_INFO(
                "[{}] Replace list of bootstrap nodes with addresses from DNS SRV of \"{}\": [{}]",
                self->id_,
                hostname,
                utils::join_strings(self->origin_.get_nodes(), ", "));
            }
            self->do_background_open();
            return;
          }

          // lookup failed: report it, then try the whole DNS SRV step again later
          const auto delay = std::chrono::milliseconds(500);
          self->session_manager_->notify_bootstrap_error({ ec, ec.message(), hostname, {} });
          CB_LOG_DEBUG(
            "[{}] waiting for {}ms before retrying DNS query.", self->id_, delay.count());
          self->backoff_then_retry(delay, [self]() {
            self->do_background_dns_srv_open();
          });
        });
    }));
}

/**
 * Asks the config tracker to create the cluster sessions, retrying until it succeeds.
 *
 * On failure the call is repeated after a 500ms backoff. On success the received
 * configuration is handed to the session manager, the session manager is passed to the
 * tracker's on_configuration_update(), and the tracker's state listener is registered.
 */
void create_cluster_sessions()
{
  auto on_sessions = [self = shared_from_this()](std::error_code ec,
                                                 topology::configuration cfg) mutable {
    if (!ec) {
      self->session_manager_->set_configuration(cfg, self->origin_.options());
      self->config_tracker_->on_configuration_update(self->session_manager_);
      self->config_tracker_->register_state_listener();
      return;
    }

    const auto delay = std::chrono::milliseconds(500);
    CB_LOG_DEBUG("[{}] Waiting for {}ms before retrying to create cluster sessions.",
                 self->id_,
                 delay.count());
    self->backoff_then_retry(delay, [self]() {
      self->create_cluster_sessions();
    });
  };
  config_tracker_->create_sessions(std::move(on_sessions));
}
#endif

void with_bucket_configuration(
const std::string& bucket_name,
utils::movable_function<void(std::error_code, topology::configuration)>&& handler)
Expand Down Expand Up @@ -772,6 +1008,14 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
self->session_->stop(retry_reason::do_not_retry);
self->session_.reset();
}
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
if (self->config_tracker_) {
self->config_tracker_->close();
self->config_tracker_->unregister_bootstrap_notification_subscriber(
self->session_manager_);
}
self->retry_backoff_.cancel();
#endif
self->for_each_bucket([](auto bucket) {
bucket->close();
});
Expand Down Expand Up @@ -861,6 +1105,11 @@ class cluster_impl : public std::enable_shared_from_this<cluster_impl>
std::shared_ptr<couchbase::tracing::request_tracer> tracer_{ nullptr };
std::shared_ptr<couchbase::metrics::meter> meter_{ nullptr };
std::atomic_bool stopped_{ false };
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
// State that exists only in Columnar builds (background open support).
// Created by do_background_open(); empty until then.
std::shared_ptr<couchbase::core::io::cluster_config_tracker> config_tracker_{};
// Timer behind backoff_then_retry(); cancelled when the cluster is closed.
asio::steady_timer retry_backoff_;
// Set by open_in_background() so that repeated calls do not start another bootstrap.
std::atomic_bool background_open_started_{ false };
#endif
};

cluster::cluster(asio::io_context& ctx)
Expand Down Expand Up @@ -916,6 +1165,20 @@ cluster::open(couchbase::core::origin origin,
}
}

/**
 * Starts opening the cluster connection in the background (Columnar builds only).
 *
 * The handler is always completed:
 *  - in Columnar builds the call is forwarded to the implementation, which reports whether
 *    the background open could be started;
 *  - errc::network::cluster_closed if this handle has no implementation behind it;
 *  - errc::common::feature_not_available in builds without Columnar support.
 *
 * @param origin connection origin to bootstrap from
 * @param handler completion callback
 */
void
cluster::open_in_background(
  [[maybe_unused]] couchbase::core::origin origin,
  [[maybe_unused]] utils::movable_function<void(std::error_code)>&& handler) const
{
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
  if (impl_) {
    return impl_->open_in_background(std::move(origin), std::move(handler));
  }
  // Nothing to open: tell the caller instead of silently dropping the handler.
  handler(errc::network::cluster_closed);
#else
  CB_LOG_ERROR("Background open only available for Columnar builds.");
  // Complete the handler so callers waiting on it do not hang.
  handler(errc::common::feature_not_available);
#endif
}

void
cluster::diagnostics(std::optional<std::string> report_id,
utils::movable_function<void(diag::diagnostics_result)>&& handler) const
Expand Down
Loading

0 comments on commit befbced

Please sign in to comment.