Skip to content

Commit

Permalink
feat(appsignal sink): Normalize metrics (vectordotdev#18217)
Browse files Browse the repository at this point in the history
* feat(appsignal sink): Normalize metrics

Implement a normaliser for the AppSignal sink to convert absolute
counter metrics to incremental counters, and incremental gauges to
absolute gauges. The AppSignal API ignores absolute counters and
incremental gauges, so this change adds support for absolute counters
and incremental gauges.

This normaliser is inspired by the DataDog normaliser.

* Refactor metric normalizer tests

Move the methods that generate test metrics and the code that
compares metric inputs and normalized outputs to the `test_util`
metrics module. Previously these test helpers were duplicated
across DataDog, StatsD and AppSignal's metric sinks.

Rename the `run_comparisons` method to `assert_normalize`, as it
asserts the results of running a normalizer's `.normalize` method.

Move the duplicated test implementations to the `test_util::metrics`
module, in a separate tests sub-module, and make them generic over
the normalizer. Use these test definitions in the DataDog, StatsD
and AppSignal's metric sink tests.

* Fix integration tests

Since the AppSignal sink now normalises counters from absolute to
incremental, absolute counters that are only emitted once do not
result in an outgoing HTTP request being emitted by the sink.

Address this by emitting the absolute counters in the tests at
least twice. This also implicitly tests the metrics' normalisation.
  • Loading branch information
unflxw committed Aug 21, 2023
1 parent 71343bd commit 61c0ae8
Show file tree
Hide file tree
Showing 7 changed files with 476 additions and 552 deletions.
52 changes: 34 additions & 18 deletions src/sinks/appsignal/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,23 @@ async fn metrics_real_endpoint() {
#[tokio::test]
async fn metrics_shape() {
let events: Vec<_> = (0..5)
.map(|index| {
Event::Metric(Metric::new(
format!("counter_{}", index),
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
))
.flat_map(|index| {
vec![
Event::Metric(Metric::new(
format!("counter_{}", index),
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
)),
Event::Metric(Metric::new(
format!("counter_{}", index),
MetricKind::Absolute,
MetricValue::Counter {
value: (index + index) as f64,
},
)),
]
})
.collect();
let api_key = push_api_key();
Expand Down Expand Up @@ -146,11 +155,11 @@ async fn metrics_shape() {
.collect();
assert_eq!(
vec![
("counter_0", "absolute", 0.0),
("counter_1", "absolute", 1.0),
("counter_2", "absolute", 2.0),
("counter_3", "absolute", 3.0),
("counter_4", "absolute", 4.0),
("counter_0", "incremental", 0.0),
("counter_1", "incremental", 1.0),
("counter_2", "incremental", 2.0),
("counter_3", "incremental", 3.0),
("counter_4", "incremental", 4.0),
],
metrics
);
Expand Down Expand Up @@ -231,11 +240,18 @@ async fn error_scenario_real_endpoint() {

let (sink, _) = config.build(cx).await.unwrap();
let (batch, receiver) = BatchNotifier::new_with_receiver();
let events = vec![Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 1.0 },
))];
let events = vec![
Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 1.0 },
)),
Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 2.0 },
)),
];
let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));

sink.run(stream).await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

mod config;
mod encoder;
mod normalizer;
mod request_builder;
mod service;
mod sink;
Expand Down
78 changes: 78 additions & 0 deletions src/sinks/appsignal/normalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use vector_core::event::{Metric, MetricValue};

use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

#[derive(Default)]
pub(crate) struct AppsignalMetricsNormalizer;

impl MetricNormalize for AppsignalMetricsNormalizer {
fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
// We only care about making sure that counters are incremental, and that gauges are
// always absolute. Other metric types are currently unsupported.
match &metric.value() {
// We always send counters as incremental and gauges as absolute. Realistically, any
// system sending an incremental gauge update is kind of doing it wrong, but alas.
MetricValue::Counter { .. } => state.make_incremental(metric),
MetricValue::Gauge { .. } => state.make_absolute(metric),
// Otherwise, send it through as-is.
_ => Some(metric),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use crate::event::{Metric, MetricKind, MetricValue};

use super::AppsignalMetricsNormalizer;
use crate::test_util::metrics::{assert_normalize, tests};

#[test]
fn absolute_counter() {
tests::absolute_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
}

#[test]
fn incremental_counter() {
tests::incremental_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
}

#[test]
fn mixed_counter() {
tests::mixed_counter_normalize_to_incremental(AppsignalMetricsNormalizer);
}

#[test]
fn absolute_gauge() {
tests::absolute_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
}

#[test]
fn incremental_gauge() {
tests::incremental_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
}

#[test]
fn mixed_gauge() {
tests::mixed_gauge_normalize_to_absolute(AppsignalMetricsNormalizer);
}

#[test]
fn other_metrics() {
let metric = Metric::new(
"set",
MetricKind::Incremental,
MetricValue::Set {
values: BTreeSet::new(),
},
);

assert_normalize(
AppsignalMetricsNormalizer,
vec![metric.clone()],
vec![Some(metric)],
);
}
}
15 changes: 13 additions & 2 deletions src/sinks/appsignal/sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::{stream::BoxStream, StreamExt};
use futures_util::future::ready;
use tower::{Service, ServiceBuilder};
use vector_core::{
event::Event,
Expand All @@ -7,12 +8,14 @@ use vector_core::{
};

use crate::{
codecs::Transformer, internal_events::SinkRequestBuildError,
sinks::util::builder::SinkBuilderExt, sinks::util::Compression,
codecs::Transformer,
internal_events::SinkRequestBuildError,
sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression},
};

use super::{
encoder::AppsignalEncoder,
normalizer::AppsignalMetricsNormalizer,
request_builder::{AppsignalRequest, AppsignalRequestBuilder},
};

Expand All @@ -32,8 +35,16 @@ where
{
pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let service = ServiceBuilder::new().service(self.service);
let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();

input
.filter_map(move |event| {
ready(if let Event::Metric(metric) = event {
normalizer.normalize(metric).map(Event::Metric)
} else {
Some(event)
})
})
.batched(self.batch_settings.into_byte_size_config())
.request_builder(
None,
Expand Down
Loading

0 comments on commit 61c0ae8

Please sign in to comment.