Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix tracer and meter ref-counting #370

Merged
merged 2 commits into from Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 15 additions & 5 deletions core/cluster.hxx
Expand Up @@ -88,17 +88,18 @@ class cluster : public std::enable_shared_from_this<cluster>
tracer_ = std::make_shared<tracing::noop_tracer>();
}
}
tracer_->start();
// ignore the metrics options if a meter was passed in.
if (nullptr != origin_.options().meter) {
meter_ = origin_.options().meter;
} else {

if (origin_.options().enable_metrics) {
meter_ = std::make_shared<metrics::logging_meter>(ctx_, origin_.options().metrics_options);
} else {
meter_ = std::make_shared<metrics::noop_meter>();
}
}
meter_->start();
session_manager_->set_tracer(tracer_);
if (origin_.options().enable_dns_srv) {
auto [hostname, _] = origin_.next_address();
Expand All @@ -110,7 +111,7 @@ class cluster : public std::enable_shared_from_this<cluster>
[self, hostname = std::move(hostname), handler = std::forward<Handler>(handler)](origin::node_list nodes,
std::error_code ec) mutable {
if (ec) {
return handler(ec);
return self->close([ec, handler = std::forward<Handler>(handler)]() mutable { handler(ec); });
}
if (!nodes.empty()) {
self->origin_.set_nodes(std::move(nodes));
Expand Down Expand Up @@ -141,7 +142,13 @@ class cluster : public std::enable_shared_from_this<cluster>
self->session_manager_->close();
handler();
self->work_.reset();
if (self->tracer_) {
self->tracer_->stop();
}
self->tracer_.reset();
if (self->meter_) {
self->meter_->stop();
}
self->meter_.reset();
}));
}
Expand Down Expand Up @@ -395,7 +402,7 @@ class cluster : public std::enable_shared_from_this<cluster>
tls_.load_verify_file(origin_.options().trust_certificate, ec);
if (ec) {
CB_LOG_ERROR("[{}]: unable to load verify file \"{}\": {}", id_, origin_.options().trust_certificate, ec.message());
return handler(ec);
return close([ec, handler = std::forward<Handler>(handler)]() mutable { return handler(ec); });
}
}
#ifdef COUCHBASE_CXX_CLIENT_TLS_KEY_LOG_FILE
Expand All @@ -414,13 +421,13 @@ class cluster : public std::enable_shared_from_this<cluster>
tls_.use_certificate_chain_file(origin_.certificate_path(), ec);
if (ec) {
CB_LOG_ERROR("[{}]: unable to load certificate chain \"{}\": {}", id_, origin_.certificate_path(), ec.message());
return handler(ec);
return close([ec, handler = std::forward<Handler>(handler)]() mutable { return handler(ec); });
}
CB_LOG_DEBUG(R"([{}]: use TLS private key: "{}")", id_, origin_.key_path());
tls_.use_private_key_file(origin_.key_path(), asio::ssl::context::file_format::pem, ec);
if (ec) {
CB_LOG_ERROR("[{}]: unable to load private key \"{}\": {}", id_, origin_.key_path(), ec.message());
return handler(ec);
return close([ec, handler = std::forward<Handler>(handler)]() mutable { return handler(ec); });
}
}
session_ = io::mcbp_session(id_, ctx_, tls_, origin_, dns_srv_tracker_);
Expand Down Expand Up @@ -465,6 +472,9 @@ class cluster : public std::enable_shared_from_this<cluster>
}
});
}
if (ec) {
return self->close([ec, handler = std::forward<Handler>(handler)]() mutable { handler(ec); });
}
handler(ec);
});
}
Expand Down
2 changes: 1 addition & 1 deletion core/impl/cluster.cxx
Expand Up @@ -169,7 +169,7 @@ initiate_cluster_connect(asio::io_service& io,
auto c = couchbase::cluster(core);
// create txns as we want to start cleanup immediately if configured with metadata_collection
[[maybe_unused]] std::shared_ptr<couchbase::transactions::transactions> t = c.transactions();
handler(c, {});
handler(std::move(c), {});
});
}

Expand Down
6 changes: 4 additions & 2 deletions core/meta/CMakeLists.txt
Expand Up @@ -7,8 +7,10 @@ target_link_libraries(
snappy
fmt::fmt
spdlog::spdlog)
target_include_directories(couchbase_meta PRIVATE ${PROJECT_BINARY_DIR}/generated ../..
../../third_party/http_parser)
target_include_directories(couchbase_meta PRIVATE
${PROJECT_BINARY_DIR}/generated
${PROJECT_SOURCE_DIR}
${PROJECT_SOURCE_DIR}/third_party/http_parser)

if(NOT COUCHBASE_CXX_CLIENT_POST_LINKED_OPENSSL)
target_link_libraries(couchbase_meta PUBLIC OpenSSL::SSL OpenSSL::Crypto)
Expand Down
5 changes: 4 additions & 1 deletion core/metrics/CMakeLists.txt
Expand Up @@ -7,4 +7,7 @@ target_link_libraries(
hdr_histogram_static
fmt::fmt
spdlog::spdlog)
target_include_directories(couchbase_metrics PRIVATE ../.. ../../third_party/hdr_histogram_c/src)
target_include_directories(couchbase_metrics PRIVATE
${PROJECT_BINARY_DIR}/generated
${PROJECT_SOURCE_DIR}
${PROJECT_SOURCE_DIR}/third_party/hdr_histogram_c/src)
51 changes: 46 additions & 5 deletions core/metrics/logging_meter.cxx
Expand Up @@ -17,12 +17,16 @@

#include "logging_meter.hxx"

#include "couchbase/build_info.hxx"

#include "core/logger/logger.hxx"
#include "core/utils/json.hxx"
#include "noop_meter.hxx"

#include "third_party/hdr_histogram_c/src/hdr_histogram.h"
#include <hdr_histogram.h>

#include <gsl/assert>

#include <memory>
#include <utility>

Expand Down Expand Up @@ -142,7 +146,7 @@ logging_meter::log_report() const
{

{ "emit_interval_s", std::chrono::duration_cast<std::chrono::seconds>(options_.emit_interval).count() },
#if BACKEND_DEBUG_BUILD
#if COUCHBASE_CXX_CLIENT_DEBUG_BUILD
{ "emit_interval_ms", options_.emit_interval.count() },
#endif
},
Expand All @@ -158,22 +162,59 @@ logging_meter::log_report() const
}
}

logging_meter::logging_meter(asio::io_context& ctx, logging_meter_options options)
: emit_report_(ctx)
, options_(options)
{
}

logging_meter::~logging_meter()
{
emit_report_.cancel();
log_report();
}

void
logging_meter::rearm_reporter()
{
emit_report_.expires_after(options_.emit_interval);
emit_report_.async_wait([self = shared_from_this()](std::error_code ec) {
if (ec == asio::error::operation_aborted) {
return;
}
self->log_report();
self->rearm_reporter();
});
}

void
logging_meter::start()
{
rearm_reporter();
}

void
logging_meter::stop()
{
emit_report_.cancel();
}

std::shared_ptr<couchbase::metrics::value_recorder>
logging_meter::get_value_recorder(const std::string& name, const std::map<std::string, std::string>& tags)
{
static std::shared_ptr<noop_value_recorder> noop_recorder{ std::make_shared<noop_value_recorder>() };

if (static std::string meter_name = "db.couchbase.operations"; name != meter_name) {
if (static const std::string meter_name = "db.couchbase.operations"; name != meter_name) {
return noop_recorder;
}

static std::string service_tag = "db.couchbase.service";
static const std::string service_tag = "db.couchbase.service";
auto service = tags.find(service_tag);
if (service == tags.end()) {
return noop_recorder;
}

static std::string operation_tag = "db.operation";
static const std::string operation_tag = "db.operation";
auto operation = tags.find(operation_tag);
if (operation == tags.end()) {
return noop_recorder;
Expand Down
37 changes: 10 additions & 27 deletions core/metrics/logging_meter.hxx
Expand Up @@ -29,46 +29,29 @@ namespace couchbase::core::metrics
{
class logging_value_recorder;

class logging_meter : public couchbase::metrics::meter
class logging_meter
: public couchbase::metrics::meter
, public std::enable_shared_from_this<logging_meter>
{
private:
asio::steady_timer emit_report_;
logging_meter_options options_;
std::mutex recorders_mutex_{};
// service name -> operation name -> recorder
std::map<std::string, std::map<std::string, std::shared_ptr<logging_value_recorder>>> recorders_{};

void log_report() const;

void rearm_reporter()
{
emit_report_.expires_after(options_.emit_interval);
emit_report_.async_wait([this](std::error_code ec) {
if (ec == asio::error::operation_aborted) {
return;
}
log_report();
rearm_reporter();
});
}
void rearm_reporter();

public:
logging_meter(asio::io_context& ctx, logging_meter_options options)
: emit_report_(ctx)
, options_(options)
{
rearm_reporter();
}
logging_meter(asio::io_context& ctx, logging_meter_options options);

~logging_meter() override
{
emit_report_.cancel();
log_report();
}
~logging_meter() override;

void start()
{
rearm_reporter();
}
void start() override;

void stop() override;

std::shared_ptr<couchbase::metrics::value_recorder> get_value_recorder(const std::string& name,
const std::map<std::string, std::string>& tags) override;
Expand Down
4 changes: 3 additions & 1 deletion core/tracing/CMakeLists.txt
Expand Up @@ -6,4 +6,6 @@ target_link_libraries(
project_warnings
fmt::fmt
spdlog::spdlog)
target_include_directories(couchbase_tracing PRIVATE ../..)
target_include_directories(couchbase_tracing PRIVATE
${PROJECT_BINARY_DIR}/generated
${PROJECT_SOURCE_DIR})
23 changes: 19 additions & 4 deletions core/tracing/threshold_logging_tracer.cxx
Expand Up @@ -17,6 +17,8 @@

#include "threshold_logging_tracer.hxx"

#include "couchbase/build_info.hxx"

#include "constants.hxx"
#include "core/logger/logger.hxx"
#include "core/meta/version.hxx"
Expand All @@ -25,10 +27,11 @@
#include "core/utils/json.hxx"

#include <asio/steady_timer.hpp>
#include <tao/json/value.hpp>

#include <chrono>
#include <mutex>
#include <queue>
#include <tao/json/value.hpp>

namespace couchbase::core::tracing
{
Expand Down Expand Up @@ -285,6 +288,12 @@ class threshold_logging_tracer_impl
rearm_threshold_reporter();
}

void stop()
{
emit_orphan_report_.cancel();
emit_threshold_report_.cancel();
}

void add_orphan(std::shared_ptr<threshold_logging_span> span)
{
orphan_queue_.emplace(convert(std::move(span)));
Expand Down Expand Up @@ -338,7 +347,7 @@ class threshold_logging_tracer_impl
tao::json::value report
{
{ "count", queue.size() },
#if BACKEND_DEBUG_BUILD
#if COUCHBASE_CXX_CLIENT_DEBUG_BUILD
{ "emit_interval_ms", options_.orphaned_emit_interval.count() }, { "sample_size", options_.orphaned_sample_size },
#endif
};
Expand All @@ -361,7 +370,7 @@ class threshold_logging_tracer_impl
tao::json::value report
{
{ "count", queue.size() }, { "service", fmt::format("{}", service) },
#if BACKEND_DEBUG_BUILD
#if COUCHBASE_CXX_CLIENT_DEBUG_BUILD
{ "emit_interval_ms", options_.threshold_emit_interval.count() }, { "sample_size", options_.threshold_sample_size },
{ "threshold_ms",
std::chrono::duration_cast<std::chrono::microseconds>(options_.threshold_for_service(service)).count() },
Expand Down Expand Up @@ -403,8 +412,8 @@ threshold_logging_tracer::report(std::shared_ptr<threshold_logging_span> span)

threshold_logging_tracer::threshold_logging_tracer(asio::io_context& ctx, threshold_logging_options options)
: options_{ options }
, impl_(std::make_shared<threshold_logging_tracer_impl>(options_, ctx))
{
impl_ = std::make_shared<threshold_logging_tracer_impl>(options_, ctx);
}

void
Expand All @@ -413,6 +422,12 @@ threshold_logging_tracer::start()
impl_->start();
}

void
threshold_logging_tracer::stop()
{
impl_->stop();
}

void
threshold_logging_span::end()
{
Expand Down
4 changes: 2 additions & 2 deletions core/tracing/threshold_logging_tracer.hxx
Expand Up @@ -38,11 +38,11 @@ class threshold_logging_tracer
public:
threshold_logging_tracer(asio::io_context& ctx, threshold_logging_options options);

void start();

std::shared_ptr<couchbase::tracing::request_span> start_span(std::string name,
std::shared_ptr<couchbase::tracing::request_span> parent) override;
void report(std::shared_ptr<threshold_logging_span> span);
void start() override;
void stop() override;

private:
threshold_logging_options options_;
Expand Down
16 changes: 16 additions & 0 deletions couchbase/metrics/meter.hxx
Expand Up @@ -46,6 +46,22 @@ class meter
meter& operator=(meter&& other) = default;
virtual ~meter() = default;

/**
* SDK invokes this method when cluster is ready to emit metrics. Override it as NO-OP if no action is necessary.
*/
virtual void start()
{
/* do nothing */
}

/**
* SDK invokes this method when cluster is closed. Override it as NO-OP if no action is necessary.
*/
virtual void stop()
{
/* do nothing */
}

virtual std::shared_ptr<value_recorder> get_value_recorder(const std::string& name, const std::map<std::string, std::string>& tags) = 0;
};

Expand Down
16 changes: 16 additions & 0 deletions couchbase/tracing/request_tracer.hxx
Expand Up @@ -72,6 +72,22 @@ class request_tracer
request_tracer& operator=(request_tracer&& other) = default;
virtual ~request_tracer() = default;

/**
* SDK invokes this method when cluster is ready to trace. Override it as NO-OP if no action is necessary.
*/
virtual void start()
{
/* do nothing */
}

/**
* SDK invokes this method when cluster is closed. Override it as NO-OP if no action is necessary.
*/
virtual void stop()
{
/* do nothing */
}

virtual std::shared_ptr<request_span> start_span(std::string name, std::shared_ptr<request_span> parent = {}) = 0;
};

Expand Down