diff --git a/Cargo.lock b/Cargo.lock index 77e9405e..04a318b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,7 +185,7 @@ dependencies = [ "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", - "svc-agent 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", + "svc-agent 0.9.6 (git+https://github.com/netology-group/svc-agent-rs)", "svc-authn 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authz 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "svc-error 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1948,7 +1948,7 @@ dependencies = [ [[package]] name = "svc-agent" version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/netology-group/svc-agent-rs#da9c5749bb9e07f3b08a6a5a9852c9043bef30c6" dependencies = [ "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2007,7 +2007,7 @@ dependencies = [ "sentry 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)", - "svc-agent 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", + "svc-agent 0.9.6 (git+https://github.com/netology-group/svc-agent-rs)", "svc-authn 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authz 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2871,7 +2871,7 @@ dependencies = [ "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum string 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d24114bfcceb867ca7f71a0d3fe45d45619ec47a6fbfa98cb14e14250bfa5d6d" "checksum surf 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "741a8008f8a833ef16f47df94a30754478fb2c2bf822b9c2e6f7f09203b97ace" -"checksum svc-agent 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)" = "1f2a2f1093e76c95b8d77d5c8b9452151e01be7b809f3bea5665dfd10f7b0bfb" +"checksum svc-agent 0.9.6 (git+https://github.com/netology-group/svc-agent-rs)" = "" "checksum svc-authn 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "155857a0822913ba103a75f043e2e8594e78d071fe4440b443870e7559681664" "checksum svc-authz 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "437d2f44d68fe60946c17de439fafa10831c67f1ae0ce7bd1a6b98e76160adc3" "checksum svc-error 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "a698f5401ae58a79c3e8c7345a43427bb7f53c9a9dac094f8c0e2e7b2374fd16" diff --git a/Cargo.toml b/Cargo.toml index 2440ed3a..735f748f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,6 @@ svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-a [dev-dependencies] rand = "0.7" + +[patch.crates-io] +svc-agent = { git = "https://github.com/netology-group/svc-agent-rs" } diff --git a/src/app/endpoint/rtc_stream.rs b/src/app/endpoint/rtc_stream.rs index 73b22577..2fd8f88c 100644 --- a/src/app/endpoint/rtc_stream.rs +++ b/src/app/endpoint/rtc_stream.rs @@ -93,6 +93,7 @@ impl State { Some(room_id), inreq.payload().rtc_id, inreq.payload().time, + None, inreq.payload().offset, Some(std::cmp::min( inreq.payload().limit.unwrap_or_else(|| MAX_LIMIT), diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index a2db1e10..07903372 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use serde_derive::{Deserialize, Serialize}; use svc_agent::mqtt::{ - Connection, IncomingEvent, IncomingEventProperties, OutgoingEvent, ResponseStatus, + Connection, IncomingEvent, IncomingEventProperties, OutgoingEvent, Publishable, ResponseStatus, ShortTermTimingProperties, }; use svc_agent::AgentId; @@ -10,7 +10,7 @@ use svc_error::Error as SvcError; use uuid::Uuid; use crate::app::endpoint; -use crate::db::{agent, room, ConnectionPool}; +use crate::db::{agent, janus_backend, janus_rtc_stream, room, ConnectionPool}; /////////////////////////////////////////////////////////////////////////////// @@ -100,11 +100,54 @@ impl State { let row_count = agent::DeleteQuery::new(agent_id, room_id).execute(&conn)?; if row_count == 1 { + // Event to room topic. let payload = RoomEnterLeaveEventData::new(room_id.to_owned(), agent_id.to_owned()); let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp); let props = evt.properties().to_event("room.leave", short_term_timing); let to_uri = format!("rooms/{}/events", room_id); - OutgoingEvent::broadcast(payload, props, &to_uri).into() + let outgoing_event = OutgoingEvent::broadcast(payload, props, &to_uri); + let mut messages: Vec> = vec![Box::new(outgoing_event)]; + + // `agent.leave` requests to Janus instances that host active streams in this room. + let streams = janus_rtc_stream::ListQuery::new() + .room_id(room_id) + .active(true) + .execute(&conn)?; + + let mut backend_ids = streams + .iter() + .map(|stream| stream.backend_id()) + .collect::>(); + + backend_ids.dedup(); + + let backends = janus_backend::ListQuery::new() + .ids(&backend_ids[..]) + .execute(&conn)?; + + for backend in backends { + let result = crate::app::janus::agent_leave_request( + evt.properties().to_owned(), + backend.session_id(), + backend.handle_id(), + agent_id, + backend.id(), + evt.properties().tracking(), + ); + + match result { + Ok(req) => messages.push(Box::new(req)), + Err(_) => { + return SvcError::builder() + .status(ResponseStatus::UNPROCESSABLE_ENTITY) + .detail("error creating a backend request") + .build() + .into() + } + } + } + + messages.into() } else { let err = format!( "the agent is not found for agent_id = '{}', room = '{}'", diff --git a/src/app/janus.rs b/src/app/janus.rs index 8b017b9e..30fbd941 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -7,8 +7,8 @@ use std::ops::Bound; use std::sync::Arc; use svc_agent::mqtt::{ compat::{into_event, IncomingEnvelope}, - IncomingRequestProperties, OutgoingRequest, OutgoingRequestProperties, Publishable, - ResponseStatus, ShortTermTimingProperties, TrackingProperties, + IncomingEventProperties, IncomingRequestProperties, OutgoingRequest, OutgoingRequestProperties, + Publishable, ResponseStatus, ShortTermTimingProperties, TrackingProperties, }; use svc_agent::{Addressable, AgentId}; use svc_error::Error as SvcError; @@ -38,6 +38,7 @@ pub(crate) enum Transaction { ReadStream(ReadStreamTransaction), UploadStream(UploadStreamTransaction), Trickle(TrickleTransaction), + AgentLeave(AgentLeaveTransaction), } //////////////////////////////////////////////////////////////////////////////// @@ -192,13 +193,15 @@ impl CreateStreamTransaction { pub(crate) struct CreateStreamRequestBody { method: &'static str, id: Uuid, + agent_id: AgentId, } impl CreateStreamRequestBody { - pub(crate) fn new(id: Uuid) -> Self { + pub(crate) fn new(id: Uuid, agent_id: AgentId) -> Self { Self { method: "stream.create", id, + agent_id, } } } @@ -226,8 +229,9 @@ where short_term_timing, ); + let agent_id = reqp.to_connection().agent_id().to_owned(); + let body = CreateStreamRequestBody::new(rtc_id, agent_id); let transaction = Transaction::CreateStream(CreateStreamTransaction::new(reqp)); - let body = CreateStreamRequestBody::new(rtc_id); let payload = MessageRequest::new( &to_base64(&transaction)?, @@ -257,13 +261,15 @@ impl ReadStreamTransaction { pub(crate) struct ReadStreamRequestBody { method: &'static str, id: Uuid, + agent_id: AgentId, } impl ReadStreamRequestBody { - pub(crate) fn new(id: Uuid) -> Self { + pub(crate) fn new(id: Uuid, agent_id: AgentId) -> Self { Self { method: "stream.read", id, + agent_id, } } } @@ -291,8 +297,9 @@ where short_term_timing, ); + let agent_id = reqp.to_connection().agent_id().to_owned(); + let body = ReadStreamRequestBody::new(rtc_id, agent_id); let transaction = Transaction::ReadStream(ReadStreamTransaction::new(reqp)); - let body = ReadStreamRequestBody::new(rtc_id); let payload = MessageRequest::new( &to_base64(&transaction)?, @@ -406,6 +413,68 @@ where //////////////////////////////////////////////////////////////////////////////// +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct AgentLeaveTransaction { + evp: IncomingEventProperties, +} + +impl AgentLeaveTransaction { + pub(crate) fn new(evp: IncomingEventProperties) -> Self { + Self { evp } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct AgentLeaveRequestBody { + method: &'static str, + agent_id: AgentId, +} + +impl AgentLeaveRequestBody { + pub(crate) fn new(agent_id: AgentId) -> Self { + Self { + method: "agent.leave", + agent_id, + } + } +} + +pub(crate) fn agent_leave_request( + evp: IncomingEventProperties, + session_id: i64, + handle_id: i64, + agent_id: &AgentId, + to: &A, + tracking: &TrackingProperties, +) -> Result, Error> +where + A: Addressable, +{ + let mut props = OutgoingRequestProperties::new( + "janus_conference_agent.leave", + IGNORE, + IGNORE, + ShortTermTimingProperties::new(Utc::now()), + ); + + props.set_tracking(tracking.to_owned()); + + let transaction = Transaction::AgentLeave(AgentLeaveTransaction::new(evp)); + let body = AgentLeaveRequestBody::new(agent_id.to_owned()); + + let payload = MessageRequest::new( + &to_base64(&transaction)?, + session_id, + handle_id, + serde_json::to_value(&body)?, + None, + ); + + Ok(OutgoingRequest::unicast(payload, props, to)) +} + +//////////////////////////////////////////////////////////////////////////////// + pub(crate) struct State { db: ConnectionPool, } diff --git a/src/db/janus_backend.rs b/src/db/janus_backend.rs index fadf754d..d64d7c94 100644 --- a/src/db/janus_backend.rs +++ b/src/db/janus_backend.rs @@ -32,19 +32,28 @@ impl Object { //////////////////////////////////////////////////////////////////////////////// -pub(crate) struct ListQuery { +pub(crate) struct ListQuery<'a> { + ids: Option<&'a [&'a AgentId]>, offset: Option, limit: Option, } -impl ListQuery { +impl<'a> ListQuery<'a> { pub(crate) fn new() -> Self { Self { + ids: None, offset: None, limit: None, } } + pub(crate) fn ids(self, ids: &'a [&'a AgentId]) -> Self { + Self { + ids: Some(ids), + ..self + } + } + pub(crate) fn offset(self, offset: i64) -> Self { Self { offset: Some(offset), @@ -63,6 +72,9 @@ impl ListQuery { use diesel::prelude::*; let mut q = janus_backend::table.into_boxed(); + if let Some(ids) = self.ids { + q = q.filter(janus_backend::id.eq_any(ids)) + } if let Some(offset) = self.offset { q = q.offset(offset); } diff --git a/src/db/janus_rtc_stream.rs b/src/db/janus_rtc_stream.rs index 66ff9a8e..e50bf098 100644 --- a/src/db/janus_rtc_stream.rs +++ b/src/db/janus_rtc_stream.rs @@ -109,10 +109,16 @@ impl FindQuery { //////////////////////////////////////////////////////////////////////////////// +const ACTIVE_SQL: &str = r#"( + lower("janus_rtc_stream"."time") is not null + and upper("janus_rtc_stream"."time") is null +)"#; + pub(crate) struct ListQuery { room_id: Option, rtc_id: Option, time: Option