Skip to content

Commit

Permalink
Add 2 Sidecar actions to register and push metrics (#365)
Browse files Browse the repository at this point in the history
* WIP add support metric points

* Address Bob's comments

* Move the metrics HashMap to the AppInstance to avoid recreating identical metrics at each push

* Remove MetricDefinition, use already existing MetricContext

* Fix coding style

* Implement missing process_immediately for new SidecarActions

Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>

---------

Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
Co-authored-by: Bob Weinand <bob.weinand@datadoghq.com>
  • Loading branch information
iamluc and bwoebi committed Mar 26, 2024
1 parent 1ff1bfe commit 1d1c247
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
2 changes: 1 addition & 1 deletion ddtelemetry/src/data/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Distribution {
pub interval: u64,
}

#[derive(Serialize, Debug, Clone, Copy)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[serde(rename_all = "snake_case")]
#[repr(C)]
pub enum MetricNamespace {
Expand Down
2 changes: 1 addition & 1 deletion ddtelemetry/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl MetricBuckets {
}
}

#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct MetricContext {
pub namespace: data::metrics::MetricNamespace,
pub name: String,
Expand Down
67 changes: 61 additions & 6 deletions sidecar/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ use datadog_ipc::tarpc;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils;
use datadog_trace_utils::trace_utils::{SendData, TracerHeaderTags};
use ddcommon::Endpoint;
use ddcommon::{tag::Tag, Endpoint};
use ddtelemetry::{
data,
metrics::{ContextKey, MetricContext},
worker::{
store::Store, LifecycleAction, TelemetryActions, TelemetryWorkerBuilder,
TelemetryWorkerHandle, MAX_ITEMS,
Expand Down Expand Up @@ -101,6 +102,8 @@ pub enum RequestIdentifier {
#[derive(Debug, Deserialize, Serialize)]
pub enum SidecarAction {
Telemetry(TelemetryActions),
RegisterTelemetryMetric(MetricContext),
AddTelemetryMetricPoint((String, f64, Vec<Tag>)),
PhpComposerTelemetryFile(PathBuf),
}

Expand Down Expand Up @@ -365,12 +368,39 @@ impl RuntimeInfo {
struct AppInstance {
telemetry: TelemetryWorkerHandle,
telemetry_worker_shutdown: Shared<BoxFuture<'static, Option<()>>>,
telemetry_metrics: HashMap<String, ContextKey>,
}

impl AppInstance {
pub fn register_metric(&mut self, metric: MetricContext) {
if !self.telemetry_metrics.contains_key(&metric.name) {
self.telemetry_metrics.insert(
metric.name.clone(),
self.telemetry.register_metric_context(
metric.name,
metric.tags,
metric.metric_type,
metric.common,
metric.namespace,
),
);
}
}

pub fn to_telemetry_point(
&self,
(name, val, tags): (String, f64, Vec<Tag>),
) -> TelemetryActions {
TelemetryActions::AddPoint((val, *self.telemetry_metrics.get(&name).unwrap(), tags))
}
}

struct EnqueuedTelemetryData {
dependencies: Store<data::Dependency>,
configurations: Store<data::Configuration>,
integrations: Store<data::Integration>,
metrics: Vec<MetricContext>,
points: Vec<(String, f64, Vec<Tag>)>,
actions: Vec<TelemetryActions>,
computed_dependencies: Vec<Shared<ManualFuture<Arc<Vec<data::Dependency>>>>>,
}
Expand All @@ -381,6 +411,8 @@ impl Default for EnqueuedTelemetryData {
dependencies: Store::new(MAX_ITEMS),
configurations: Store::new(MAX_ITEMS),
integrations: Store::new(MAX_ITEMS),
metrics: Vec::new(),
points: Vec::new(),
actions: Vec::new(),
computed_dependencies: Vec::new(),
}
Expand Down Expand Up @@ -411,6 +443,9 @@ impl EnqueuedTelemetryData {
SidecarAction::PhpComposerTelemetryFile(composer_path) => self
.computed_dependencies
.push(Self::extract_composer_telemetry(composer_path).shared()),

SidecarAction::RegisterTelemetryMetric(m) => self.metrics.push(m),
SidecarAction::AddTelemetryMetricPoint(p) => self.points.push(p),
}
}
}
Expand Down Expand Up @@ -438,7 +473,10 @@ impl EnqueuedTelemetryData {
}
}

pub async fn process_immediately(sidecar_actions: Vec<SidecarAction>) -> Vec<TelemetryActions> {
pub async fn process_immediately(
sidecar_actions: Vec<SidecarAction>,
app: &mut AppInstance,
) -> Vec<TelemetryActions> {
let mut actions = vec![];
for action in sidecar_actions {
match action {
Expand All @@ -448,6 +486,10 @@ impl EnqueuedTelemetryData {
actions.push(TelemetryActions::AddDependecy(nested.clone()));
}
}
SidecarAction::RegisterTelemetryMetric(metric) => app.register_metric(metric),
SidecarAction::AddTelemetryMetricPoint(point) => {
actions.push(app.to_telemetry_point(point));
}
}
}
actions
Expand Down Expand Up @@ -907,6 +949,7 @@ impl SidecarServer {
let instance = AppInstance {
telemetry: handle,
telemetry_worker_shutdown: worker_join.map(Result::ok).boxed().shared(),
telemetry_metrics: HashMap::new(),
};

instance.telemetry.send_msgs(inital_actions).await.ok();
Expand Down Expand Up @@ -1028,8 +1071,9 @@ impl SidecarInterface for SidecarServer {
} else {
return;
};
if let Some(app) = app_future.await {
let actions = EnqueuedTelemetryData::process_immediately(actions).await;
if let Some(mut app) = app_future.await {
let actions =
EnqueuedTelemetryData::process_immediately(actions, &mut app).await;
app.telemetry.send_msgs(actions).await.ok();
}
});
Expand Down Expand Up @@ -1072,7 +1116,7 @@ impl SidecarInterface for SidecarServer {
let mut actions: Vec<TelemetryActions> = vec![];
enqueued_data.extract_telemetry_actions(&mut actions).await;

if let Some(app) = self
if let Some(mut app) = self
.get_app(
&instance_id,
&runtime_meta,
Expand All @@ -1082,7 +1126,18 @@ impl SidecarInterface for SidecarServer {
)
.await
{
let actions: Vec<_> = std::mem::take(&mut enqueued_data.actions);
// Register metrics
for metric in std::mem::take(&mut enqueued_data.metrics).into_iter() {
app.register_metric(metric);
}

let mut actions: Vec<_> = std::mem::take(&mut enqueued_data.actions);

// Send metric points
for point in std::mem::take(&mut enqueued_data.points) {
actions.push(app.to_telemetry_point(point));
}

// drop on stop
if actions.iter().any(|action| {
matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop))
Expand Down

0 comments on commit 1d1c247

Please sign in to comment.