From 395c4fbe46a6606efdbfca91fd18f6624fc0e8bf Mon Sep 17 00:00:00 2001 From: Shamir Khodzha Date: Wed, 16 Dec 2020 01:25:27 +0300 Subject: [PATCH] running request timings metrics --- src/app/message_handler.rs | 75 ++++++++++++++++++- src/app/metrics/aggregator.rs | 28 ++++++- src/app/metrics/dynamic_stats_collector.rs | 85 ++++++++++++++++++++++ src/app/metrics/metric.rs | 41 +++++++++++ src/app/metrics/mod.rs | 1 + src/app/mod.rs | 17 ++++- 6 files changed, 241 insertions(+), 6 deletions(-) diff --git a/src/app/message_handler.rs b/src/app/message_handler.rs index 2f0936b5..59e4163d 100644 --- a/src/app/message_handler.rs +++ b/src/app/message_handler.rs @@ -1,5 +1,6 @@ use std::future::Future; use std::pin::Pin; +use std::time::Instant; use async_std::prelude::*; use async_std::stream::{self, Stream}; @@ -22,16 +23,20 @@ use crate::app::{endpoint, API_VERSION}; pub(crate) type MessageStream = Box> + Send + Unpin>; +type TimingChannel = crossbeam_channel::Sender<(std::time::Duration, String)>; + pub(crate) struct MessageHandler { agent: Agent, global_context: C, + tx: TimingChannel, } impl MessageHandler { - pub(crate) fn new(agent: Agent, global_context: C) -> Self { + pub(crate) fn new(agent: Agent, global_context: C, tx: TimingChannel) -> Self { Self { agent, global_context, + tx, } } @@ -86,10 +91,38 @@ impl MessageHandler { message: &IncomingMessage, topic: &str, ) -> Result<(), AppError> { + let mut timer = if msg_context.dynamic_stats().is_some() { + Some(MessageHandlerTiming::new(self.tx.clone())) + } else { + None + }; + match message { - IncomingMessage::Request(req) => self.handle_request(msg_context, req, topic).await, - IncomingMessage::Response(resp) => self.handle_response(msg_context, resp, topic).await, - IncomingMessage::Event(event) => self.handle_event(msg_context, event, topic).await, + IncomingMessage::Request(req) => { + if let Some(ref mut timer) = timer { + timer.set_method(req.properties().method().into()); + } + self.handle_request(msg_context, req, topic).await + } + IncomingMessage::Response(resp) => { + if let Some(ref mut timer) = timer { + timer.set_method("response".into()); + } + + self.handle_response(msg_context, resp, topic).await + } + IncomingMessage::Event(event) => { + if let Some(ref mut timer) = timer { + let label = match event.properties().label() { + Some(label) => format!("event-{}", label), + None => "event-none".into(), + }; + + timer.set_method(label); + } + + self.handle_event(msg_context, event, topic).await + } } } @@ -422,3 +455,37 @@ impl<'async_trait, H: 'async_trait + endpoint::EventHandler> EventEnvelopeHandle Box::pin(handle_envelope::(context, event)) } } + +struct MessageHandlerTiming { + start: Instant, + sender: TimingChannel, + method: String, +} + +impl MessageHandlerTiming { + fn new(sender: TimingChannel) -> Self { + Self { + start: Instant::now(), + method: "none".into(), + sender, + } + } + + fn set_method(&mut self, method: String) { + self.method = method; + } +} + +impl Drop for MessageHandlerTiming { + fn drop(&mut self) { + if let Err(e) = self + .sender + .try_send((self.start.elapsed(), self.method.clone())) + { + warn!( + crate::LOG, + "Failed to send msg handler future timing, reason = {:?}", e + ); + } + } +} diff --git a/src/app/metrics/aggregator.rs b/src/app/metrics/aggregator.rs index ad801bd7..707b7372 100644 --- a/src/app/metrics/aggregator.rs +++ b/src/app/metrics/aggregator.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc}; use svc_agent::AgentId; use crate::app::context::GlobalContext; -use crate::app::metrics::{Metric, MetricKey, Tags}; +use crate::app::metrics::{Metric, MetricKey, PercentileReport, Tags}; pub(crate) struct Aggregator<'a, C: GlobalContext> { context: &'a C, @@ -188,6 +188,32 @@ fn append_dynamic_stats( tags.clone(), )); } + + for (method, PercentileReport { p95, p99, max }) in dynamic_stats.get_handler_timings()? { + let tags = + Tags::build_running_futures_tags(crate::APP_VERSION, context.agent_id(), method); + + metrics.push(Metric::new( + MetricKey::RunningRequestDurationP95, + p95, + now, + tags.clone(), + )); + + metrics.push(Metric::new( + MetricKey::RunningRequestDurationP99, + p99, + now, + tags.clone(), + )); + + metrics.push(Metric::new( + MetricKey::RunningRequestDurationMax, + max, + now, + tags.clone(), + )); + } } Ok(()) diff --git a/src/app/metrics/dynamic_stats_collector.rs b/src/app/metrics/dynamic_stats_collector.rs index 1daffc0d..3ae48546 100644 --- a/src/app/metrics/dynamic_stats_collector.rs +++ b/src/app/metrics/dynamic_stats_collector.rs @@ -1,5 +1,7 @@ use std::collections::BTreeMap; +use std::convert::TryFrom; use std::thread; +use std::time::Duration; use anyhow::{Context, Result}; use svc_agent::AgentId; @@ -17,15 +19,30 @@ enum Message { GetJanusTimeouts { tx: crossbeam_channel::Sender>, }, + HandlerTiming { + duration: Duration, + method: String, + }, + GetHandlerTimings { + tx: crossbeam_channel::Sender>, + }, } +#[derive(Clone)] pub(crate) struct DynamicStatsCollector { tx: crossbeam_channel::Sender, } +pub(crate) struct PercentileReport { + pub p95: u64, + pub p99: u64, + pub max: u64, +} + struct State { data: BTreeMap, janus_timeouts: BTreeMap, + futures_timings: BTreeMap>, } impl DynamicStatsCollector { @@ -36,6 +53,7 @@ impl DynamicStatsCollector { let mut state = State { data: BTreeMap::new(), janus_timeouts: BTreeMap::new(), + futures_timings: BTreeMap::new(), }; for message in rx { @@ -82,6 +100,53 @@ impl DynamicStatsCollector { ); } } + Message::HandlerTiming { duration, method } => { + let vec = state.futures_timings.entry(method).or_default(); + let micros = match u64::try_from(duration.as_micros()) { + Ok(micros) => micros, + Err(_) => u64::MAX, + }; + + vec.push(micros); + } + Message::GetHandlerTimings { tx } => { + let vec = state + .futures_timings + .into_iter() + .map(|(method, mut values)| { + values.sort_unstable(); + + let count = values.len(); + let p95_idx = (count as f32 * 0.95) as usize; + let p99_idx = (count as f32 * 0.99) as usize; + let max_idx = count - 1; + let max = values[max_idx]; + + let p95 = if p95_idx < max_idx { + (values[p95_idx] + max) / 2 + } else { + max + }; + + let p99 = if p99_idx < max_idx { + (values[p99_idx] + max) / 2 + } else { + max + }; + + (method, PercentileReport { p95, p99, max }) + }) + .collect::>(); + + if let Err(err) = tx.send(vec) { + warn!( + crate::LOG, + "Failed to send dynamic stats collector report: {}", err, + ); + } + + state.futures_timings = BTreeMap::new(); + } } } }); @@ -133,6 +198,26 @@ impl DynamicStatsCollector { rx.recv() .context("Failed to receive dynamic stats collector report") } + + pub(crate) fn record_future_time(&self, duration: Duration, method: String) { + if let Err(err) = self.tx.send(Message::HandlerTiming { duration, method }) { + warn!( + crate::LOG, + "Failed to register dynamic stats collector value: {}", err + ); + } + } + + pub(crate) fn get_handler_timings(&self) -> Result> { + let (tx, rx) = crossbeam_channel::bounded(1); + + self.tx + .send(Message::GetHandlerTimings { tx }) + .context("Failed to send GetHandlerTimings message to the dynamic stats collector")?; + + rx.recv() + .context("Failed to receive dynamic stats collector report") + } } impl Drop for DynamicStatsCollector { diff --git a/src/app/metrics/metric.rs b/src/app/metrics/metric.rs index 8649fbb1..e5066cf1 100644 --- a/src/app/metrics/metric.rs +++ b/src/app/metrics/metric.rs @@ -76,6 +76,13 @@ pub enum Tags { account_audience: String, backend_label: String, }, + RunningFuture { + version: String, + agent_label: String, + account_label: String, + account_audience: String, + method: String, + }, Empty, } @@ -108,6 +115,16 @@ impl Tags { backend_label: janus_id.label().to_owned(), } } + + pub fn build_running_futures_tags(version: &str, agent_id: &AgentId, method: String) -> Self { + Tags::RunningFuture { + version: version.to_owned(), + agent_label: agent_id.label().to_owned(), + account_label: agent_id.as_account_id().label().to_owned(), + account_audience: agent_id.as_account_id().audience().to_owned(), + method, + } + } } #[derive(Serialize, Clone, Debug)] @@ -149,6 +166,12 @@ pub(crate) enum MetricKey { RunningRequests, #[serde(rename(serialize = "apps.conference.janus_timeouts_total"))] JanusTimeoutsTotal, + #[serde(rename(serialize = "apps.conference.running_request_p95_microseconds"))] + RunningRequestDurationP95, + #[serde(rename(serialize = "apps.conference.running_request_p99_microseconds"))] + RunningRequestDurationP99, + #[serde(rename(serialize = "apps.conference.running_request_max_microseconds"))] + RunningRequestDurationMax, } #[derive(Serialize, Clone, Debug)] @@ -190,6 +213,12 @@ pub(crate) enum MetricKey2 { RunningRequests, #[serde(rename(serialize = "janus_timeouts_total"))] JanusTimeoutsTotal, + #[serde(rename(serialize = "running_request_p95_microseconds"))] + RunningRequestDurationP95, + #[serde(rename(serialize = "running_request_p99_microseconds"))] + RunningRequestDurationP99, + #[serde(rename(serialize = "running_request_max_microseconds"))] + RunningRequestDurationMax, } impl From for MetricKey2 { @@ -213,6 +242,9 @@ impl From for MetricKey2 { MetricKey::JanusBackendAgentLoad => MetricKey2::JanusBackendAgentLoad, MetricKey::RunningRequests => MetricKey2::RunningRequests, MetricKey::JanusTimeoutsTotal => MetricKey2::JanusTimeoutsTotal, + MetricKey::RunningRequestDurationP95 => MetricKey2::RunningRequestDurationP95, + MetricKey::RunningRequestDurationP99 => MetricKey2::RunningRequestDurationP99, + MetricKey::RunningRequestDurationMax => MetricKey2::RunningRequestDurationMax, } } } @@ -238,6 +270,15 @@ impl std::fmt::Display for MetricKey2 { MetricKey2::Dynamic(key) => write!(f, "{}_total", key), MetricKey2::RunningRequests => write!(f, "running_requests_total"), MetricKey2::JanusTimeoutsTotal => write!(f, "janus_backend_agent_load_total"), + MetricKey2::RunningRequestDurationP95 => { + write!(f, "running_request_duration_p95_microseconds") + } + MetricKey2::RunningRequestDurationP99 => { + write!(f, "running_request_duration_p99_microseconds") + } + MetricKey2::RunningRequestDurationMax => { + write!(f, "running_request_duration_max_microseconds") + } } } } diff --git a/src/app/metrics/mod.rs b/src/app/metrics/mod.rs index 9cfb0d90..856d183c 100644 --- a/src/app/metrics/mod.rs +++ b/src/app/metrics/mod.rs @@ -1,5 +1,6 @@ pub(crate) use aggregator::Aggregator; pub(crate) use dynamic_stats_collector::DynamicStatsCollector; +pub(crate) use dynamic_stats_collector::PercentileReport; pub(crate) use metric::{Metric, Metric2, MetricKey, Tags}; pub(crate) use stats_route::StatsRoute; diff --git a/src/app/mod.rs b/src/app/mod.rs index a5bc40b1..22fa2503 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -87,6 +87,17 @@ pub(crate) async fn run( let running_requests = Arc::new(AtomicI64::new(0)); let stats_collector = Arc::new(DynamicStatsCollector::start()); + let stats_collector_ = stats_collector.clone(); + + let (handler_timer_tx, handler_timer_rx) = crossbeam_channel::bounded(500); + std::thread::Builder::new() + .name("msg-handler-timings".into()) + .spawn(move || { + for (dur, method) in handler_timer_rx { + stats_collector_.record_future_time(dur, method); + } + }) + .expect("Failed to start msg-handler-timings thread"); // Context let context = AppContext::new( @@ -106,7 +117,11 @@ pub(crate) async fn run( }; // Message handler - let message_handler = Arc::new(MessageHandler::new(agent.clone(), context)); + let message_handler = Arc::new(MessageHandler::new( + agent.clone(), + context, + handler_timer_tx, + )); StatsRoute::start(config, message_handler.clone()); // Message loop