From 9218accf25abbafc4952211ea7a4e5befe9982f6 Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Tue, 8 Sep 2020 12:15:11 +0300 Subject: [PATCH] Add label to message.broadcast and metrics (#146) --- docs/src/api/message/broadcast.md | 1 + src/app/context.rs | 17 ++-- src/app/endpoint/message.rs | 10 +++ src/app/endpoint/metric.rs | 46 +++++++++- ...ollector.rs => db_pool_stats_collector.rs} | 4 +- src/app/metrics/dynamic_stats_collector.rs | 88 +++++++++++++++++++ src/app/metrics/mod.rs | 6 +- src/app/mod.rs | 4 +- src/db/mod.rs | 6 +- src/test_helpers/context.rs | 8 +- 10 files changed, 173 insertions(+), 17 deletions(-) rename src/app/metrics/{stats_collector.rs => db_pool_stats_collector.rs} (98%) create mode 100644 src/app/metrics/dynamic_stats_collector.rs diff --git a/docs/src/api/message/broadcast.md b/docs/src/api/message/broadcast.md index d37ecbf3..a52f957c 100644 --- a/docs/src/api/message/broadcast.md +++ b/docs/src/api/message/broadcast.md @@ -18,6 +18,7 @@ Name | Type | Default | Description ----------------- | ---------- | ---------- | ------------------ room_id | Uuid | _required_ | A destination room identifier. The room must be opened. data | JsonObject | _required_ | JSON object. +label | String | _optional_ | A label to group messages by in metrics. diff --git a/src/app/context.rs b/src/app/context.rs index 5bf1fab8..5867cb29 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -4,7 +4,7 @@ use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::cache::ConnectionPool as RedisConnectionPool; use svc_authz::ClientMap as Authz; -use crate::app::metrics::StatsCollector; +use crate::app::metrics::{DbPoolStatsCollector, DynamicStatsCollector}; use crate::config::Config; use crate::db::ConnectionPool as Db; @@ -19,7 +19,8 @@ pub(crate) struct AppContext { janus_topics: JanusTopics, queue_counter: Option, redis_pool: Option, - db_pool_stats: Option, + db_pool_stats: Option, + dynamic_stats: Option>, } impl AppContext { @@ -35,6 +36,7 @@ impl AppContext { queue_counter: None, redis_pool: None, db_pool_stats: None, + dynamic_stats: Some(Arc::new(DynamicStatsCollector::start())), } } @@ -52,7 +54,7 @@ impl AppContext { } } - pub(crate) fn db_pool_stats(self, stats: StatsCollector) -> Self { + pub(crate) fn db_pool_stats(self, stats: DbPoolStatsCollector) -> Self { Self { db_pool_stats: Some(stats), ..self @@ -68,7 +70,8 @@ pub(crate) trait Context: Sync { fn janus_topics(&self) -> &JanusTopics; fn queue_counter(&self) -> &Option; fn redis_pool(&self) -> &Option; - fn db_pool_stats(&self) -> &Option; + fn db_pool_stats(&self) -> &Option; + fn dynamic_stats(&self) -> Option<&DynamicStatsCollector>; } impl Context for AppContext { @@ -100,9 +103,13 @@ impl Context for AppContext { &self.redis_pool } - fn db_pool_stats(&self) -> &Option { + fn db_pool_stats(&self) -> &Option { &self.db_pool_stats } + + fn dynamic_stats(&self) -> Option<&DynamicStatsCollector> { + self.dynamic_stats.as_deref() + } } /////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/endpoint/message.rs b/src/app/endpoint/message.rs index a52446a2..0edb9c7e 100644 --- a/src/app/endpoint/message.rs +++ b/src/app/endpoint/message.rs @@ -85,6 +85,7 @@ impl RequestHandler for UnicastHandler { pub(crate) struct BroadcastRequest { room_id: Uuid, data: JsonValue, + label: Option, } pub(crate) struct BroadcastHandler; @@ -107,6 +108,12 @@ impl RequestHandler for BroadcastHandler { room }; + if let Some(stats) = context.dynamic_stats() { + if let Some(label) = payload.label { + stats.collect(&format!("message_broadcast_{}", label), 1); + } + } + // Respond and broadcast to the room topic. let response = shared::build_response(ResponseStatus::OK, json!({}), reqp, start_timestamp, None); @@ -400,6 +407,7 @@ mod test { let payload = BroadcastRequest { room_id: room.id(), data: json!({ "key": "value" }), + label: None, }; let messages = handle_request::(&context, &sender, payload) @@ -435,6 +443,7 @@ mod test { let payload = BroadcastRequest { room_id: Uuid::new_v4(), data: json!({ "key": "value" }), + label: None, }; let err = handle_request::(&context, &sender, payload) @@ -464,6 +473,7 @@ mod test { let payload = BroadcastRequest { room_id: room.id(), data: json!({ "key": "value" }), + label: None, }; let err = handle_request::(&context, &sender, payload) diff --git a/src/app/endpoint/metric.rs b/src/app/endpoint/metric.rs index 5e240e39..f8240073 100644 --- a/src/app/endpoint/metric.rs +++ b/src/app/endpoint/metric.rs @@ -33,7 +33,7 @@ impl MetricValue { Self { value, timestamp } } } -#[derive(Serialize, Debug, Copy, Clone)] +#[derive(Serialize, Debug, Clone)] #[serde(tag = "metric")] pub(crate) enum Metric { #[serde(rename(serialize = "apps.conference.incoming_requests_total"))] @@ -72,6 +72,11 @@ pub(crate) enum Metric { DbPoolTimeoutAverage(MetricValue), #[serde(rename(serialize = "apps.conference.max_db_pool_timeout_total"))] MaxDbPoolTimeout(MetricValue), + #[serde(serialize_with = "serialize_dynamic_metric")] + Dynamic { + key: String, + value: MetricValue, + }, } pub(crate) struct PullHandler; @@ -153,6 +158,10 @@ impl EventHandler for PullHandler { append_db_pool_stats(&mut metrics, context, now); + append_dynamic_stats(&mut metrics, context, now) + .map_err(|err| err.to_string()) + .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; + let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp); let props = evp.to_event("metric.create", short_term_timing); let outgoing_event = OutgoingEvent::multicast(metrics, props, account_id); @@ -184,3 +193,38 @@ fn append_db_pool_stats(metrics: &mut Vec, context: &dyn Context, now: D metrics.extend_from_slice(&m); } } + +fn append_dynamic_stats( + metrics: &mut Vec, + context: &dyn Context, + now: DateTime, +) -> anyhow::Result<()> { + if let Some(dynamic_stats) = context.dynamic_stats() { + for (key, value) in dynamic_stats.flush()? { + metrics.push(Metric::Dynamic { + key, + value: MetricValue::new(value as u64, now), + }); + } + } + + Ok(()) +} + +pub(crate) fn serialize_dynamic_metric( + key: K, + value: V, + serializer: S, +) -> std::result::Result +where + K: std::fmt::Display, + V: serde::Serialize, + S: serde::ser::Serializer, +{ + use serde::ser::SerializeMap; + + let mut map = serializer.serialize_map(Some(2))?; + map.serialize_entry("metric", &format!("app.conference.{}_total", key))?; + map.serialize_entry("value", &value)?; + map.end() +} diff --git a/src/app/metrics/stats_collector.rs b/src/app/metrics/db_pool_stats_collector.rs similarity index 98% rename from src/app/metrics/stats_collector.rs rename to src/app/metrics/db_pool_stats_collector.rs index 5c327ad1..24d3a2b2 100644 --- a/src/app/metrics/stats_collector.rs +++ b/src/app/metrics/db_pool_stats_collector.rs @@ -96,11 +96,11 @@ pub struct Stats { } #[derive(Debug, Clone)] -pub struct StatsCollector { +pub struct DbPoolStatsCollector { inner: Arc>, } -impl StatsCollector { +impl DbPoolStatsCollector { pub fn new() -> (Self, StatsTransmitter) { let inner: Arc> = Default::default(); let collector = Self { diff --git a/src/app/metrics/dynamic_stats_collector.rs b/src/app/metrics/dynamic_stats_collector.rs new file mode 100644 index 00000000..299e86de --- /dev/null +++ b/src/app/metrics/dynamic_stats_collector.rs @@ -0,0 +1,88 @@ +use std::collections::BTreeMap; +use std::thread; + +use anyhow::{Context, Result}; +use log::warn; + +enum Message { + Register { + key: String, + value: usize, + }, + Flush { + tx: crossbeam_channel::Sender>, + }, + Stop, +} + +pub(crate) struct DynamicStatsCollector { + tx: crossbeam_channel::Sender, +} + +impl DynamicStatsCollector { + pub(crate) fn start() -> Self { + let (tx, rx) = crossbeam_channel::unbounded(); + + thread::spawn(move || { + let mut data: BTreeMap = BTreeMap::new(); + + for message in rx { + match message { + Message::Register { key, value } => { + let current_value = data.get_mut(&key).map(|v| *v); + + match current_value { + Some(current_value) => data.insert(key, current_value + value), + None => data.insert(key, value), + }; + } + Message::Flush { tx } => { + let report = data.into_iter().collect::>(); + + if let Err(err) = tx.send(report) { + warn!("Failed to send dynamic stats collector report: {:?}", err); + } + + data = BTreeMap::new(); + } + Message::Stop => break, + } + } + }); + + Self { tx } + } + + pub(crate) fn collect(&self, key: impl Into, value: usize) { + let message = Message::Register { + key: key.into(), + value, + }; + + if let Err(err) = self.tx.send(message) { + warn!( + "Failed to register dynamic stats collector value: {:?}", + err + ); + } + } + + pub(crate) fn flush(&self) -> Result> { + let (tx, rx) = crossbeam_channel::bounded(1); + + self.tx + .send(Message::Flush { tx }) + .context("Failed to send flush message to the dynamic stats collector")?; + + rx.recv() + .context("Failed to receive dynamic stats collector report") + } +} + +impl Drop for DynamicStatsCollector { + fn drop(&mut self) { + if let Err(err) = self.tx.send(Message::Stop) { + warn!("Failed to stop dynamic stats collector: {:?}", err); + } + } +} diff --git a/src/app/metrics/mod.rs b/src/app/metrics/mod.rs index d2a879e7..d753a5b3 100644 --- a/src/app/metrics/mod.rs +++ b/src/app/metrics/mod.rs @@ -1,3 +1,5 @@ -pub(crate) use stats_collector::StatsCollector; +pub(crate) use db_pool_stats_collector::DbPoolStatsCollector; +pub(crate) use dynamic_stats_collector::DynamicStatsCollector; -mod stats_collector; +mod db_pool_stats_collector; +mod dynamic_stats_collector; diff --git a/src/app/mod.rs b/src/app/mod.rs index e1a2f06d..d453674f 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -21,7 +21,7 @@ use svc_authz::cache::{Cache as AuthzCache, ConnectionPool as RedisConnectionPoo use svc_error::{extension::sentry, Error as SvcError}; use crate::app::context::Context; -use crate::app::metrics::StatsCollector; +use crate::app::metrics::DbPoolStatsCollector; use crate::config::{self, Config, KruonisConfig}; use crate::db::ConnectionPool; use context::{AppContext, JanusTopics}; @@ -35,7 +35,7 @@ pub(crate) async fn run( db: &ConnectionPool, redis_pool: Option, authz_cache: Option, - db_pool_stats: StatsCollector, + db_pool_stats: DbPoolStatsCollector, ) -> Result<()> { // Config let config = config::load().expect("Failed to load config"); diff --git a/src/db/mod.rs b/src/db/mod.rs index 4b961b21..122223be 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -4,7 +4,7 @@ use diesel::r2d2::{ConnectionManager, Pool}; use std::sync::Arc; use std::time::Duration; -use crate::app::metrics::StatsCollector; +use crate::app::metrics::DbPoolStatsCollector; pub(crate) type ConnectionPool = Arc>>; @@ -14,9 +14,9 @@ pub(crate) fn create_pool( idle_size: Option, timeout: u64, enable_stats: bool, -) -> (ConnectionPool, StatsCollector) { +) -> (ConnectionPool, DbPoolStatsCollector) { let manager = ConnectionManager::::new(url); - let (collector, transmitter) = StatsCollector::new(); + let (collector, transmitter) = DbPoolStatsCollector::new(); let builder = Pool::builder() .max_size(size) diff --git a/src/test_helpers/context.rs b/src/test_helpers/context.rs index 84c4e710..5288d317 100644 --- a/src/test_helpers/context.rs +++ b/src/test_helpers/context.rs @@ -4,7 +4,7 @@ use svc_authz::cache::ConnectionPool as RedisConnectionPool; use svc_authz::ClientMap as Authz; use crate::app::context::{Context, JanusTopics}; -use crate::app::metrics::StatsCollector; +use crate::app::metrics::{DbPoolStatsCollector, DynamicStatsCollector}; use crate::config::Config; use crate::db::ConnectionPool as Db; @@ -93,7 +93,11 @@ impl Context for TestContext { &None } - fn db_pool_stats(&self) -> &Option { + fn db_pool_stats(&self) -> &Option { &None } + + fn dynamic_stats(&self) -> Option<&DynamicStatsCollector> { + None + } }