Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/datadog/telemetry/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ TelemetryProxy make_telemetry(const Ctor_param& init) {

TelemetryProxy& instance(
const tracing::Optional<Ctor_param>& init = tracing::nullopt) {
static TelemetryProxy telemetry(make_telemetry(*init));
static TelemetryProxy telemetry = make_telemetry(*init);
return telemetry;
}

Expand Down
107 changes: 75 additions & 32 deletions src/datadog/telemetry/telemetry_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,14 @@ Telemetry::Telemetry(FinalizedConfiguration config,
clock_(std::move(clock)),
scheduler_(event_scheduler),
host_info_(get_host_info()) {
send_telemetry("app-started", app_started());
http_client_->drain(clock_().tick + request_timeout);
app_started();
schedule_tasks();
}

void Telemetry::schedule_tasks() {
tasks_.emplace_back(scheduler_->schedule_recurring_event(
config_.heartbeat_interval, [this]() {
send_telemetry("app-heartbeat", heartbeat_and_telemetry());
}));
config_.heartbeat_interval,
[this]() { send_payload("app-heartbeat", heartbeat_and_telemetry()); }));

if (config_.report_metrics) {
tasks_.emplace_back(scheduler_->schedule_recurring_event(
Expand All @@ -228,11 +226,7 @@ void Telemetry::schedule_tasks() {
Telemetry::~Telemetry() {
if (!tasks_.empty()) {
cancel_tasks(tasks_);
capture_metrics();
// The app-closing message is bundled with a message containing the
// final metric values.
send_telemetry("app-closing", app_closing());
http_client_->drain(clock_().tick + request_timeout);
app_closing();
}
}

Expand Down Expand Up @@ -298,31 +292,80 @@ void Telemetry::log_warning(std::string message) {
log(std::move(message), LogLevel::WARNING);
}

void Telemetry::send_telemetry(StringView request_type, std::string payload) {
void Telemetry::app_started() {
auto payload = app_started_payload();

auto on_headers = [payload_size = payload.size(),
debug_enabled = config_.debug](DictWriter& headers) {
headers.set("Content-Type", "application/json");
headers.set("Content-Length", std::to_string(payload_size));
headers.set("DD-Telemetry-API-Version", "v2");
headers.set("DD-Client-Library-Language", "cpp");
headers.set("DD-Client-Library-Version", tracer_version);
headers.set("DD-Telemetry-Request-Type", "app-started");
if (debug_enabled) {
headers.set("DD-Telemetry-Debug-Enabled", "true");
}
};

auto on_response = [logger = logger_](int response_status, const DictReader&,
std::string response_body) {
if (response_status < 200 || response_status >= 300) {
logger->log_error([&](auto& stream) {
stream << "Unexpected telemetry response status " << response_status
<< " with body (if any, starts on next line):\n"
<< response_body;
});
}
};

auto on_error = [logger = logger_](Error error) {
logger->log_error(error.with_prefix(
"Error occurred during HTTP request for telemetry: "));
};

increment_counter(internal_metrics::requests, {"endpoint:agent"});
add_datapoint(internal_metrics::bytes_sent, {"endpoint:agent"},
payload.size());

auto post_result =
http_client_->post(telemetry_endpoint_, on_headers, std::move(payload),
std::move(on_response), std::move(on_error),
clock_().tick + request_timeout);
if (auto* error = post_result.if_error()) {
increment_counter(internal_metrics::errors,
{"type:network", "endpoint:agent"});
logger_->log_error(
error->with_prefix("Unexpected error submitting telemetry event: "));
}
}

void Telemetry::app_closing() {
// Capture metrics in-between two ticks to be sent with the last payload.
capture_metrics();

send_payload("app-closing", app_closing_payload());
http_client_->drain(clock_().tick + request_timeout);
}

void Telemetry::send_payload(StringView request_type, std::string payload) {
auto set_telemetry_headers = [request_type, payload_size = payload.size(),
debug_enabled = config_.debug,
tracer_signature =
&tracer_signature_](DictWriter& headers) {
/*
TODO:
Datadog-Container-ID
*/
debug_enabled =
config_.debug](DictWriter& headers) {
headers.set("Content-Type", "application/json");
headers.set("Content-Length", std::to_string(payload_size));
headers.set("DD-Telemetry-API-Version", "v2");
headers.set("DD-Client-Library-Language", "cpp");
headers.set("DD-Client-Library-Version", tracer_signature->library_version);
headers.set("DD-Client-Library-Version", tracer_version);
headers.set("DD-Telemetry-Request-Type", request_type);

if (debug_enabled) {
headers.set("DD-Telemetry-Debug-Enabled", "true");
}
};

auto telemetry_on_response = [this, logger = logger_](
int response_status,
const DictReader& /*response_headers*/,
std::string response_body) {
auto on_response = [this, logger = logger_](int response_status,
const DictReader&,
std::string response_body) {
if (response_status >= 500) {
increment_counter(internal_metrics::responses,
{"status_code:5xx", "endpoint:agent"});
Expand Down Expand Up @@ -350,7 +393,7 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) {
};

// Callback for unsuccessful telemetry HTTP requests.
auto telemetry_on_error = [this, logger = logger_](Error error) {
auto on_error = [this, logger = logger_](Error error) {
increment_counter(internal_metrics::errors,
{"type:network", "endpoint:agent"});
logger->log_error(error.with_prefix(
Expand All @@ -361,10 +404,10 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) {
add_datapoint(internal_metrics::bytes_sent, {"endpoint:agent"},
payload.size());

auto post_result = http_client_->post(
telemetry_endpoint_, set_telemetry_headers, std::move(payload),
std::move(telemetry_on_response), std::move(telemetry_on_error),
clock_().tick + request_timeout);
auto post_result =
http_client_->post(telemetry_endpoint_, set_telemetry_headers,
std::move(payload), std::move(on_response),
std::move(on_error), clock_().tick + request_timeout);
if (auto* error = post_result.if_error()) {
increment_counter(internal_metrics::errors,
{"type:network", "endpoint:agent"});
Expand All @@ -390,7 +433,7 @@ void Telemetry::send_configuration_change() {
telemetry_body["payload"] =
nlohmann::json{{"configuration", configuration_json}};

send_telemetry("app-client-configuration-change", telemetry_body.dump());
send_payload("app-client-configuration-change", telemetry_body.dump());
}

std::string Telemetry::heartbeat_and_telemetry() {
Expand Down Expand Up @@ -475,7 +518,7 @@ std::string Telemetry::heartbeat_and_telemetry() {
return message_batch_payload;
}

std::string Telemetry::app_closing() {
std::string Telemetry::app_closing_payload() {
auto batch_payloads = nlohmann::json::array();

auto app_closing = nlohmann::json::object({
Expand Down Expand Up @@ -532,7 +575,7 @@ std::string Telemetry::app_closing() {
return message_batch_payload;
}

std::string Telemetry::app_started() {
std::string Telemetry::app_started_payload() {
auto configuration_json = nlohmann::json::array();
auto product_json = nlohmann::json::object();

Expand Down
9 changes: 6 additions & 3 deletions src/datadog/telemetry/telemetry_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ class Telemetry final {
const std::vector<std::string>& tags, uint64_t value);

private:
void send_telemetry(tracing::StringView request_type, std::string payload);
void app_started();
void app_closing();

void send_payload(tracing::StringView request_type, std::string payload);

void schedule_tasks();

Expand All @@ -144,13 +147,13 @@ class Telemetry final {

// Constructs an `app-started` message using information provided when
// constructed and the tracer_config value passed in.
std::string app_started();
std::string app_started_payload();
// Constructs a messsage-batch containing `app-heartbeat`, and if metrics
// have been modified, a `generate-metrics` message.
std::string heartbeat_and_telemetry();
// Constructs a message-batch containing `app-closing`, and if metrics have
// been modified, a `generate-metrics` message.
std::string app_closing();
std::string app_closing_payload();
};

} // namespace datadog::telemetry