From 11808eb1aa9eaed00e24a7961feca3aef0d5b910 Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Wed, 5 Aug 2020 15:23:53 +0300 Subject: [PATCH] Reserve and limit overhaul (#123) * Rename room subscribers limit to reserve * Remove subscribers count checking on room enter * Overhaul limiting and reserve consideration --- docs/src/api/room/create.md | 12 +- docs/src/api/room/update.md | 14 +- .../down.sql | 1 + .../up.sql | 1 + src/app/endpoint/room.rs | 95 ++-------- src/app/endpoint/rtc.rs | 169 +++++++++++++++++- src/app/endpoint/rtc_signal.rs | 151 +--------------- src/db/agent.rs | 45 +---- src/db/janus_backend.rs | 59 ++++-- src/db/room.rs | 27 +-- src/schema.rs | 2 +- src/test_helpers/factory.rs | 20 +-- 12 files changed, 263 insertions(+), 333 deletions(-) create mode 100644 migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/down.sql create mode 100644 migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/up.sql diff --git a/docs/src/api/room/create.md b/docs/src/api/room/create.md index b8a9d335..91a90816 100644 --- a/docs/src/api/room/create.md +++ b/docs/src/api/room/create.md @@ -15,12 +15,12 @@ method | String | _required_ | Always `room.create`. **Payload** -Name | Type | Default | Description ------------------ | ---------- | ---------- | ------------------ -time | [i64, i64) | _required_ | A [lt, rt) range of unix time (seconds) or null (unbounded). -audience | String | _required_ | The room audience. -backend | String | none | The room backend. Available values: janus, none. -subscribers_limit | i32 | _optional_ | The maximum number of simultaneously entered subscribers allowed. +Name | Type | Default | Description +-------- | ---------- | ---------- | ------------------ +time | [i64, i64) | _required_ | A [lt, rt) range of unix time (seconds) or null (unbounded). +audience | String | _required_ | The room audience. +backend | String | none | The room backend. Available values: janus, none. +reserve | i32 | _optional_ | The number of slots for subscribers to reserve on the server. ## Unicast response diff --git a/docs/src/api/room/update.md b/docs/src/api/room/update.md index 7e176470..08383624 100644 --- a/docs/src/api/room/update.md +++ b/docs/src/api/room/update.md @@ -14,13 +14,13 @@ method | String | _required_ | Always `room.update`. **Payload** -Name | Type | Default | Description ------------------ | ---------- | ---------- | ------------------ -id | String | _required_ | The room identifier. The room must not be expired. -time | [i64, i64) | _optional_ | A [lt, rt) range of unix time (seconds) or null (unbounded). -audience | String | _optional_ | The room audience. -backend | String | _optional_ | The room backend. Available values: janus, none. -subscribers_limit | i32 | _optional_ | The maximum number of simultaneously entered subscribers allowed. +Name | Type | Default | Description +-------- | ---------- | ---------- | ------------------ +id | String | _required_ | The room identifier. The room must not be expired. +time | [i64, i64) | _optional_ | A [lt, rt) range of unix time (seconds) or null (unbounded). +audience | String | _optional_ | The room audience. +backend | String | _optional_ | The room backend. Available values: janus, none. +reserve | i32 | _optional_ | The number of slots for subscribers to reserve on the server. ## Unicast response diff --git a/migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/down.sql b/migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/down.sql new file mode 100644 index 00000000..713054f9 --- /dev/null +++ b/migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/down.sql @@ -0,0 +1 @@ +ALTER TABLE room RENAME COLUMN reserve TO subscribers_limit; diff --git a/migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/up.sql b/migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/up.sql new file mode 100644 index 00000000..ee17963c --- /dev/null +++ b/migrations/2020-08-03-212533_rename_subscribers_limit_to_reserve/up.sql @@ -0,0 +1 @@ +ALTER TABLE room RENAME COLUMN subscribers_limit TO reserve; diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index e06a7b58..98255f08 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -42,7 +42,7 @@ pub(crate) struct CreateRequest { audience: String, #[serde(default = "CreateRequest::default_backend")] backend: db::room::RoomBackend, - subscribers_limit: Option, + reserve: Option, } impl CreateRequest { @@ -75,8 +75,8 @@ impl RequestHandler for CreateHandler { let mut q = db::room::InsertQuery::new(payload.time, &payload.audience, payload.backend); - if let Some(subscribers_limit) = payload.subscribers_limit { - q = q.subscribers_limit(subscribers_limit); + if let Some(reserve) = payload.reserve { + q = q.reserve(reserve); } let conn = context.db().get()?; @@ -300,28 +300,12 @@ impl RequestHandler for EnterHandler { let conn = context.db().get()?; // Find opened room. - let room = db::room::FindQuery::new() + db::room::FindQuery::new() .id(payload.id) .time(db::room::now()) .execute(&conn)? .ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; - - // Check if not full house in the room. - if let Some(subscribers_limit) = room.subscribers_limit() { - let agents_count = db::agent::RoomCountQuery::new(room.id()).execute(&conn)?; - - if agents_count > subscribers_limit as i64 { - let err = format!( - "Subscribers limit in the room has been reached ({})", - subscribers_limit - ); - - return Err(err).status(ResponseStatus::SERVICE_UNAVAILABLE)?; - } - } - - room + .status(ResponseStatus::NOT_FOUND)? }; // Authorize subscribing to the room's events. @@ -482,7 +466,7 @@ mod test { time: time.clone(), audience: USR_AUDIENCE.to_owned(), backend: db::room::RoomBackend::Janus, - subscribers_limit: Some(123), + reserve: Some(123), }; let messages = handle_request::(&context, &agent, payload) @@ -495,7 +479,7 @@ mod test { assert_eq!(room.audience(), USR_AUDIENCE); assert_eq!(room.time(), &time); assert_eq!(room.backend(), db::room::RoomBackend::Janus); - assert_eq!(room.subscribers_limit(), Some(123)); + assert_eq!(room.reserve(), Some(123)); // Assert notification. let (room, evp, topic) = find_event::(messages.as_slice()); @@ -504,7 +488,7 @@ mod test { assert_eq!(room.audience(), USR_AUDIENCE); assert_eq!(room.time(), &time); assert_eq!(room.backend(), db::room::RoomBackend::Janus); - assert_eq!(room.subscribers_limit(), Some(123)); + assert_eq!(room.reserve(), Some(123)); }); } @@ -519,7 +503,7 @@ mod test { time: (Bound::Included(Utc::now()), Bound::Unbounded), audience: USR_AUDIENCE.to_owned(), backend: db::room::RoomBackend::Janus, - subscribers_limit: None, + reserve: None, }; let err = handle_request::(&context, &agent, payload) @@ -664,9 +648,7 @@ mod test { Bound::Excluded(now + Duration::hours(3)), ); - let payload = UpdateRequest::new(room.id()) - .time(time) - .subscribers_limit(Some(123)); + let payload = UpdateRequest::new(room.id()).time(time).reserve(Some(123)); let messages = handle_request::(&context, &agent, payload) .await @@ -679,7 +661,7 @@ mod test { assert_eq!(resp_room.audience(), room.audience()); assert_eq!(resp_room.time(), &time); assert_eq!(resp_room.backend(), db::room::RoomBackend::Janus); - assert_eq!(resp_room.subscribers_limit(), Some(123)); + assert_eq!(resp_room.reserve(), Some(123)); }); } @@ -820,8 +802,6 @@ mod test { } mod enter { - use chrono::SubsecRound; - use crate::test_helpers::prelude::*; use super::super::*; @@ -878,59 +858,6 @@ mod test { }); } - #[test] - fn enter_room_full_house() { - async_std::task::block_on(async { - let db = TestDb::new(); - - // Create room and fill it up with agents. - let room = { - let conn = db - .connection_pool() - .get() - .expect("Failed to get DB connection"); - - let now = Utc::now().trunc_subsecs(0); - - let room = factory::Room::new() - .audience(USR_AUDIENCE) - .time((Bound::Included(now), Bound::Unbounded)) - .backend(db::room::RoomBackend::Janus) - .subscribers_limit(1) - .insert(&conn); - - let publisher = TestAgent::new("web", "publisher", USR_AUDIENCE); - shared_helpers::insert_agent(&conn, publisher.agent_id(), room.id()); - - let subscriber = TestAgent::new("web", "subscriber", USR_AUDIENCE); - shared_helpers::insert_agent(&conn, subscriber.agent_id(), room.id()); - - room - }; - - // Allow agent to subscribe to the rooms' events. - let agent = TestAgent::new("web", "user123", USR_AUDIENCE); - let mut authz = TestAuthz::new(); - let room_id = room.id().to_string(); - - authz.allow( - agent.account_id(), - vec!["rooms", &room_id, "events"], - "subscribe", - ); - - // Make room.enter request. - let context = TestContext::new(db, authz); - let payload = EnterRequest { id: room.id() }; - - let err = handle_request::(&context, &agent, payload) - .await - .expect_err("Unexpected success on room entering"); - - assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE); - }); - } - #[test] fn enter_room_not_authorized() { async_std::task::block_on(async { diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index c85703e6..46a9465c 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -275,6 +275,7 @@ impl RequestHandler for ConnectHandler { .authorize(room.audience(), reqp, object, "read") .await?; + // Choose backend to connect. let backend = { let conn = context.db().get()?; @@ -293,9 +294,9 @@ impl RequestHandler for ConnectHandler { .execute(&conn)? .ok_or("no backend found for stream") .status(ResponseStatus::UNPROCESSABLE_ENTITY)?, - None => db::janus_backend::least_loaded(&conn)? + None => db::janus_backend::least_loaded(room.id(), &conn)? .ok_or("no available backends") - .status(ResponseStatus::UNPROCESSABLE_ENTITY)?, + .status(ResponseStatus::SERVICE_UNAVAILABLE)?, } }; @@ -604,10 +605,15 @@ mod test { } mod connect { + use std::ops::Bound; + + use chrono::{Duration, Utc}; + use rand::Rng; use serde_json::Value as JsonValue; use svc_agent::{AccountId, AgentId}; use crate::backend::janus; + use crate::db::room::RoomBackend; use crate::test_helpers::prelude::*; use crate::util::from_base64; @@ -693,7 +699,7 @@ mod test { let messages = handle_request::(&context, &agent, payload) .await - .expect("RTC reading failed"); + .expect("RTC connect failed"); // Assert outgoing request to Janus. let (req, _reqp, topic) = find_request::(messages.as_slice()); @@ -770,7 +776,7 @@ mod test { let messages = handle_request::(&context, &agent, payload) .await - .expect("rtc reading failed"); + .expect("RTC connect failed"); // Ensure we're balanced to the backend with the stream. let (req, _reqp, topic) = find_request::(messages.as_slice()); @@ -839,7 +845,7 @@ mod test { let messages = handle_request::(&context, &agent, payload) .await - .expect("rtc reading failed"); + .expect("RTC connect failed"); // Ensure we're balanced to the least loaded backend. let (req, _reqp, topic) = find_request::(messages.as_slice()); @@ -856,6 +862,159 @@ mod test { }); } + #[test] + fn connect_to_rtc_with_reservation() { + async_std::task::block_on(async { + let mut rng = rand::thread_rng(); + let db = TestDb::new(); + let mut authz = TestAuthz::new(); + + let (rtc, backend) = db + .connection_pool() + .get() + .map(|conn| { + let now = Utc::now(); + + // Insert room with reserve. + let room1 = shared_helpers::insert_room(&conn); + + let room2 = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(now), + Bound::Excluded(now + Duration::hours(1)), + )) + .backend(RoomBackend::Janus) + .reserve(15) + .insert(&conn); + + // Insert rtcs. + let rtc1 = factory::Rtc::new(room1.id()).insert(&conn); + let rtc2 = factory::Rtc::new(room2.id()).insert(&conn); + + // The first backend is big enough but has some load. + let backend1_id = { + let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE); + agent.agent_id().to_owned() + }; + + let backend1 = + factory::JanusBackend::new(backend1_id, rng.gen(), rng.gen()) + .subscribers_limit(20) + .insert(&conn); + + // The second backend is too small but has no load. + let backend2_id = { + let agent = TestAgent::new("beta", "janus", SVC_AUDIENCE); + agent.agent_id().to_owned() + }; + + factory::JanusBackend::new(backend2_id, rng.gen(), rng.gen()) + .subscribers_limit(5) + .insert(&conn); + + // Insert stream. + factory::JanusRtcStream::new(USR_AUDIENCE) + .backend(&backend1) + .rtc(&rtc1) + .insert(&conn); + + // Insert active agent. + let agent = TestAgent::new("web", "user456", USR_AUDIENCE); + shared_helpers::insert_agent(&conn, agent.agent_id(), rtc1.room_id()); + + // It should balance to the first one despite of the load. + (rtc2, backend1) + }) + .unwrap(); + + // Allow user to read the rtc. + let agent = TestAgent::new("web", "user123", USR_AUDIENCE); + let room_id = rtc.room_id().to_string(); + let rtc_id = rtc.id().to_string(); + let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; + authz.allow(agent.account_id(), object, "read"); + + // Make rtc.connect request. + let context = TestContext::new(db, authz); + let payload = ConnectRequest { id: rtc.id() }; + + let messages = handle_request::(&context, &agent, payload) + .await + .expect("RTC connect failed"); + + // Ensure we're balanced to the right backend. + let (req, _reqp, topic) = find_request::(messages.as_slice()); + + let expected_topic = format!( + "agents/{}/api/{}/in/{}", + backend.id(), + janus::JANUS_API_VERSION, + context.config().id, + ); + + assert_eq!(topic, expected_topic); + assert_eq!(req.session_id, backend.session_id()); + }); + } + + #[test] + fn connect_to_rtc_full_server() { + async_std::task::block_on(async { + let mut rng = rand::thread_rng(); + let db = TestDb::new(); + let mut authz = TestAuthz::new(); + + let rtc = db + .connection_pool() + .get() + .map(|conn| { + // Insert rtc. + let rtc = shared_helpers::insert_rtc(&conn); + + // Insert backend. + let backend_id = { + let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE); + agent.agent_id().to_owned() + }; + + let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen()) + .subscribers_limit(1) + .insert(&conn); + + // Insert active stream. + factory::JanusRtcStream::new(USR_AUDIENCE) + .backend(&backend) + .rtc(&rtc) + .insert(&conn); + + // Insert active agent. + let agent = TestAgent::new("web", "user456", USR_AUDIENCE); + shared_helpers::insert_agent(&conn, agent.agent_id(), rtc.room_id()); + + rtc + }) + .unwrap(); + + // Allow user to read the rtc. + let agent = TestAgent::new("web", "user123", USR_AUDIENCE); + let room_id = rtc.room_id().to_string(); + let rtc_id = rtc.id().to_string(); + let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; + authz.allow(agent.account_id(), object, "read"); + + // Make rtc.connect request. + let context = TestContext::new(db, authz); + let payload = ConnectRequest { id: rtc.id() }; + + let err = handle_request::(&context, &agent, payload) + .await + .expect_err("Unexpected success on rtc connecting"); + + assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE); + }); + } + #[test] fn connect_to_rtc_not_authorized() { async_std::task::block_on(async { diff --git a/src/app/endpoint/rtc_signal.rs b/src/app/endpoint/rtc_signal.rs index 304c3d84..53f917ef 100644 --- a/src/app/endpoint/rtc_signal.rs +++ b/src/app/endpoint/rtc_signal.rs @@ -71,32 +71,6 @@ impl RequestHandler for CreateHandler { .status(ResponseStatus::BAD_REQUEST)?; if is_recvonly { - // Check for full house on the server - { - let conn = context.db().get()?; - let backend_id = payload.handle_id.backend_id(); - - let backend = db::janus_backend::FindQuery::new() - .id(backend_id.to_owned()) - .execute(&conn)? - .ok_or_else(|| format!("backend not found id = '{}'", backend_id)) - .status(ResponseStatus::NOT_FOUND)?; - - if let Some(subscribers_limit) = backend.subscribers_limit() { - let agents_count = db::agent::JanusBackendCountQuery::new(backend.id()) - .execute(&conn)?; - - if agents_count > subscribers_limit as i64 { - let err = format!( - "Subscribers limit on the server has been reached ({})", - subscribers_limit - ); - - return Err(err).status(ResponseStatus::SERVICE_UNAVAILABLE)?; - } - } - } - // Authorization let authz_time = authorize(context, &payload, reqp, "read").await?; @@ -281,7 +255,6 @@ fn is_sdp_recvonly(jsep: &JsonValue) -> anyhow::Result { mod test { mod create { use diesel::prelude::*; - use rand::Rng; use serde::Deserialize; use serde_json::json; use uuid::Uuid; @@ -292,7 +265,7 @@ mod test { use super::super::*; - const SENDRECV_SDP_OFFER: &str = r#" + const SDP_OFFER: &str = r#" v=0 o=- 20518 0 IN IP4 0.0.0.0 s=- @@ -339,53 +312,6 @@ mod test { a=extmap:2 urn:ietf:params:rtp-hdrext:sdes:mid "#; - const RECVONLY_SDP_OFFER: &str = r#" - v=0 - o=- 20518 0 IN IP4 0.0.0.0 - s=- - t=0 0 - a=group:BUNDLE audio video - a=group:LS audio video - a=ice-options:trickle - m=audio 54609 UDP/TLS/RTP/SAVPF 109 0 8 - c=IN IP4 203.0.113.141 - a=mid:audio - a=msid:ma ta - a=recvonly - a=rtpmap:109 opus/48000/2 - a=rtpmap:0 PCMU/8000 - a=rtpmap:8 PCMA/8000 - a=maxptime:120 - a=ice-ufrag:074c6550 - a=ice-pwd:a28a397a4c3f31747d1ee3474af08a068 - a=fingerprint:sha-256 19:E2:1C:3B:4B:9F:81:E6:B8:5C:F4:A5:A8:D8:73:04:BB:05:2F:70:9F:04:A9:0E:05:E9:26:33:E8:70:88:A2 - a=setup:actpass - a=tls-id:1 - a=rtcp-mux - a=rtcp-mux-only - a=rtcp-rsize - a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level - a=extmap:2 urn:ietf:params:rtp-hdrext:sdes:mid - a=candidate:0 1 UDP 2122194687 192.0.2.4 61665 typ host - a=candidate:1 1 UDP 1685987071 203.0.113.141 54609 typ srflx raddr 192.0.2.4 rport 61665 - a=end-of-candidates - m=video 54609 UDP/TLS/RTP/SAVPF 99 120 - c=IN IP4 203.0.113.141 - a=mid:video - a=msid:ma tb - a=recvonly - a=rtpmap:99 H264/90000 - a=fmtp:99 profile-level-id=4d0028;packetization-mode=1 - a=rtpmap:120 VP8/90000 - a=rtcp-fb:99 nack - a=rtcp-fb:99 nack pli - a=rtcp-fb:99 ccm fir - a=rtcp-fb:120 nack - a=rtcp-fb:120 nack pli - a=rtcp-fb:120 ccm fir - a=extmap:2 urn:ietf:params:rtp-hdrext:sdes:mid - "#; - #[derive(Debug, PartialEq, Deserialize)] struct RtcSignalCreateJanusRequestOffer { janus: String, @@ -447,7 +373,7 @@ mod test { let payload = CreateRequest { handle_id, - jsep: json!({ "type": "offer", "sdp": SENDRECV_SDP_OFFER }), + jsep: json!({ "type": "offer", "sdp": SDP_OFFER }), label: Some(String::from("whatever")), }; @@ -473,7 +399,7 @@ mod test { assert_eq!(payload.body.method, "stream.create"); assert_eq!(payload.body.id, rtc.id()); assert_eq!(payload.jsep.r#type, "offer"); - assert_eq!(payload.jsep.sdp, SENDRECV_SDP_OFFER); + assert_eq!(payload.jsep.sdp, SDP_OFFER); // Assert rtc stream presence in the DB. let conn = context.db().get().unwrap(); @@ -490,75 +416,6 @@ mod test { }); } - #[test] - fn create_rtc_signal_full_house() { - async_std::task::block_on(async { - let db = TestDb::new(); - let mut authz = TestAuthz::new(); - - // Insert a janus backend and an rtc. - let (backend, rtc) = db - .connection_pool() - .get() - .map(|conn| { - let mut rng = rand::thread_rng(); - let agent = TestAgent::new("alpha", "janus-gateway", SVC_AUDIENCE); - let agent_id = agent.agent_id().to_owned(); - - let backend = factory::JanusBackend::new(agent_id, rng.gen(), rng.gen()) - .subscribers_limit(1) - .insert(&conn); - - let rtc = shared_helpers::insert_rtc(&conn); - - factory::JanusRtcStream::new(USR_AUDIENCE) - .backend(&backend) - .rtc(&rtc) - .insert(&conn); - - let publisher = TestAgent::new("web", "publisher", USR_AUDIENCE); - shared_helpers::insert_agent(&conn, publisher.agent_id(), rtc.room_id()); - - let subscriber = TestAgent::new("web", "subscriber", USR_AUDIENCE); - shared_helpers::insert_agent(&conn, subscriber.agent_id(), rtc.room_id()); - - (backend, rtc) - }) - .unwrap(); - - // Allow user to read the rtc. - let agent = TestAgent::new("web", "user123", USR_AUDIENCE); - let room_id = rtc.room_id().to_string(); - let rtc_id = rtc.id().to_string(); - let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; - authz.allow(agent.account_id(), object, "read"); - - // Make rtc_signal.create request. - let context = TestContext::new(db, authz); - let rtc_stream_id = Uuid::new_v4(); - - let handle_id = HandleId::new( - rtc_stream_id, - rtc.id(), - backend.handle_id(), - backend.session_id(), - backend.id().to_owned(), - ); - - let payload = CreateRequest { - handle_id, - jsep: json!({ "type": "offer", "sdp": RECVONLY_SDP_OFFER }), - label: Some(String::from("whatever")), - }; - - let err = handle_request::(&context, &agent, payload) - .await - .expect_err("Unexpected success on rtc creation"); - - assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE); - }); - } - #[test] fn create_rtc_signal_for_offer_unauthorized() { async_std::task::block_on(async { @@ -591,7 +448,7 @@ mod test { let payload = CreateRequest { handle_id, - jsep: json!({ "type": "offer", "sdp": SENDRECV_SDP_OFFER }), + jsep: json!({ "type": "offer", "sdp": SDP_OFFER }), label: Some(String::from("whatever")), }; diff --git a/src/db/agent.rs b/src/db/agent.rs index 64d2750d..2a503f22 100644 --- a/src/db/agent.rs +++ b/src/db/agent.rs @@ -6,7 +6,7 @@ use svc_agent::AgentId; use uuid::Uuid; use super::room::Object as Room; -use crate::schema::{agent, janus_rtc_stream, room, rtc}; +use crate::schema::agent; //////////////////////////////////////////////////////////////////////////////// @@ -126,49 +126,6 @@ impl<'a> ListQuery<'a> { //////////////////////////////////////////////////////////////////////////////// -pub(crate) struct RoomCountQuery { - room_id: Uuid, -} - -impl RoomCountQuery { - pub(crate) fn new(room_id: Uuid) -> Self { - Self { room_id } - } - - pub(crate) fn execute(&self, conn: &PgConnection) -> Result { - use diesel::prelude::*; - - agent::table - .filter(agent::room_id.eq(self.room_id)) - .select(diesel::dsl::count(agent::id)) - .get_result(conn) - } -} - -//////////////////////////////////////////////////////////////////////////////// - -pub(crate) struct JanusBackendCountQuery<'a> { - backend_id: &'a AgentId, -} - -impl<'a> JanusBackendCountQuery<'a> { - pub(crate) fn new(backend_id: &'a AgentId) -> Self { - Self { backend_id } - } - - pub(crate) fn execute(&self, conn: &PgConnection) -> Result { - use diesel::prelude::*; - - agent::table - .inner_join(room::table.inner_join(rtc::table.inner_join(janus_rtc_stream::table))) - .filter(janus_rtc_stream::backend_id.eq(self.backend_id)) - .select(diesel::dsl::count(agent::id)) - .get_result(conn) - } -} - -//////////////////////////////////////////////////////////////////////////////// - #[derive(Debug, Insertable)] #[table_name = "agent"] pub(crate) struct InsertQuery<'a> { diff --git a/src/db/janus_backend.rs b/src/db/janus_backend.rs index 8820f712..a8ed0ec6 100644 --- a/src/db/janus_backend.rs +++ b/src/db/janus_backend.rs @@ -2,6 +2,7 @@ use chrono::{DateTime, Utc}; use diesel::pg::PgConnection; use diesel::result::Error; use svc_agent::AgentId; +use uuid::Uuid; use crate::schema::janus_backend; @@ -29,10 +30,6 @@ impl Object { pub(crate) fn session_id(&self) -> i64 { self.session_id } - - pub(crate) fn subscribers_limit(&self) -> Option { - self.subscribers_limit - } } //////////////////////////////////////////////////////////////////////////////// @@ -167,24 +164,54 @@ impl<'a> DeleteQuery<'a> { //////////////////////////////////////////////////////////////////////////////// +// Returns the least loaded backend capable to host the room with its reserve considering: +// - actual number of online agents; +// - optional backend subscribers limit; +// - optional reserved capacity. const LEAST_LOADED_SQL: &str = r#" - select jb.* - from janus_backend as jb - left join ( - select * - from janus_rtc_stream - where lower(time) is not null - and upper(time) is null - ) as jrs - on jrs.backend_id = jb.id - group by jb.id - order by count(jrs.id) + WITH + room_load AS ( + SELECT + r.id AS room_id, + GREATEST(COALESCE(r.reserve, 0), COUNT(a.id)) AS taken + FROM room AS r + LEFT JOIN agent AS a + ON a.room_id = r.id + GROUP BY r.id + ), + janus_backend_load AS ( + SELECT + jrs.backend_id, + SUM(taken) AS taken + FROM room_load as rl + LEFT JOIN rtc + ON rtc.room_id = rl.room_id + LEFT JOIN janus_rtc_stream AS jrs + ON jrs.rtc_id = rtc.id + GROUP BY jrs.backend_id + ) + SELECT jb.* + FROM janus_backend AS jb + LEFT JOIN janus_rtc_stream AS jrs + ON jrs.backend_id = jb.id + LEFT JOIN rtc + ON rtc.id = jrs.rtc_id + LEFT JOIN janus_backend_load AS jbl + ON jbl.backend_id = jb.id + LEFT JOIN room AS r2 + ON 1 = 1 + WHERE r2.id = $1 + AND COALESCE(jb.subscribers_limit, 2147483647) - COALESCE(jbl.taken, 0) > COALESCE(r2.reserve, 0) + ORDER BY COALESCE(jb.subscribers_limit, 2147483647) - COALESCE(jbl.taken, 0) DESC + LIMIT 1 "#; -pub(crate) fn least_loaded(conn: &PgConnection) -> Result, Error> { +pub(crate) fn least_loaded(room_id: Uuid, conn: &PgConnection) -> Result, Error> { use diesel::prelude::*; + use diesel::sql_types::Uuid; diesel::sql_query(LEAST_LOADED_SQL) + .bind::(room_id) .get_result(conn) .optional() } diff --git a/src/db/room.rs b/src/db/room.rs index 6e47ad80..6f28f866 100644 --- a/src/db/room.rs +++ b/src/db/room.rs @@ -43,7 +43,7 @@ type AllColumns = ( room::audience, room::created_at, room::backend, - room::subscribers_limit, + room::reserve, ); const ALL_COLUMNS: AllColumns = ( @@ -52,7 +52,7 @@ const ALL_COLUMNS: AllColumns = ( room::audience, room::created_at, room::backend, - room::subscribers_limit, + room::reserve, ); //////////////////////////////////////////////////////////////////////////////// @@ -86,7 +86,7 @@ pub(crate) struct Object { created_at: DateTime, backend: RoomBackend, #[serde(skip_serializing_if = "Option::is_none")] - subscribers_limit: Option, + reserve: Option, } impl Object { @@ -107,8 +107,9 @@ impl Object { self.backend } - pub(crate) fn subscribers_limit(&self) -> Option { - self.subscribers_limit + #[cfg(test)] + pub(crate) fn reserve(&self) -> Option { + self.reserve } } @@ -206,7 +207,7 @@ pub(crate) struct InsertQuery<'a> { time: Time, audience: &'a str, backend: RoomBackend, - subscribers_limit: Option, + reserve: Option, } impl<'a> InsertQuery<'a> { @@ -215,13 +216,13 @@ impl<'a> InsertQuery<'a> { time, audience, backend, - subscribers_limit: None, + reserve: None, } } - pub(crate) fn subscribers_limit(self, value: i32) -> Self { + pub(crate) fn reserve(self, value: i32) -> Self { Self { - subscribers_limit: Some(value), + reserve: Some(value), ..self } } @@ -263,7 +264,7 @@ pub(crate) struct UpdateQuery { time: Option