Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/datadog/datadog_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,24 @@ void DatadogAgent::flush() {
return;
}

// Ideally:
/*auto [encode_result, duration] = mesure([&trace_chunks] {*/
/* std::string body;*/
/* msgpack_encode(body, trace_chunks);*/
/*});*/

std::string body;

auto beg = std::chrono::steady_clock::now();
auto encode_result = msgpack_encode(body, trace_chunks);
auto end = std::chrono::steady_clock::now();

telemetry::distribution::add(
metrics::tracer::trace_chunk_serialization_duration,
std::chrono::duration_cast<std::chrono::microseconds>(end - beg).count());
telemetry::distribution::add(metrics::tracer::trace_chunk_serialized_bytes,
static_cast<uint64_t>(body.size()));

if (auto* error = encode_result.if_error()) {
logger_->log_error(*error);
return;
Expand Down Expand Up @@ -311,11 +327,17 @@ void DatadogAgent::flush() {
};

telemetry::counter::increment(metrics::tracer::api::requests);
telemetry::distribution::add(metrics::tracer::api::bytes_sent,
static_cast<uint64_t>(body.size()));

auto post_result =
http_client_->post(traces_endpoint_, std::move(set_request_headers),
std::move(body), std::move(on_response),
std::move(on_error), clock_().tick + request_timeout_);
if (auto* error = post_result.if_error()) {
// NOTE(@dmehala): `technical` would be a better error kind here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment mean?

telemetry::counter::increment(metrics::tracer::api::errors,
{"type:network"});
logger_->log_error(
error->with_prefix("Unexpected error submitting traces: "));
}
Expand Down
125 changes: 98 additions & 27 deletions src/datadog/telemetry/telemetry_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,40 @@ using namespace datadog::tracing;
using namespace std::chrono_literals;

namespace datadog::telemetry {
namespace internal_metrics {
// Self-instrumentation: metrics the telemetry subsystem reports about itself.
// NOTE(review): the trailing `true` in each initializer presumably marks the
// metric as a "common" (cross-tracer) metric — confirm against the
// telemetry::Counter / telemetry::Distribution declarations.

/// The number of logs created with a given log level. Useful for calculating
/// impact for other features (automatic sending of logs). Levels should be one
/// of `debug`, `info`, `warn`, `error`, `critical`.
const telemetry::Counter logs_created{"logs_created", "general", true};

/// The number of requests sent to the api endpoint in the agent that errored,
/// tagged by the error type (e.g. `type:timeout`, `type:network`,
/// `type:status_code`) and Endpoint (`endpoint:agent`, `endpoint:agentless`).
const telemetry::Counter errors{"telemetry_api.errors", "telemetry", true};

/// The number of requests sent to a telemetry endpoint, regardless of success,
/// tagged by the endpoint (`endpoint:agent`, `endpoint:agentless`).
const telemetry::Counter requests{"telemetry_api.requests", "telemetry", true};

/// The number of responses received from the endpoint, tagged with status code
/// (`status_code:200`, `status_code:404`) and endpoint (`endpoint:agent`,
/// `endpoint:agentless`).
const telemetry::Counter responses{"telemetry_api.responses", "telemetry",
                                   true};

/// The size of the payload sent to the stats endpoint in bytes, tagged by the
/// endpoint (`endpoint:agent`, `endpoint:agentless`).
const telemetry::Distribution bytes_sent{"telemetry_api.bytes", "telemetry",
                                         true};

/// The time it takes to send the payload sent to the endpoint in ms, tagged by
/// the endpoint (`endpoint:agent`, `endpoint:agentless`).
const telemetry::Distribution request_duration{"telemetry_api.ms", "telemetry",
                                               true};

}  // namespace internal_metrics

namespace {

HTTPClient::URL make_telemetry_endpoint(HTTPClient::URL url) {
Expand Down Expand Up @@ -174,26 +208,8 @@ Telemetry::Telemetry(FinalizedConfiguration config,
host_info_(get_host_info()) {
// Callback for successful telemetry HTTP requests, to examine HTTP
// status.
telemetry_on_response_ = [logger = logger_](
int response_status,
const DictReader& /*response_headers*/,
std::string response_body) {
if (response_status < 200 || response_status >= 300) {
logger->log_error([&](auto& stream) {
stream << "Unexpected telemetry response status " << response_status
<< " with body (if any, starts on next line):\n"
<< response_body;
});
}
};

// Callback for unsuccessful telemetry HTTP requests.
telemetry_on_error_ = [logger = logger_](Error error) {
logger->log_error(error.with_prefix(
"Error occurred during HTTP request for telemetry: "));
};

send_telemetry("app-started", app_started());
http_client_->drain(clock_().tick + 2s);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing this is here to make sure app-started is the absolute first. What is the problem if it is not the first?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per telemetry doc:

Telemetry Lifecycle

The first event we should receive is app-started. [...]

schedule_tasks();
}

Expand All @@ -216,20 +232,23 @@ Telemetry::~Telemetry() {
// The app-closing message is bundled with a message containing the
// final metric values.
send_telemetry("app-closing", app_closing());
http_client_->drain(clock_().tick + 1s);
http_client_->drain(clock_().tick + 2s);
}
}

Telemetry::Telemetry(Telemetry&& rhs)
: config_(std::move(rhs.config_)),
logger_(std::move(rhs.logger_)),
telemetry_on_response_(std::move(rhs.telemetry_on_response_)),
telemetry_on_error_(std::move(rhs.telemetry_on_error_)),
telemetry_endpoint_(std::move(rhs.telemetry_endpoint_)),
tracer_signature_(std::move(rhs.tracer_signature_)),
http_client_(rhs.http_client_),
clock_(std::move(rhs.clock_)),
scheduler_(std::move(rhs.scheduler_)),
counters_(std::move(rhs.counters_)),
counters_snapshot_(std::move(rhs.counters_snapshot_)),
rates_(std::move(rhs.rates_)),
rates_snapshot_(std::move(rhs.rates_snapshot_)),
distributions_(std::move(rhs.distributions_)),
seq_id_(rhs.seq_id_),
config_seq_ids_(rhs.config_seq_ids_),
host_info_(rhs.host_info_) {
Expand All @@ -242,13 +261,17 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) {
cancel_tasks(rhs.tasks_);
std::swap(config_, rhs.config_);
std::swap(logger_, rhs.logger_);
std::swap(telemetry_on_response_, rhs.telemetry_on_response_);
std::swap(telemetry_on_error_, rhs.telemetry_on_error_);
std::swap(telemetry_endpoint_, rhs.telemetry_endpoint_);
std::swap(http_client_, rhs.http_client_);
std::swap(tracer_signature_, rhs.tracer_signature_);
std::swap(http_client_, rhs.http_client_);
std::swap(clock_, rhs.clock_);
std::swap(scheduler_, rhs.scheduler_);
std::swap(counters_, rhs.counters_);
std::swap(counters_snapshot_, rhs.counters_snapshot_);
std::swap(rates_, rhs.rates_);
std::swap(rates_snapshot_, rhs.rates_snapshot_);
std::swap(distributions_, rhs.distributions_);
std::swap(seq_id_, rhs.seq_id_);
std::swap(config_seq_ids_, rhs.config_seq_ids_);
std::swap(host_info_, rhs.host_info_);
Expand All @@ -259,16 +282,19 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) {

void Telemetry::log_error(std::string message) {
if (!config_.report_logs) return;
increment_counter(internal_metrics::logs_created, {"level:error"});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to do these inside the log function itself?

Copy link
Collaborator Author

@dmehala dmehala Apr 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because increment_counter(internal_metrics::logs_created, {"level:error"}); is more readable than increment_counter(internal_metrics::logs_created, {std::string("level:") + to_string(level)});

log(std::move(message), LogLevel::ERROR);
}

// Record an error-level telemetry log entry together with a stack trace,
// counting it toward the internal `logs_created` metric. A no-op when log
// reporting is disabled in the configuration.
void Telemetry::log_error(std::string message, std::string stacktrace) {
  if (config_.report_logs) {
    increment_counter(internal_metrics::logs_created, {"level:error"});
    log(std::move(message), LogLevel::ERROR, stacktrace);
  }
}

// Record a warning-level telemetry log entry, counting it toward the internal
// `logs_created` metric. A no-op when log reporting is disabled in the
// configuration.
void Telemetry::log_warning(std::string message) {
  if (config_.report_logs) {
    increment_counter(internal_metrics::logs_created, {"level:warning"});
    log(std::move(message), LogLevel::WARNING);
  }
}

Expand All @@ -293,10 +319,55 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) {
}
};

auto post_result = http_client_->post(
telemetry_endpoint_, set_telemetry_headers, std::move(payload),
telemetry_on_response_, telemetry_on_error_, clock_().tick + 5s);
auto telemetry_on_response = [this, logger = logger_](
int response_status,
const DictReader& /*response_headers*/,
std::string response_body) {
if (response_status >= 500) {
increment_counter(internal_metrics::responses,
{"status_code:5xx", "endpoint:agent"});
} else if (response_status >= 400) {
increment_counter(internal_metrics::responses,
{"status_code:4xx", "endpoint:agent"});
} else if (response_status >= 300) {
increment_counter(internal_metrics::responses,
{"status_code:3xx", "endpoint:agent"});
} else if (response_status >= 200) {
increment_counter(internal_metrics::responses,
{"status_code:2xx", "endpoint:agent"});
} else if (response_status >= 100) {
increment_counter(internal_metrics::responses,
{"status_code:1xx", "endpoint:agent"});
}

if (response_status < 200 || response_status >= 300) {
logger->log_error([&](auto& stream) {
stream << "Unexpected telemetry response status " << response_status
<< " with body (if any, starts on next line):\n"
<< response_body;
});
}
};

// Callback for unsuccessful telemetry HTTP requests.
auto telemetry_on_error = [this, logger = logger_](Error error) {
increment_counter(internal_metrics::errors,
{"type:network", "endpoint:agent"});
logger->log_error(error.with_prefix(
"Error occurred during HTTP request for telemetry: "));
};

increment_counter(internal_metrics::requests, {"endpoint:agent"});
add_datapoint(internal_metrics::bytes_sent, {"endpoint:agent"},
payload.size());

auto post_result =
http_client_->post(telemetry_endpoint_, set_telemetry_headers,
std::move(payload), std::move(telemetry_on_response),
std::move(telemetry_on_error), clock_().tick + 5s);
if (auto* error = post_result.if_error()) {
increment_counter(internal_metrics::errors,
{"type:network", "endpoint:agent"});
logger_->log_error(
error->with_prefix("Unexpected error submitting telemetry event: "));
}
Expand Down
2 changes: 0 additions & 2 deletions src/datadog/telemetry/telemetry_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class Telemetry final {
/// Shared pointer to the user logger instance.
std::shared_ptr<tracing::Logger> logger_;
std::vector<tracing::EventScheduler::Cancel> tasks_;
tracing::HTTPClient::ResponseHandler telemetry_on_response_;
tracing::HTTPClient::ErrorHandler telemetry_on_error_;
tracing::HTTPClient::URL telemetry_endpoint_;
tracing::TracerSignature tracer_signature_;
std::shared_ptr<tracing::HTTPClient> http_client_;
Expand Down
36 changes: 36 additions & 0 deletions src/datadog/telemetry_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,36 @@ namespace datadog::tracing::metrics {

namespace tracer {
// Tracer-level telemetry metrics. All belong to the `tracers` metric
// namespace; the trailing `true` presumably marks each as a common
// (cross-tracer) metric — confirm against telemetry::Counter/Distribution.

// Span lifecycle counters.
const telemetry::Counter spans_created = {"spans_created", "tracers", true};
const telemetry::Counter spans_dropped = {"spans_dropped", "tracers", true};
const telemetry::Counter spans_finished = {"spans_finished", "tracers", true};

// Trace segment lifecycle counters.
const telemetry::Counter trace_segments_created = {"trace_segments_created",
                                                   "tracers", true};

const telemetry::Counter trace_segments_closed = {"trace_segments_closed",
                                                  "tracers", true};

const telemetry::Distribution trace_chunk_size = {"trace_chunk_size", "tracers",
                                                  true};

// Serialized (msgpack-encoded) size of a trace chunk payload, in bytes.
const telemetry::Distribution trace_chunk_serialized_bytes = {
    "trace_chunk_serialization.bytes", "tracers", true};

// Time spent msgpack-encoding trace chunks.
// NOTE(review): DatadogAgent::flush reports this value in *microseconds*
// (duration_cast<std::chrono::microseconds>), but the metric name ends in
// `.ms` — confirm the intended unit.
const telemetry::Distribution trace_chunk_serialization_duration = {
    "trace_chunk_serialization.ms", "tracers", true};

// Trace chunk queueing / outcome counters.
const telemetry::Counter trace_chunks_enqueued = {"trace_chunks_enqueued",
                                                  "tracers", true};

const telemetry::Counter trace_chunks_enqueued_for_serialization = {
    "trace_chunks_enqueued_for_serialization", "tracers", true};

const telemetry::Counter trace_chunks_dropped = {"trace_chunks_dropped",
                                                 "tracers", true};

const telemetry::Counter trace_chunks_sent = {"trace_chunks_sent", "tracers",
                                              true};
const telemetry::Counter context_header_truncated = {
"context_header.truncated",
"tracers",
Expand All @@ -20,8 +43,21 @@ const telemetry::Counter context_header_truncated = {
namespace api {
// Metrics covering HTTP exchanges with the agent's trace API endpoint.

/// Number of requests sent to the trace endpoint, regardless of outcome.
const telemetry::Counter requests = {"trace_api.requests", "tracers", true};
/// Number of responses received from the trace endpoint.
const telemetry::Counter responses = {"trace_api.responses", "tracers", true};
/// Size of the payload sent to the trace endpoint, in bytes.
const telemetry::Distribution bytes_sent = {"trace_api.bytes", "tracers", true};
/// Time taken by a request to the trace endpoint; the `.ms` suffix suggests
/// milliseconds — confirm the unit at the call site.
const telemetry::Distribution request_duration = {"trace_api.ms", "tracers",
                                                  true};
/// Number of trace endpoint requests that errored, tagged by call sites with
/// the error type (e.g. `type:network`).
const telemetry::Counter errors = {"trace_api.errors", "tracers", true};
}  // namespace api

namespace trace_context {
// Metrics covering distributed trace-context propagation; the
// `context_header_style.*` names suggest tagging by header style — confirm at
// the call sites.

/// Number of trace-context injections performed.
const telemetry::Counter injected = {"context_header_style.injected", "tracers",
                                     true};
/// Number of trace-context extractions performed.
const telemetry::Counter extracted = {"context_header_style.extracted",
                                      "tracers", true};
/// Number of context headers that were truncated.
/// NOTE(review): reuses the metric name `context_header.truncated` also used
/// by `tracer::context_header_truncated` above — confirm the duplication is
/// intentional.
const telemetry::Counter truncated = {"context_header.truncated", "tracers",
                                      true};
}  // namespace trace_context

} // namespace tracer

} // namespace datadog::tracing::metrics
Loading