Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions pulse-metrics/src/pipeline/processor/drop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,50 @@ use std::sync::Arc;
// 1) If all name matches are exact, we can use an FST.
// 2) Otherwise we can use a RegexSet and perform a single match across all names.

//
// TranslatedStringMatchCondition
//

enum TranslatedStringMatchCondition {
Exact(String),
Regex(Regex),
}

impl TranslatedStringMatchCondition {
fn is_match(&self, value: &[u8]) -> bool {
match self {
Self::Exact(exact) => exact.as_bytes() == value,
Self::Regex(regex) => regex.is_match(value),
}
}
}

//
// TranslatedDropCondition
//

enum TranslatedDropCondition {
MetricName(Regex),
MetricName(TranslatedStringMatchCondition),
TagMatch {
name: String,
value_regex: Option<Regex>,
value_match: Option<TranslatedStringMatchCondition>,
},
ValueMatch(ValueMatch),
AndMatch(Vec<TranslatedDropCondition>),
NotMatch(Box<TranslatedDropCondition>),
}

impl TranslatedDropCondition {
fn string_match_to_regex(string_match: &StringMatch) -> anyhow::Result<Regex> {
// TODO(mattklein123): For simplicity we use regex for all matching currently.
fn string_match_to_regex(
string_match: &StringMatch,
) -> anyhow::Result<TranslatedStringMatchCondition> {
Ok(
match string_match.string_match_type.as_ref().expect("pgv") {
String_match_type::Exact(exact) => Regex::new(exact),
String_match_type::Regex(regex) => Regex::new(regex),
}?,
String_match_type::Exact(exact) => TranslatedStringMatchCondition::Exact(exact.to_string()),
String_match_type::Regex(regex) => {
TranslatedStringMatchCondition::Regex(Regex::new(regex)?)
},
},
)
}

Expand All @@ -67,7 +88,7 @@ impl TranslatedDropCondition {
},
Condition_type::TagMatch(tag_match) => Ok(Self::TagMatch {
name: tag_match.tag_name.to_string(),
value_regex: tag_match
value_match: tag_match
.tag_value
.as_ref()
.map(Self::string_match_to_regex)
Expand Down Expand Up @@ -108,15 +129,15 @@ impl TranslatedDropCondition {

fn drop_sample(&self, sample: &ParsedMetric) -> bool {
match self {
Self::MetricName(regex) => regex.is_match(sample.metric().get_id().name()),
Self::TagMatch { name, value_regex } => sample
Self::MetricName(value_match) => value_match.is_match(sample.metric().get_id().name()),
Self::TagMatch { name, value_match } => sample
.metric()
.get_id()
.tag(name.as_str())
.is_some_and(|tag_value| {
value_regex
value_match
.as_ref()
.is_none_or(|value_regex| value_regex.is_match(&tag_value.value))
.is_none_or(|value_match| value_match.is_match(&tag_value.value))
}),
Self::ValueMatch(value_match) => Self::is_value_match(sample, value_match),
Self::AndMatch(conditions) => conditions
Expand Down Expand Up @@ -162,6 +183,12 @@ impl TranslatedDropRule {
}

fn drop_sample(&self, sample: &ParsedMetric) -> bool {
log::trace!(
"checking drop rule {} mode {:?} for sample {}",
self.name,
self.mode,
sample.metric()
);
let drop = self
.conditions
.iter()
Expand Down
219 changes: 138 additions & 81 deletions pulse-metrics/src/pipeline/processor/drop/mod_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,126 @@ use prometheus::labels;
use pulse_protobuf::protos::pulse::config::processor::v1::drop;
use std::sync::Arc;

fn make_exact_match(name: &str) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::MetricName(StringMatch {
string_match_type: Some(String_match_type::Exact(name.to_string().into())),
..Default::default()
})),
..Default::default()
}
}

fn make_regex_match(name: &str) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::MetricName(StringMatch {
string_match_type: Some(String_match_type::Regex(name.to_string().into())),
..Default::default()
})),
..Default::default()
}
}

fn make_not_match(condition: DropCondition) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::NotMatch(Box::new(condition))),
..Default::default()
}
}

fn make_tag_match(name: &str) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::TagMatch(TagMatch {
tag_name: name.to_string().into(),
..Default::default()
})),
..Default::default()
}
}

fn make_tag_value_exact_match(tag_name: &str, tag_value: &str) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::TagMatch(TagMatch {
tag_name: tag_name.to_string().into(),
tag_value: Some(StringMatch {
string_match_type: Some(String_match_type::Exact(tag_value.to_string().into())),
..Default::default()
})
.into(),
..Default::default()
})),
..Default::default()
}
}

fn make_tag_value_regex_match(tag_name: &str, tag_value: &str) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::TagMatch(TagMatch {
tag_name: tag_name.to_string().into(),
tag_value: Some(StringMatch {
string_match_type: Some(String_match_type::Regex(tag_value.to_string().into())),
..Default::default()
})
.into(),
..Default::default()
})),
..Default::default()
}
}

fn make_and_match(conditions: Vec<DropCondition>) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::AndMatch(AndMatch {
conditions,
..Default::default()
})),
..Default::default()
}
}

#[tokio::test]
async fn regex_vs_exact() {
let (mut helper, context) = processor_factory_context_for_test();
let processor = Arc::new(
DropProcessor::new(
DropProcessorConfig {
config_source: Some(Config_source::Inline(DropConfig {
rules: vec![DropRule {
name: "rule1".into(),
mode: DropMode::ENABLED.into(),
conditions: vec![make_exact_match("exact_name")],
..Default::default()
}],
..Default::default()
})),
..Default::default()
},
context,
)
.await
.unwrap(),
);

make_mut(&mut helper.dispatcher)
.expect_send()
.times(1)
.returning(|metrics| {
assert_eq!(metrics, vec![make_metric("exact_name_with_more", &[], 0),]);
});
processor
.clone()
.recv_samples(vec![
make_metric("exact_name", &[], 0),
make_metric("exact_name_with_more", &[], 0),
])
.await;
helper.stats_helper.assert_counter_eq(
1,
"processor:dropped",
&labels! { "rule_name" => "rule1", "mode" => "enabled" },
);
}

#[tokio::test]
async fn not() {
let (mut helper, context) = processor_factory_context_for_test();
Expand All @@ -38,16 +158,7 @@ async fn not() {
rules: vec![DropRule {
name: "rule1".into(),
mode: DropMode::ENABLED.into(),
conditions: vec![DropCondition {
condition_type: Some(Condition_type::NotMatch(Box::new(DropCondition {
condition_type: Some(Condition_type::MetricName(StringMatch {
string_match_type: Some(String_match_type::Exact("exact_name".into())),
..Default::default()
})),
..Default::default()
}))),
..Default::default()
}],
conditions: vec![make_not_match(make_exact_match("exact_name"))],
..Default::default()
}],
..Default::default()
Expand Down Expand Up @@ -92,85 +203,31 @@ async fn all() {
name: "rule1".into(),
mode: DropMode::ENABLED.into(),
conditions: vec![
DropCondition {
condition_type: Some(Condition_type::MetricName(StringMatch {
string_match_type: Some(String_match_type::Exact("exact_name".into())),
..Default::default()
})),
..Default::default()
},
DropCondition {
condition_type: Some(Condition_type::MetricName(StringMatch {
string_match_type: Some(String_match_type::Regex("regex_name.*".into())),
..Default::default()
})),
..Default::default()
},
DropCondition {
condition_type: Some(Condition_type::TagMatch(TagMatch {
tag_name: "tag_present".into(),
..Default::default()
})),
..Default::default()
},
DropCondition {
condition_type: Some(Condition_type::TagMatch(TagMatch {
tag_name: "tag_exact".into(),
tag_value: Some(StringMatch {
string_match_type: Some(String_match_type::Exact("exact_value".into())),
..Default::default()
})
.into(),
..Default::default()
})),
..Default::default()
},
DropCondition {
condition_type: Some(Condition_type::TagMatch(TagMatch {
tag_name: "tag_regex".into(),
tag_value: Some(StringMatch {
string_match_type: Some(String_match_type::Exact("regex_value.*".into())),
..Default::default()
})
.into(),
..Default::default()
})),
..Default::default()
},
make_exact_match("exact_name"),
make_regex_match("regex_name.*"),
make_tag_match("tag_present"),
make_tag_value_exact_match("tag_exact", "exact_value"),
make_tag_value_regex_match("tag_regex", "regex_value.*"),
],
..Default::default()
},
DropRule {
name: "rule2".into(),
mode: DropMode::TESTING.into(),
conditions: vec![DropCondition {
condition_type: Some(Condition_type::AndMatch(AndMatch {
conditions: vec![
DropCondition {
condition_type: Some(Condition_type::MetricName(StringMatch {
string_match_type: Some(String_match_type::Exact(
"value_match_name".into(),
)),
..Default::default()
})),
..Default::default()
},
DropCondition {
condition_type: Some(Condition_type::ValueMatch(ValueMatch {
value_match_type: Some(Value_match_type::SimpleValue(SimpleValueMatch {
target: 100.0,
operator: ValueMatchOperator::NOT_EQUAL.into(),
..Default::default()
})),
..Default::default()
})),
conditions: vec![make_and_match(vec![
make_exact_match("value_match_name"),
DropCondition {
condition_type: Some(Condition_type::ValueMatch(ValueMatch {
value_match_type: Some(Value_match_type::SimpleValue(SimpleValueMatch {
target: 100.0,
operator: ValueMatchOperator::NOT_EQUAL.into(),
..Default::default()
},
],
})),
..Default::default()
})),
..Default::default()
})),
..Default::default()
}],
},
])],
..Default::default()
},
],
Expand Down
Loading