-
Notifications
You must be signed in to change notification settings - Fork 3
Description
Add graceful shutdown for metrics collection task.
The collect_tokio_metrics function runs an infinite loop with no mechanism for observing the shutdown signal. When the application receives SIGINT/SIGTERM, this task is terminated abruptly rather than shutting down cleanly, potentially losing in-flight metrics or leaving resources in an unclean state.
Thread the graceful_rx signal through and break the loop when shutdown is signaled:
-async fn collect_tokio_metrics(monitor: RuntimeMonitor) {
+async fn collect_tokio_metrics(monitor: RuntimeMonitor, mut graceful_rx: watch::Receiver<()>) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut intervals = monitor.intervals();
loop {
- interval.tick().await;
- if let Some(metrics) = intervals.next() {
- metrics::gauge!("tokio.runtime.workers_count").set(metrics.workers_count as f64);
- metrics::counter!("tokio.runtime.park_total").absolute(metrics.total_park_count);
- metrics::gauge!("tokio.runtime.park_max").set(metrics.max_park_count as f64);
- metrics::gauge!("tokio.runtime.park_min").set(metrics.min_park_count as f64);
- metrics::histogram!("tokio.runtime.busy_duration_ns")
- .record(metrics.total_busy_duration.as_nanos() as f64);
+ tokio::select! {
+ _ = interval.tick() => {
+ if let Some(metrics) = intervals.next() {
+ metrics::gauge!("tokio.runtime.workers_count").set(metrics.workers_count as f64);
+ metrics::counter!("tokio.runtime.park_total").absolute(metrics.total_park_count);
+ metrics::gauge!("tokio.runtime.park_max").set(metrics.max_park_count as f64);
+ metrics::gauge!("tokio.runtime.park_min").set(metrics.min_park_count as f64);
+ metrics::histogram!("tokio.runtime.busy_duration_ns")
+ .record(metrics.total_busy_duration.as_nanos() as f64);
+ }
+ }
+ _ = graceful_rx.changed() => {
+ info!("Shutting down Tokio metrics collection");
+ break;
+ }
}
}
}
Then update the call site at line 79:
- let tokio_metrics_handle = tokio::spawn(collect_tokio_metrics(runtime_monitor));
+ let tokio_metrics_handle = tokio::spawn(collect_tokio_metrics(runtime_monitor, graceful_rx.clone()));
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
/// Publishes Tokio runtime statistics once per second until shutdown is signalled.
///
/// On every tick of a one-second interval, the next sample is pulled from
/// `monitor.intervals()` and reported through the `metrics` macros:
///
/// * `tokio.runtime.workers_count` (gauge)
/// * `tokio.runtime.park_total` (counter, written with `absolute`)
/// * `tokio.runtime.park_max` / `tokio.runtime.park_min` (gauges)
/// * `tokio.runtime.busy_duration_ns` (histogram, in nanoseconds)
///
/// The loop exits as soon as `graceful_rx.changed()` completes. Its result is
/// discarded, so the task stops whatever that future resolves to.
async fn collect_tokio_metrics(monitor: RuntimeMonitor, mut graceful_rx: watch::Receiver<()>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    let mut samples = monitor.intervals();

    loop {
        tokio::select! {
            // Shutdown path: log once and leave the loop so the task can finish.
            _ = graceful_rx.changed() => {
                info!("Shutting down Tokio metrics collection");
                break;
            }
            // Sampling path: runs each time the interval elapses.
            _ = ticker.tick() => {
                // Nothing to report for this tick; wait for the next one.
                let Some(snapshot) = samples.next() else {
                    continue;
                };

                metrics::gauge!("tokio.runtime.workers_count").set(snapshot.workers_count as f64);
                // NOTE(review): if samples from `intervals()` are per-interval deltas,
                // `absolute` may not be the intended call here (vs. `increment`) — confirm.
                metrics::counter!("tokio.runtime.park_total").absolute(snapshot.total_park_count);
                metrics::gauge!("tokio.runtime.park_max").set(snapshot.max_park_count as f64);
                metrics::gauge!("tokio.runtime.park_min").set(snapshot.min_park_count as f64);

                let busy_ns = snapshot.total_busy_duration.as_nanos() as f64;
                metrics::histogram!("tokio.runtime.busy_duration_ns").record(busy_ns);
            }
        }
    }
}
let tokio_metrics_handle = tokio::spawn(collect_tokio_metrics(runtime_monitor, graceful_rx.clone()));
Originally posted by @coderabbitai[bot] in #149 (comment)