diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index 91aa2471..3fd7e55b 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -4,7 +4,7 @@ use svc_agent::mqtt::{ IncomingEvent, IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ResponseStatus, ShortTermTimingProperties, }; -use svc_agent::AgentId; +use svc_agent::{AccountId, AgentId}; use svc_authn::Authenticable; use svc_error::Error as SvcError; use uuid::Uuid; @@ -40,13 +40,15 @@ impl RoomEnterLeaveEventData { pub(crate) struct State { broker_account_id: svc_agent::AccountId, db: ConnectionPool, + me: AgentId, } impl State { - pub(crate) fn new(broker_account_id: svc_agent::AccountId, db: ConnectionPool) -> Self { + pub(crate) fn new(broker_account_id: AccountId, db: ConnectionPool, me: AgentId) -> Self { Self { broker_account_id, db, + me, } } } @@ -132,6 +134,7 @@ impl State { backend.handle_id(), agent_id, backend.id(), + &self.me, evt.properties().tracking(), ); @@ -219,8 +222,14 @@ mod test { use super::*; fn build_state(db: &TestDb) -> State { - let account_id = svc_agent::AccountId::new("mqtt-gateway", AUDIENCE); - State::new(account_id, db.connection_pool().clone()) + let broker_account_id = svc_agent::AccountId::new("mqtt-gateway", AUDIENCE); + let me = TestAgent::new("alpha", "conference", AUDIENCE); + + State::new( + broker_account_id, + db.connection_pool().clone(), + me.agent_id().to_owned(), + ) } #[test] diff --git a/src/app/janus.rs b/src/app/janus.rs index 6d77a90b..90489430 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use failure::{err_msg, format_err, Error}; use log::{info, warn}; @@ -501,22 +503,22 @@ impl AgentLeaveRequestBody { } } -pub(crate) fn agent_leave_request( +pub(crate) fn agent_leave_request( evp: IncomingEventProperties, session_id: i64, handle_id: i64, agent_id: &AgentId, + to: &T, me: &M, tracking: &TrackingProperties, ) -> Result, Error> where + T: Addressable, M: Addressable, { - let to = evp.as_agent_id().to_owned(); - let mut props = OutgoingRequestProperties::new( "janus_conference_agent.leave", - &response_topic(&to, me)?, + &response_topic(to, me)?, &generate_correlation_data(), ShortTermTimingProperties::new(Utc::now()), ); @@ -537,7 +539,7 @@ where Ok(OutgoingRequest::unicast( payload, props, - &to, + to, JANUS_API_VERSION, )) } @@ -982,6 +984,7 @@ where )), } } + pub(crate) async fn handle_event( payload: Arc>, janus: Arc, @@ -992,15 +995,15 @@ pub(crate) async fn handle_event( match message.payload() { IncomingEvent::WebRtcUp(ref inev) => { - use std::str::FromStr; let rtc_stream_id = Uuid::from_str(inev.opaque_id())?; - let conn = janus.db.get()?; + // If the event relates to a publisher's handle, // we will find the corresponding stream and send event w/ updated stream object // to the room's topic. if let Some(rtc_stream) = janus_rtc_stream::start(rtc_stream_id, &conn)? { let rtc_id = rtc_stream.rtc_id(); + let room = room::FindQuery::new() .time(room::now()) .rtc_id(rtc_id) @@ -1019,16 +1022,16 @@ pub(crate) async fn handle_event( Ok(vec![]) } } - IncomingEvent::HangUp(ref inev) => { - use std::str::FromStr; + IncomingEvent::Detached(ref inev) => { let rtc_stream_id = Uuid::from_str(inev.opaque_id())?; - let conn = janus.db.get()?; + // If the event relates to a publisher's handle, // we will find the corresponding stream and send event w/ updated stream object // to the room's topic. if let Some(rtc_stream) = janus_rtc_stream::stop(rtc_stream_id, &conn)? { let rtc_id = rtc_stream.rtc_id(); + let room = room::FindQuery::new() .time(room::now()) .rtc_id(rtc_id) @@ -1051,10 +1054,10 @@ pub(crate) async fn handle_event( Ok(vec![]) } - IncomingEvent::Media(_) + IncomingEvent::HangUp(_) + | IncomingEvent::Media(_) | IncomingEvent::Timeout(_) - | IncomingEvent::SlowLink(_) - | IncomingEvent::Detached(_) => { + | IncomingEvent::SlowLink(_) => { // Ignore these kinds of events. Ok(vec![]) } diff --git a/src/app/mod.rs b/src/app/mod.rs index 3b865e00..1f894050 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -95,7 +95,11 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option) -> rtc_signal: endpoint::rtc_signal::State::new(authz.clone(), db.clone(), agent_id.clone()), rtc_stream: endpoint::rtc_stream::State::new(authz.clone(), db.clone()), message: endpoint::message::State::new(agent_id.clone(), db.clone()), - subscription: endpoint::subscription::State::new(config.broker_id, db.clone()), + subscription: endpoint::subscription::State::new( + config.broker_id, + db.clone(), + agent_id.clone(), + ), system: endpoint::system::State::new(agent_id.clone(), authz.clone(), db.clone()), }); diff --git a/src/backend/janus.rs b/src/backend/janus.rs index d68669d5..5646d551 100644 --- a/src/backend/janus.rs +++ b/src/backend/janus.rs @@ -344,12 +344,6 @@ pub(crate) struct HangUpEvent { reason: String, } -impl HangUpEvent { - pub(crate) fn opaque_id(&self) -> &str { - &self.opaque_id - } -} - //////////////////////////////////////////////////////////////////////////////// // Audio or video bytes being received by plugin handle @@ -396,6 +390,12 @@ pub(crate) struct DetachedEvent { opaque_id: String, } +impl DetachedEvent { + pub(crate) fn opaque_id(&self) -> &str { + &self.opaque_id + } +} + //////////////////////////////////////////////////////////////////////////////// // Janus Gateway actual status