diff --git a/rust-arroyo/src/processing/strategies/run_task_in_threads.rs b/rust-arroyo/src/processing/strategies/run_task_in_threads.rs index 5de3e763..d48a959b 100644 --- a/rust-arroyo/src/processing/strategies/run_task_in_threads.rs +++ b/rust-arroyo/src/processing/strategies/run_task_in_threads.rs @@ -1,18 +1,18 @@ use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::runtime::{Handle, Runtime}; use tokio::task::JoinHandle; -use crate::gauge; use crate::processing::strategies::{ merge_commit_request, CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy, SubmitError, }; use crate::types::Message; use crate::utils::timing::Deadline; +use crate::{gauge, timer}; use super::StrategyError; @@ -218,6 +218,7 @@ where fn join(&mut self, timeout: Option) -> Result, StrategyError> { let deadline = timeout.map(Deadline::new); + let start = Instant::now(); // Poll until there are no more messages or timeout is hit while self.message_carried_over.is_some() || !self.handles.is_empty() { @@ -240,6 +241,12 @@ where } self.handles.clear(); + timer!( + "arroyo.strategies.run_task_in_threads.join_time", + start.elapsed(), + "strategy_name" => self.metric_strategy_name + ); + let next_commit = self.next_step.join(deadline.map(|d| d.remaining()))?; Ok(merge_commit_request(