diff --git a/src/app/endpoint/metric.rs b/src/app/endpoint/metric.rs new file mode 100644 index 00000000..3bfa3f7e --- /dev/null +++ b/src/app/endpoint/metric.rs @@ -0,0 +1,64 @@ +use async_std::stream; +use async_trait::async_trait; +use chrono::{serde::ts_seconds, DateTime, Utc}; +use serde_derive::{Deserialize, Serialize}; +use svc_agent::mqtt::{ + IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ShortTermTimingProperties, +}; + +use crate::app::context::Context; +use crate::app::endpoint::prelude::*; +use crate::config::TelemetryConfig; + +#[derive(Debug, Deserialize)] +pub(crate) struct PullPayload {} + +#[derive(Serialize, Debug)] +pub(crate) struct MetricValue { + value: u64, + #[serde(with = "ts_seconds")] + timestamp: DateTime, +} + +#[derive(Serialize, Debug)] +#[serde(tag = "metric")] +pub(crate) enum Metric { + //IncomingQueue(MetricValue), + //OutgoingQueue(MetricValue), + #[serde(rename(serialize = "apps.conference.db_connections_total"))] + DbConnections(MetricValue), +} + +pub(crate) struct PullHandler; + +#[async_trait] +impl EventHandler for PullHandler { + type Payload = PullPayload; + + async fn handle( + context: &C, + _payload: Self::Payload, + evp: &IncomingEventProperties, + start_timestamp: DateTime, + ) -> Result { + match context.config().telemetry { + TelemetryConfig { + id: Some(ref account_id), + } => { + let outgoing_event_payload = vec![Metric::DbConnections(MetricValue { + value: context.db().state().connections as u64, + timestamp: Utc::now(), + })]; + + let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp); + let props = evp.to_event("metric.create", short_term_timing); + let outgoing_event = + OutgoingEvent::multicast(outgoing_event_payload, props, account_id); + let boxed_event = Box::new(outgoing_event) as Box; + Ok(Box::new(stream::once(boxed_event))) + } + + _ => Ok(Box::new(stream::empty())), + } + } +} diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index aea53af6..179febd8 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -154,6 +154,7 @@ macro_rules! event_routes { // Event routes configuration: label => EventHandler event_routes!( + "metric.pull" => metric::PullHandler, "subscription.delete" => subscription::DeleteHandler, "subscription.create" => subscription::CreateHandler ); @@ -162,6 +163,7 @@ event_routes!( mod agent; mod message; +mod metric; mod room; pub(crate) mod rtc; pub(crate) mod rtc_signal; diff --git a/src/app/mod.rs b/src/app/mod.rs index 91c91f56..94a5528e 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -2,16 +2,21 @@ use std::sync::Arc; use std::thread; use async_std::task; +use chrono::Utc; use futures::StreamExt; use log::{error, info}; +use serde_json::json; use svc_agent::{ - mqtt::{AgentBuilder, ConnectionMode, Notification, QoS, SubscriptionTopic}, - AgentId, Authenticable, SharedGroup, Subscription, + mqtt::{ + Agent, AgentBuilder, ConnectionMode, IntoPublishableDump, Notification, OutgoingRequest, + OutgoingRequestProperties, QoS, ShortTermTimingProperties, SubscriptionTopic, + }, + AccountId, AgentId, Authenticable, SharedGroup, Subscription, }; use svc_authn::token::jws_compact; use svc_authz::cache::Cache as AuthzCache; -use crate::config; +use crate::config::{self, KruonisConfig}; use crate::db::ConnectionPool; use context::{AppContext, JanusTopics}; use message_handler::MessageHandler; @@ -107,6 +112,21 @@ pub(crate) async fn run( .subscribe(&subscription, QoS::AtLeastOnce, Some(&group)) .map_err(|err| format!("Error subscribing to backend responses topic: {}", err))?; + agent + .subscribe( + &Subscription::unicast_requests(), + QoS::AtMostOnce, + Some(&group), + ) + .map_err(|err| format!("Error subscribing to unicast requests: {}", err))?; + + if let KruonisConfig { + id: Some(ref kruonis_id), + } = config.kruonis + { + subscribe_to_kruonis(kruonis_id, &mut agent)?; + } + let janus_responses_topic = subscription .subscription_topic(&agent_id, API_VERSION) .map_err(|err| format!("Error building janus responses subscription topic: {}", err))?; @@ -142,6 +162,31 @@ pub(crate) async fn run( Ok(()) } +fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<(), String> { + let timing = ShortTermTimingProperties::new(Utc::now()); + let topic = Subscription::unicast_requests_from(kruonis_id) + .subscription_topic(agent.id(), API_VERSION) + .map_err(|err| format!("Failed to build subscription topic: {:?}", err))?; + let props = OutgoingRequestProperties::new("kruonis.subscribe", &topic, "", timing); + let event = OutgoingRequest::multicast(json!({}), props, kruonis_id); + let message = Box::new(event) as Box; + + let dump = message + .into_dump(agent.address()) + .map_err(|err| format!("Failed to dump message: {}", err))?; + + info!( + "Outgoing message = '{}' sending to the topic = '{}'", + dump.payload(), + dump.topic(), + ); + + agent + .publish_dump(dump) + .map_err(|err| format!("Failed to publish message: {}", err))?; + Ok(()) +} + pub(crate) mod context; pub(crate) mod endpoint; pub(crate) mod handle_id; diff --git a/src/config.rs b/src/config.rs index 1473e057..b0145109 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,10 @@ pub(crate) struct Config { pub(crate) authz: Authz, pub(crate) mqtt: AgentConfig, pub(crate) sentry: Option, + #[serde(default)] + pub(crate) telemetry: TelemetryConfig, + #[serde(default)] + pub(crate) kruonis: KruonisConfig, } #[derive(Clone, Debug, Deserialize)] @@ -31,3 +35,13 @@ pub(crate) fn load() -> Result { parser.merge(config::Environment::with_prefix("APP").separator("__"))?; parser.try_into::() } + +#[derive(Clone, Debug, Deserialize, Default)] +pub(crate) struct TelemetryConfig { + pub(crate) id: Option, +} + +#[derive(Clone, Debug, Deserialize, Default)] +pub(crate) struct KruonisConfig { + pub(crate) id: Option, +}