Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 159 additions & 114 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"pulse-common",
"pulse-drop-tester",
"pulse-metrics",
"pulse-promcli",
"pulse-promloadgen",
Expand All @@ -17,7 +18,7 @@ assert_matches = "1.5.0"
async-trait = "0.1"
aws-config = "1.8.5"
aws-credential-types = "1.2.5"
aws-sdk-sqs = "1.81.0"
aws-sdk-sqs = "1.82.1"
aws-sigv4 = "1.3.4"
aws-smithy-async = "1.2.5"
aws-smithy-http = "0.62.3"
Expand All @@ -40,8 +41,8 @@ bd-test-helpers = { git = "https://github.com/bitdriftlabs/shared-core.gi
bd-time = { git = "https://github.com/bitdriftlabs/shared-core.git" }
built = { version = "0.8", features = ["git2"] }
bytes = "1"
cc = "1.2.33"
clap = { version = "4.5.45", features = ["derive", "env"] }
cc = "1.2.34"
clap = { version = "4.5.46", features = ["derive", "env"] }
comfy-table = "7.1.4"
console-subscriber = "0.4.1"
criterion = { version = "0.7", features = ["html_reports"] }
Expand Down Expand Up @@ -129,7 +130,7 @@ topk = "0.5.0"
topological-sort = "0.2.2"
tracing = "0.1.41"
unwrap-infallible = "0.1.5"
url = "2.5.6"
url = "2.5.7"
uuid = { version = "1.18.0", features = ["v4"] }

vrl = { git = "https://github.com/mattklein123/vrl.git", branch = "performance-20250625", default-features = false, features = [
Expand Down
23 changes: 23 additions & 0 deletions pulse-drop-tester/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
edition = "2024"
license-file = "../LICENSE"
name = "pulse-drop-tester"
publish = false
version = "1.0.0"

[lib]
doctest = false

[dependencies]
anyhow.workspace = true
bd-log.workspace = true
bd-server-stats.workspace = true
clap.workspace = true
ctor.workspace = true
log.workspace = true
pretty_assertions.workspace = true
protobuf.workspace = true
pulse-common = { path = "../pulse-common" }
pulse-metrics = { path = "../pulse-metrics" }
pulse-protobuf = { path = "../pulse-protobuf" }
vrl.workspace = true
43 changes: 43 additions & 0 deletions pulse-drop-tester/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Drop tester

The drop tester binary (build via `cargo build --bin pulse-drop-tester`) is used to run self
contained tests.

The configuration for the tests is defined in Protobuf and passed to the tool as YAML. See here for
the [configuration definition](../pulse-protobuf/proto/pulse/drop_tester/v1/drop_tester.proto). The
binary is invoked with the following options:

```
Usage: pulse-drop-tester [OPTIONS] --config <CONFIG>

Options:
-c, --config <CONFIG>
--proxy-config <PROXY_CONFIG>
-h, --help Print help
```

The `-c` option is based the test config. Optionally `--proxy-config` can be used to pass a proxy
configuration to load drop processor configs from. This makes it easier to keep the real
configuration and the test cases in sync.

An example test file look as follows:

```yaml
test_cases:
- config:
rules:
- name: foo
conditions:
- metric_name:
exact: bar
metrics:
- input: bar:1|c
dropped_by: foo
- input: baz:1|g
```

The `dropped_by` test case specifies which rule name should drop the metric. If the metric should
not be dropped just leave it empty/missing.

Currently all input and output metrics are specified in DogStatsD format, even though internally
Prometheus style metrics will work just fine. In the future we will support both formats in tests.
128 changes: 128 additions & 0 deletions pulse-drop-tester/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// pulse - bitdrift's observability proxy
// Copyright Bitdrift, Inc. All rights reserved.
//
// Use of this source code is governed by a source available license that can be found in the
// LICENSE file or at:
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt

#[cfg(test)]
mod test;

use anyhow::{anyhow, bail};
use bd_server_stats::stats::Collector;
use config::bootstrap::v1::bootstrap::Config;
use config::processor::v1::processor::processor_config::Processor_type;
use drop::DropConfig;
use drop::drop_processor_config::Config_source;
use protobuf::Message;
use pulse_common::proto::yaml_to_proto;
use pulse_metrics::pipeline::processor::drop::TranslatedDropConfig;
use pulse_metrics::protos::metric::{DownstreamId, MetricSource, ParsedMetric};
use pulse_metrics::protos::statsd;
use pulse_protobuf::protos::pulse::config;
use pulse_protobuf::protos::pulse::config::common::v1::common::wire_protocol::StatsD;
use pulse_protobuf::protos::pulse::config::processor::v1::drop;
use pulse_protobuf::protos::pulse::config::processor::v1::processor::ProcessorConfig;
use pulse_protobuf::protos::pulse::drop_tester::v1::drop_tester::drop_test_case::Config_type;
use pulse_protobuf::protos::pulse::drop_tester::v1::drop_tester::{DropTestCase, DropTesterConfig};
use std::time::Instant;

#[ctor::ctor]
fn global_init() {
bd_log::SwapLogger::initialize();
}

fn run_test_case(test_case: DropTestCase, proxy_config: Option<&Config>) -> anyhow::Result<usize> {
fn extract_from_config(
field_name: &str,
proxy_config: Option<&Config>,
processor_name: &str,
extract: impl Fn(&ProcessorConfig) -> Option<DropConfig>,
) -> anyhow::Result<DropConfig> {
let Some(config) = proxy_config else {
bail!("{field_name} requires passing a proxy config via --proxy-config");
};
config
.pipeline()
.processors
.iter()
.find_map(|(name, value)| {
if name.as_str() == processor_name {
return extract(value);
}
None
})
.ok_or_else(|| anyhow!("no processor named '{processor_name} found in proxy config"))
}

let drop_config: DropConfig = match test_case.config_type.as_ref().expect("pgv") {
Config_type::Config(config) => Ok(config.clone()),
Config_type::DropProcessorName(processor_name) => extract_from_config(
"mutate_processor_name",
proxy_config,
processor_name,
|value| {
if let Some(Processor_type::Drop(drop)) = &value.processor_type {
return Some(match drop.config_source.as_ref().expect("pgv") {
Config_source::Inline(config) => config.clone(),
Config_source::FileSource(_) => {
// TODO(mattklein123): Support file source if needed.
return None;
},
});
}

None
},
),
}?;

let drop_config = TranslatedDropConfig::new(&drop_config, &Collector::default().scope("test"))?;

let mut num_metrics = 0;
for metric in test_case.metrics {
num_metrics += 1;

// TODO(mattklein123): Support parsing other formats. Probably a limited PromQL query of the
// metric?
let mut input = statsd::parse(
&metric.input.clone().into_bytes(),
StatsD::default_instance(),
)
.map_err(|e| anyhow!("unable to parse input '{}' as statsd: {e}", metric.input))?;
log::debug!("parsed input metric: {input}");
input.timestamp = 0;
let parsed_input = ParsedMetric::new(
input,
MetricSource::PromRemoteWrite,
Instant::now(),
DownstreamId::LocalOrigin,
);

let dropped_by = drop_config.drop_sample(&parsed_input).unwrap_or("");
if metric.dropped_by.as_str() != dropped_by {
bail!(
"expected metric '{}' to be dropped by '{}' but actually dropped by '{}'",
metric.input,
metric.dropped_by,
dropped_by
);
}
}

Ok(num_metrics)
}

pub fn run(config: &str, proxy_config: Option<&str>) -> anyhow::Result<()> {
let config: DropTesterConfig = yaml_to_proto(config)?;
let proxy_config: Option<Config> = proxy_config.map(yaml_to_proto).transpose()?;

let num_test_cases = config.test_cases.len();
let mut num_metrics = 0;
for test_case in config.test_cases {
num_metrics += run_test_case(test_case, proxy_config.as_ref())?;
}
log::info!("processed {num_test_cases} test case(s) and {num_metrics} test metrics(s)");

Ok(())
}
32 changes: 32 additions & 0 deletions pulse-drop-tester/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// pulse - bitdrift's observability proxy
// Copyright Bitdrift, Inc. All rights reserved.
//
// Use of this source code is governed by a source available license that can be found in the
// LICENSE file or at:
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt

use clap::Parser;
use pulse_drop_tester::run;

#[derive(Parser)]
struct Options {
#[arg(short = 'c', long = "config")]
pub config: String,

#[arg(long = "proxy-config")]
pub proxy_config: Option<String>,
}

fn main() -> anyhow::Result<()> {
let options = Options::parse();
log::info!("loading test config from: {}", options.config);
let config = std::fs::read_to_string(options.config)?;
let proxy_config = options
.proxy_config
.map(|proxy_config| {
log::info!("loading proxy config from: {proxy_config}");
std::fs::read_to_string(proxy_config)
})
.transpose()?;
run(&config, proxy_config.as_deref())
}
27 changes: 27 additions & 0 deletions pulse-drop-tester/src/test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// pulse - bitdrift's observability proxy
// Copyright Bitdrift, Inc. All rights reserved.
//
// Use of this source code is governed by a source available license that can be found in the
// LICENSE file or at:
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt

use crate::run;

#[test]
fn basic_case() {
let config = r"
test_cases:
- config:
rules:
- name: foo
conditions:
- metric_name:
exact: bar
metrics:
- input: bar:1|c
dropped_by: foo
- input: baz:1|g
";

run(config, None).unwrap();
}
16 changes: 11 additions & 5 deletions pulse-metrics/src/pipeline/processor/drop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ impl TranslatedDropRule {
// TranslatedDropConfig
//

struct TranslatedDropConfig {
pub struct TranslatedDropConfig {
rules: Vec<TranslatedDropRule>,
}

impl TranslatedDropConfig {
fn new(config: &DropConfig, scope: &Scope) -> anyhow::Result<Self> {
pub fn new(config: &DropConfig, scope: &Scope) -> anyhow::Result<Self> {
let rules = config
.rules
.iter()
Expand All @@ -201,8 +201,14 @@ impl TranslatedDropConfig {
Ok(Self { rules })
}

fn drop_sample(&self, sample: &ParsedMetric) -> bool {
self.rules.iter().any(|rule| rule.drop_sample(sample))
pub fn drop_sample(&self, sample: &ParsedMetric) -> Option<&str> {
self.rules.iter().find_map(|rule| {
if rule.drop_sample(sample) {
Some(rule.name.as_str())
} else {
None
}
})
}
}

Expand Down Expand Up @@ -279,7 +285,7 @@ impl PipelineProcessor for DropProcessor {
let config = self.current_config.read();
samples
.into_iter()
.filter(|sample| !config.drop_sample(sample))
.filter(|sample| config.drop_sample(sample).is_none())
.collect()
};
log::debug!("forwarding {} sample(s)", samples.len());
Expand Down
2 changes: 1 addition & 1 deletion pulse-metrics/src/pipeline/processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod aggregation;
mod buffer;
mod cardinality_limiter;
mod cardinality_tracker;
mod drop;
pub mod drop;
pub mod elision;
mod internode;
mod mutate;
Expand Down
45 changes: 45 additions & 0 deletions pulse-protobuf/proto/pulse/drop_tester/v1/drop_tester.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// pulse - bitdrift's observability proxy
// Copyright Bitdrift, Inc. All rights reserved.
//
// Use of this source code is governed by a source available license that can be found in the
// LICENSE file or at:
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt

syntax = "proto3";
package pulse.drop_tester.v1;

import "pulse/config/processor/v1/drop.proto";
import "validate/validate.proto";

// A metric transform to test in a given test context.
message MetricDropTest {
// The input metric. Currently this string must be specified in statsd format. For example:
// foo:1|c|#foo:bar
string input = 1 [(validate.rules).string = {min_len: 1}];

// The expected rule name that drops the metric. If the metric is not meant to be dropped, this
// field should be left empty.
string dropped_by = 2;
}

// An individual test case, composed of a drop config and a number of test transforms to perform.
message DropTestCase {
oneof config_type {
option (validate.required) = true;

// The drop config to test against.
config.processor.v1.DropConfig config = 1 [(validate.rules).string = {min_len: 1}];

// The name of the drop processor in the supplied proxy config to load the program from.
string drop_processor_name = 2 [(validate.rules).string = {min_len: 1}];
}

// 1 or more metrics that will be tested against the above parameters.
repeated MetricDropTest metrics = 3 [(validate.rules).repeated .min_items = 1];
}

// Root configuration for a test run. Each test run is composed of 1 or more test cases.
message DropTesterConfig {
// The test cases in the test run.
repeated DropTestCase test_cases = 1 [(validate.rules).repeated .min_items = 1];
}
1 change: 1 addition & 0 deletions pulse-protobuf/src/protos/pulse/drop_tester/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod v1;
Loading
Loading