diff --git a/src/app/janus.rs b/src/app/janus.rs index e659d07f..b76fb1da 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -24,7 +24,7 @@ use crate::app::message_handler::{MessageStream, SvcErrorSugar}; use crate::app::API_VERSION; use crate::backend::janus::{ CreateHandleRequest, CreateSessionRequest, ErrorResponse, IncomingEvent, IncomingResponse, - MessageRequest, StatusEvent, TrickleRequest, JANUS_API_VERSION, + MessageRequest, OpaqueId, StatusEvent, TrickleRequest, JANUS_API_VERSION, }; use crate::db::{janus_backend, janus_rtc_stream, recording, room, rtc}; use crate::util::{from_base64, generate_correlation_data, to_base64}; @@ -203,7 +203,7 @@ impl CreateRtcHandleTransaction { pub(crate) fn create_rtc_handle_request( reqp: IncomingRequestProperties, - rtc_handle_id: Uuid, + rtc_stream_id: Uuid, rtc_id: Uuid, session_id: i64, to: &A, @@ -227,7 +227,7 @@ where let transaction = Transaction::CreateRtcHandle(CreateRtcHandleTransaction::new( reqp, - rtc_handle_id, + rtc_stream_id, rtc_id, session_id, )); @@ -236,7 +236,7 @@ where &to_base64(&transaction)?, session_id, "janus.plugin.conference", - Some(&rtc_handle_id.to_string()), + Some(&rtc_stream_id.to_string()), ); Ok(OutgoingRequest::unicast( @@ -1132,52 +1132,61 @@ async fn handle_event_impl( } } IncomingEvent::HangUp(ref inev) => { - let rtc_stream_id = Uuid::from_str(inev.opaque_id()) - .map_err(|err| format!("Failed to parse opaque id as uuid: {}", err)) - .status(ResponseStatus::BAD_REQUEST)?; - - let conn = context.db().get()?; - - // If the event relates to a publisher's handle, - // we will find the corresponding stream and send event w/ updated stream object - // to the room's topic. - if let Some(rtc_stream) = janus_rtc_stream::stop(rtc_stream_id, &conn)? { - let rtc_id = rtc_stream.rtc_id(); - - let room = room::FindQuery::new() - .time(room::now()) - .rtc_id(rtc_id) - .execute(&conn)? - .ok_or_else(|| format!("a room for rtc = '{}' is not found", &rtc_id)) - .status(ResponseStatus::NOT_FOUND)?; - - // Publish the update event if only stream object has been changed - // (if there weren't any actual media stream, the object won't contain its start time) - if let Some(_) = rtc_stream.time() { - let event = endpoint::rtc_stream::update_event( - room.id(), - rtc_stream, - start_timestamp, - evp.tracking(), - )?; - - let boxed_event = Box::new(event) as Box; - return Ok(Box::new(stream::once(boxed_event))); - } - } - - Ok(Box::new(stream::empty())) + handle_hangup_detach(context, start_timestamp, inev, evp) + } + IncomingEvent::Detached(ref inev) => { + handle_hangup_detach(context, start_timestamp, inev, evp) } - IncomingEvent::Detached(_) - | IncomingEvent::Media(_) - | IncomingEvent::Timeout(_) - | IncomingEvent::SlowLink(_) => { + IncomingEvent::Media(_) | IncomingEvent::Timeout(_) | IncomingEvent::SlowLink(_) => { // Ignore these kinds of events. Ok(Box::new(stream::empty())) } } } +fn handle_hangup_detach( + context: &C, + start_timestamp: DateTime, + inev: &E, + evp: &IncomingEventProperties, +) -> Result { + let rtc_stream_id = Uuid::from_str(inev.opaque_id()) + .map_err(|err| format!("Failed to parse opaque id as uuid: {}", err)) + .status(ResponseStatus::BAD_REQUEST)?; + + let conn = context.db().get()?; + + // If the event relates to a publisher's handle, + // we will find the corresponding stream and send event w/ updated stream object + // to the room's topic. + if let Some(rtc_stream) = janus_rtc_stream::stop(rtc_stream_id, &conn)? { + let rtc_id = rtc_stream.rtc_id(); + + let room = room::FindQuery::new() + .time(room::now()) + .rtc_id(rtc_id) + .execute(&conn)? + .ok_or_else(|| format!("a room for rtc = '{}' is not found", &rtc_id)) + .status(ResponseStatus::NOT_FOUND)?; + + // Publish the update event if only stream object has been changed + // (if there weren't any actual media stream, the object won't contain its start time) + if let Some(_) = rtc_stream.time() { + let event = endpoint::rtc_stream::update_event( + room.id(), + rtc_stream, + start_timestamp, + evp.tracking(), + )?; + + let boxed_event = Box::new(event) as Box; + return Ok(Box::new(stream::once(boxed_event))); + } + } + + Ok(Box::new(stream::empty())) +} + pub(crate) async fn handle_status_event( context: &C, event: &MQTTIncomingEvent, diff --git a/src/backend/janus.rs b/src/backend/janus.rs index 8a64505d..bf8b0cc1 100644 --- a/src/backend/janus.rs +++ b/src/backend/janus.rs @@ -3,6 +3,10 @@ use serde_json::Value as JsonValue; pub(crate) const JANUS_API_VERSION: &str = "v1"; +pub(crate) trait OpaqueId { + fn opaque_id(&self) -> &str; +} + //////////////////////////////////////////////////////////////////////////////// // Creating a session @@ -326,8 +330,8 @@ pub(crate) struct WebRtcUpEvent { opaque_id: String, } -impl WebRtcUpEvent { - pub(crate) fn opaque_id(&self) -> &str { +impl OpaqueId for WebRtcUpEvent { + fn opaque_id(&self) -> &str { &self.opaque_id } } @@ -344,8 +348,8 @@ pub(crate) struct HangUpEvent { reason: String, } -impl HangUpEvent { - pub(crate) fn opaque_id(&self) -> &str { +impl OpaqueId for HangUpEvent { + fn opaque_id(&self) -> &str { &self.opaque_id } } @@ -395,6 +399,12 @@ pub(crate) struct DetachedEvent { opaque_id: String, } +impl OpaqueId for DetachedEvent { + fn opaque_id(&self) -> &str { + &self.opaque_id + } +} + //////////////////////////////////////////////////////////////////////////////// // Janus Gateway actual status