From 449eaecd8d5b14911609a38bbe09570ee50654f2 Mon Sep 17 00:00:00 2001 From: ianisimov Date: Thu, 30 Apr 2026 16:20:17 -0700 Subject: [PATCH 1/2] fix: health should remove prometheus metrics after collectors removal Signed-off-by: ianisimov --- crates/health/src/collectors/firmware.rs | 4 ++ crates/health/src/collectors/logs/periodic.rs | 6 +++ crates/health/src/collectors/nmxt.rs | 4 ++ .../src/collectors/nvue/rest/collector.rs | 4 ++ crates/health/src/collectors/runtime.rs | 5 ++ crates/health/src/collectors/sensors.rs | 4 ++ crates/health/src/metrics.rs | 14 +++++ crates/health/src/otlp/convert.rs | 3 +- crates/health/src/processor/health_report.rs | 1 + crates/health/src/sink/events.rs | 1 + crates/health/src/sink/mod.rs | 52 +++++++++++++++++++ crates/health/src/sink/otlp.rs | 1 + crates/health/src/sink/prometheus.rs | 20 +++++++ crates/health/src/sink/tracing.rs | 7 +++ 14 files changed, 125 insertions(+), 1 deletion(-) diff --git a/crates/health/src/collectors/firmware.rs b/crates/health/src/collectors/firmware.rs index 13a8962dde..9ce8283a79 100644 --- a/crates/health/src/collectors/firmware.rs +++ b/crates/health/src/collectors/firmware.rs @@ -59,6 +59,10 @@ impl PeriodicCollector for FirmwareCollector { fn collector_type(&self) -> &'static str { "firmware_collector" } + + async fn stop(&mut self) { + self.emit_event(CollectorEvent::CollectorRemoved); + } } impl FirmwareCollector { diff --git a/crates/health/src/collectors/logs/periodic.rs b/crates/health/src/collectors/logs/periodic.rs index ccce7070eb..6dbcf6f543 100644 --- a/crates/health/src/collectors/logs/periodic.rs +++ b/crates/health/src/collectors/logs/periodic.rs @@ -92,6 +92,12 @@ impl PeriodicCollector for LogsCollector { fn collector_type(&self) -> &'static str { "logs_collector" } + + async fn stop(&mut self) { + if let Some(data_sink) = &self.data_sink { + data_sink.handle_event(&self.event_context, &CollectorEvent::CollectorRemoved); + } + } } impl LogsCollector { diff --git a/crates/health/src/collectors/nmxt.rs b/crates/health/src/collectors/nmxt.rs index 5ce4181ecb..28e350ff93 100644 --- a/crates/health/src/collectors/nmxt.rs +++ b/crates/health/src/collectors/nmxt.rs @@ -189,6 +189,10 @@ impl PeriodicCollector for NmxtCollector { fn collector_type(&self) -> &'static str { "nmxt" } + + async fn stop(&mut self) { + self.emit_event(CollectorEvent::CollectorRemoved); + } } impl NmxtCollector { diff --git a/crates/health/src/collectors/nvue/rest/collector.rs b/crates/health/src/collectors/nvue/rest/collector.rs index efa63b9690..3f76808179 100644 --- a/crates/health/src/collectors/nvue/rest/collector.rs +++ b/crates/health/src/collectors/nvue/rest/collector.rs @@ -248,6 +248,10 @@ impl PeriodicCollector for NvueRestCollector { fn collector_type(&self) -> &'static str { COLLECTOR_NAME } + + async fn stop(&mut self) { + self.emit_event(CollectorEvent::CollectorRemoved); + } } impl NvueRestCollector { diff --git a/crates/health/src/collectors/runtime.rs b/crates/health/src/collectors/runtime.rs index db91d34298..f49fc7e68b 100644 --- a/crates/health/src/collectors/runtime.rs +++ b/crates/health/src/collectors/runtime.rs @@ -73,6 +73,10 @@ pub trait PeriodicCollector: Send + 'static { /// Returns the type identifier for this collector fn collector_type(&self) -> &'static str; + + fn stop(&mut self) -> impl std::future::Future + Send { + async {} + } } pub type EventStream<'a> = BoxStream<'a, Result>; @@ -389,6 +393,7 @@ impl Collector { tokio::select! { _ = cancel_token_clone.cancelled() => { tracing::info!(endpoint = ?endpoint.addr, "collector cancelled"); + runner.stop().await; break; } _ = async { diff --git a/crates/health/src/collectors/sensors.rs b/crates/health/src/collectors/sensors.rs index 4ca67cab40..746a751360 100644 --- a/crates/health/src/collectors/sensors.rs +++ b/crates/health/src/collectors/sensors.rs @@ -83,6 +83,10 @@ impl PeriodicCollector for SensorCollector { fn collector_type(&self) -> &'static str { "sensor_collector" } + + async fn stop(&mut self) { + self.emit_event(CollectorEvent::CollectorRemoved); + } } /// Monitored entity with its associated sensors diff --git a/crates/health/src/metrics.rs b/crates/health/src/metrics.rs index 575c61f524..8e97552d9d 100644 --- a/crates/health/src/metrics.rs +++ b/crates/health/src/metrics.rs @@ -200,6 +200,16 @@ impl CollectorRegistry { Ok(metrics) } + pub fn unregister_gauge_metrics( + &self, + metrics: &GaugeMetrics, + ) -> Result<(), prometheus::Error> { + self.registry + .registry + .unregister(Box::new(metrics.clone())) + .map(|_| ()) + } + pub fn registry(&self) -> &Registry { &self.registry.registry } @@ -354,6 +364,10 @@ impl GaugeMetrics { let current_gen = self.current_generation.load(Ordering::Acquire); self.gauges.retain(|_, data| data.generation == current_gen); } + + pub fn clear(&self) { + self.gauges.clear(); + } } impl Collector for GaugeMetrics { diff --git a/crates/health/src/otlp/convert.rs b/crates/health/src/otlp/convert.rs index 20162ba153..4c630f445d 100644 --- a/crates/health/src/otlp/convert.rs +++ b/crates/health/src/otlp/convert.rs @@ -118,7 +118,8 @@ fn convert_event(event: &CollectorEvent, observed_nanos: u64) -> Option None, + | CollectorEvent::MetricCollectionEnd + | CollectorEvent::CollectorRemoved => None, } } diff --git a/crates/health/src/processor/health_report.rs b/crates/health/src/processor/health_report.rs index c430702047..0e464444f0 100644 --- a/crates/health/src/processor/health_report.rs +++ b/crates/health/src/processor/health_report.rs @@ -240,6 +240,7 @@ impl EventProcessor for HealthReportProcessor { return vec![CollectorEvent::HealthReport(Arc::new(report))]; } CollectorEvent::Log(_) + | CollectorEvent::CollectorRemoved | CollectorEvent::Firmware(_) | CollectorEvent::HealthReport(_) => {} } diff --git a/crates/health/src/sink/events.rs b/crates/health/src/sink/events.rs index 42bb1df987..bd026c02b4 100644 --- a/crates/health/src/sink/events.rs +++ b/crates/health/src/sink/events.rs @@ -137,6 +137,7 @@ pub enum CollectorEvent { MetricCollectionStart, Metric(Box), MetricCollectionEnd, + CollectorRemoved, Log(Box), Firmware(FirmwareInfo), HealthReport(Arc), diff --git a/crates/health/src/sink/mod.rs b/crates/health/src/sink/mod.rs index df1feb5d02..997ebae8ac 100644 --- a/crates/health/src/sink/mod.rs +++ b/crates/health/src/sink/mod.rs @@ -196,6 +196,58 @@ mod tests { assert!(export_after_metric.contains("test_sink_hw_sensor_temperature_celsius")); } + #[tokio::test] + async fn test_prometheus_sink_removes_collector_metrics() { + let metrics_manager = + Arc::new(MetricsManager::new("test").expect("should create metrics manager")); + let sink = PrometheusSink::new(metrics_manager.clone(), "test_sink") + .expect("sink should initialize"); + + let context = EventContext { + endpoint_key: "42:9e:b1:bd:9d:dd".to_string(), + addr: BmcAddr { + ip: "10.0.0.1".parse().expect("valid ip"), + port: Some(443), + mac: MacAddress::from_str("42:9e:b1:bd:9d:dd").unwrap(), + }, + collector_type: "sensor_collector", + metadata: Some(EndpointMetadata::Machine(MachineData { + machine_id: "fm100htjtiaehv1n5vh67tbmqq4eabcjdng40f7jupsadbedhruh6rag1l0" + .parse() + .expect("valid machine id"), + machine_serial: None, + })), + rack_id: None, + }; + + let metric_event = CollectorEvent::Metric( + SensorHealthData { + key: "metric_key".to_string(), + name: "hw_sensor".to_string(), + metric_type: "temperature".to_string(), + unit: "celsius".to_string(), + value: 42.0, + labels: vec![(Cow::Borrowed("sensor"), "temp1".to_string())], + context: None, + } + .into(), + ); + + sink.handle_event(&context, &metric_event); + let export_before_remove = metrics_manager + .export_all() + .expect("metrics export should work"); + assert!(export_before_remove.contains("test_sink_hw_sensor_temperature_celsius")); + + sink.handle_event(&context, &CollectorEvent::CollectorRemoved); + + let export_after_remove = metrics_manager + .export_all() + .expect("metrics export should work"); + assert!(!export_after_remove.contains("test_sink_hw_sensor_temperature_celsius")); + assert!(!export_after_remove.contains("endpoint_key=\"42:9e:b1:bd:9d:dd\"")); + } + #[tokio::test] async fn test_prometheus_sink_sweeps_stale_metrics_per_collection_window() { let metrics_manager = diff --git a/crates/health/src/sink/otlp.rs b/crates/health/src/sink/otlp.rs index 9f72f25c25..be183376df 100644 --- a/crates/health/src/sink/otlp.rs +++ b/crates/health/src/sink/otlp.rs @@ -49,6 +49,7 @@ pub(crate) fn is_otlp_relevant(event: &CollectorEvent) -> bool { CollectorEvent::Metric(_) | CollectorEvent::MetricCollectionStart | CollectorEvent::MetricCollectionEnd + | CollectorEvent::CollectorRemoved ) } diff --git a/crates/health/src/sink/prometheus.rs b/crates/health/src/sink/prometheus.rs index 42d1e5ebeb..fd368d7449 100644 --- a/crates/health/src/sink/prometheus.rs +++ b/crates/health/src/sink/prometheus.rs @@ -133,6 +133,25 @@ impl PrometheusSink { } } } + + fn remove_collector_metrics(&self, context: &EventContext) { + let Some(endpoint_metrics) = self.stream_metrics.get::(context.endpoint_key()) else { + return; + }; + let Some((_, metrics)) = endpoint_metrics.remove(context.collector_type) else { + return; + }; + + metrics.clear(); + if let Err(error) = self.collector_registry.unregister_gauge_metrics(&metrics) { + tracing::warn!( + ?error, + endpoint_key = context.endpoint_key(), + collector = context.collector_type, + "Failed to unregister Prometheus stream metrics" + ); + } + } } impl DataSink for PrometheusSink { @@ -187,6 +206,7 @@ impl DataSink for PrometheusSink { entry.value().sweep_stale(); } } + CollectorEvent::CollectorRemoved => self.remove_collector_metrics(context), CollectorEvent::Log(_) | CollectorEvent::Firmware(_) | CollectorEvent::HealthReport(_) => {} diff --git a/crates/health/src/sink/tracing.rs b/crates/health/src/sink/tracing.rs index 12f12d9bf0..a27ae88940 100644 --- a/crates/health/src/sink/tracing.rs +++ b/crates/health/src/sink/tracing.rs @@ -52,6 +52,13 @@ impl DataSink for TracingSink { "Metric collection end" ); } + CollectorEvent::CollectorRemoved => { + tracing::info!( + endpoint = %context.endpoint_key(), + collector = %context.collector_type, + "Collector removed" + ); + } CollectorEvent::Log(record) => { tracing::info!( endpoint = %context.endpoint_key(), From 12e2d7b1acc6440669470f4e9321fec1f8fdff95 Mon Sep 17 00:00:00 2001 From: ianisimov Date: Thu, 30 Apr 2026 16:41:53 -0700 Subject: [PATCH 2/2] fix: health should remove prometheus metrics after collectors removal Signed-off-by: ianisimov --- crates/health/src/processor/health_report.rs | 18 ++++++++++++- crates/health/src/processor/rack_leak.rs | 27 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/crates/health/src/processor/health_report.rs b/crates/health/src/processor/health_report.rs index 0e464444f0..15eee9b98c 100644 --- a/crates/health/src/processor/health_report.rs +++ b/crates/health/src/processor/health_report.rs @@ -239,8 +239,10 @@ impl EventProcessor for HealthReportProcessor { return vec![CollectorEvent::HealthReport(Arc::new(report))]; } + CollectorEvent::CollectorRemoved => { + self.windows.remove(&Self::stream_key(context)); + } CollectorEvent::Log(_) - | CollectorEvent::CollectorRemoved | CollectorEvent::Firmware(_) | CollectorEvent::HealthReport(_) => {} } @@ -322,4 +324,18 @@ mod tests { assert!(report.successes.is_empty()); assert_eq!(report.alerts.len(), 1); } + + #[test] + fn collector_removed_clears_metric_window() { + let processor = HealthReportProcessor::new(); + let context = test_context(); + + let _ = processor.process_event(&context, &CollectorEvent::MetricCollectionStart); + assert_eq!(processor.windows.len(), 1); + + let emitted = processor.process_event(&context, &CollectorEvent::CollectorRemoved); + + assert!(emitted.is_empty()); + assert!(processor.windows.is_empty()); + } } diff --git a/crates/health/src/processor/rack_leak.rs b/crates/health/src/processor/rack_leak.rs index 01aa35f880..505ee0f326 100644 --- a/crates/health/src/processor/rack_leak.rs +++ b/crates/health/src/processor/rack_leak.rs @@ -84,6 +84,13 @@ impl EventProcessor for RackLeakProcessor { return Vec::new(); }; + if matches!(event, CollectorEvent::CollectorRemoved) { + if let Some(mut entry) = self.racks.get_mut(rack_id) { + entry.leaking_trays.remove(context.endpoint_key()); + } + return Vec::new(); + } + let CollectorEvent::HealthReport(report) = event else { return Vec::new(); }; @@ -277,6 +284,26 @@ mod tests { assert_eq!(report.alerts.len(), 1, "rack should still be in alert"); } + #[test] + fn removed_tray_is_no_longer_counted() { + let processor = RackLeakProcessor::new(2); + + let ctx_a = context_with_rack("42:9e:b1:bd:9d:dd", "rack-1"); + let ctx_b = context_with_rack("42:9e:b1:bd:9d:ee", "rack-1"); + + processor.process_event(&ctx_a, &tray_leak_report(true)); + processor.process_event(&ctx_b, &tray_leak_report(true)); + + let emitted = processor.process_event(&ctx_a, &CollectorEvent::CollectorRemoved); + + assert!(emitted.is_empty()); + let Some(rack) = processor.racks.get(ctx_a.rack_id().expect("rack id")) else { + panic!("expected rack state"); + }; + assert_eq!(rack.leaking_trays.len(), 1); + assert!(rack.leaking_trays.contains(ctx_b.endpoint_key())); + } + #[test] fn separate_racks_are_independent() { let processor = RackLeakProcessor::new(2);