From 42979fba0321b365a74e120bbfe249f7b6a8f14a Mon Sep 17 00:00:00 2001 From: Shamir Khodzha Date: Fri, 29 May 2020 12:14:38 +0300 Subject: [PATCH] queue len metrics --- Cargo.toml | 2 +- src/app/context.rs | 16 +++++++- src/app/endpoint/metric.rs | 74 ++++++++++++++++++++++++++++++++----- src/app/mod.rs | 3 +- src/test_helpers/context.rs | 6 ++- 5 files changed, 87 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d88a797b..d7572ea6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ rand = "0.7" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -svc-agent = { version = "0.13", features = ["diesel"] } +svc-agent = { version = "0.13", features = ["diesel", "queue-counter"] } svc-authn = { version = "0.5", features = ["jose", "diesel"] } svc-authz = "0.10" svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } diff --git a/src/app/context.rs b/src/app/context.rs index 51dc753d..c00f6e03 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use svc_agent::AgentId; +use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::ClientMap as Authz; use crate::config::Config; @@ -15,6 +15,7 @@ pub(crate) struct AppContext { db: Db, agent_id: AgentId, janus_topics: JanusTopics, + queue_counter: Option, } impl AppContext { @@ -27,6 +28,14 @@ impl AppContext { db, agent_id, janus_topics, + queue_counter: None, + } + } + + pub(crate) fn add_queue_counter(self, qc: QueueCounterHandle) -> Self { + Self { + queue_counter: Some(qc), + ..self } } } @@ -37,6 +46,7 @@ pub(crate) trait Context: Sync { fn db(&self) -> &Db; fn agent_id(&self) -> &AgentId; fn janus_topics(&self) -> &JanusTopics; + fn queue_counter(&self) -> &Option; } impl Context for AppContext { @@ -59,6 +69,10 @@ impl Context for AppContext { fn janus_topics(&self) -> &JanusTopics { &self.janus_topics } + + fn queue_counter(&self) -> &Option { + &self.queue_counter + } } /////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/endpoint/metric.rs b/src/app/endpoint/metric.rs index 68681e7d..048734da 100644 --- a/src/app/endpoint/metric.rs +++ b/src/app/endpoint/metric.rs @@ -3,7 +3,8 @@ use async_trait::async_trait; use chrono::{serde::ts_seconds, DateTime, Utc}; use serde_derive::{Deserialize, Serialize}; use svc_agent::mqtt::{ - IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ShortTermTimingProperties, + IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ResponseStatus, + ShortTermTimingProperties, }; use crate::app::context::Context; @@ -11,7 +12,14 @@ use crate::app::endpoint::prelude::*; use crate::config::TelemetryConfig; #[derive(Debug, Deserialize)] -pub(crate) struct PullPayload {} +pub(crate) struct PullPayload { + #[serde(default = "default_duration")] + duration: u64, +} + +fn default_duration() -> u64 { + 5 +} #[derive(Serialize, Debug)] pub(crate) struct MetricValue { @@ -23,8 +31,18 @@ pub(crate) struct MetricValue { #[derive(Serialize, Debug)] #[serde(tag = "metric")] pub(crate) enum Metric { - //IncomingQueue(MetricValue), - //OutgoingQueue(MetricValue), + #[serde(rename(serialize = "apps.conference.incoming_requests_total"))] + IncomingQueueRequests(MetricValue), + #[serde(rename(serialize = "apps.conference.incoming_responses_total"))] + IncomingQueueResponses(MetricValue), + #[serde(rename(serialize = "apps.conference.incoming_events_total"))] + IncomingQueueEvents(MetricValue), + #[serde(rename(serialize = "apps.conference.outgoing_requests_total"))] + OutgoingQueueRequests(MetricValue), + #[serde(rename(serialize = "apps.conference.outgoing_responses_total"))] + OutgoingQueueResponses(MetricValue), + #[serde(rename(serialize = "apps.conference.outgoing_events_total"))] + OutgoingQueueEvents(MetricValue), #[serde(rename(serialize = "apps.conference.db_connections_total"))] DbConnections(MetricValue), } @@ -37,7 +55,7 @@ impl EventHandler for PullHandler { async fn handle( context: &C, - _payload: Self::Payload, + payload: Self::Payload, evp: &IncomingEventProperties, start_timestamp: DateTime, ) -> Result { @@ -45,15 +63,51 @@ impl EventHandler for PullHandler { TelemetryConfig { id: Some(ref account_id), } => { - let outgoing_event_payload = vec![Metric::DbConnections(MetricValue { + let now = Utc::now(); + + let mut metrics = if let Some(qc) = context.queue_counter() { + let stats = qc + .get_stats(payload.duration) + .status(ResponseStatus::BAD_REQUEST)?; + + vec![ + Metric::IncomingQueueRequests(MetricValue { + value: stats.incoming_requests, + timestamp: now, + }), + Metric::IncomingQueueResponses(MetricValue { + value: stats.incoming_responses, + timestamp: now, + }), + Metric::IncomingQueueEvents(MetricValue { + value: stats.incoming_events, + timestamp: now, + }), + Metric::OutgoingQueueRequests(MetricValue { + value: stats.outgoing_requests, + timestamp: now, + }), + Metric::OutgoingQueueResponses(MetricValue { + value: stats.outgoing_responses, + timestamp: now, + }), + Metric::OutgoingQueueEvents(MetricValue { + value: stats.outgoing_events, + timestamp: now, + }), + ] + } else { + vec![] + }; + + metrics.push(Metric::DbConnections(MetricValue { value: context.db().state().connections as u64, - timestamp: Utc::now(), - })]; + timestamp: now, + })); let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp); let props = evp.to_event("metric.create", short_term_timing); - let outgoing_event = - OutgoingEvent::multicast(outgoing_event_payload, props, account_id); + let outgoing_event = OutgoingEvent::multicast(metrics, props, account_id); let boxed_event = Box::new(outgoing_event) as Box; Ok(Box::new(stream::once(boxed_event))) diff --git a/src/app/mod.rs b/src/app/mod.rs index 63caf1ab..0f28999d 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -76,7 +76,8 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option) -> let janus_topics = subscribe(&mut agent, &agent_id, &config)?; // Context - let context = AppContext::new(config.clone(), authz, db.clone(), janus_topics); + let context = AppContext::new(config.clone(), authz, db.clone(), janus_topics) + .add_queue_counter(agent.get_queue_counter()); // Message handler let message_handler = Arc::new(MessageHandler::new(agent.clone(), context)); diff --git a/src/test_helpers/context.rs b/src/test_helpers/context.rs index e8d1b66e..6a57d439 100644 --- a/src/test_helpers/context.rs +++ b/src/test_helpers/context.rs @@ -1,5 +1,5 @@ use serde_json::json; -use svc_agent::AgentId; +use svc_agent::{queue_counter::QueueCounterHandle, AgentId}; use svc_authz::ClientMap as Authz; use crate::app::context::{Context, JanusTopics}; @@ -82,4 +82,8 @@ impl Context for TestContext { fn janus_topics(&self) -> &JanusTopics { &self.janus_topics } + + fn queue_counter(&self) -> &Option { + &None + } }