diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index 7db34cad..eacfbf48 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -1495,6 +1495,165 @@ mod test { }); } + #[test] + fn connect_to_rtc_reserve_overflow() { + async_std::task::block_on(async { + let mut rng = rand::thread_rng(); + let db = TestDb::new(); + let mut authz = TestAuthz::new(); + let new_reader = TestAgent::new("web", "new-reader", USR_AUDIENCE); + + let (rtcs, backend) = db + .connection_pool() + .get() + .map(|conn| { + let now = Utc::now(); + + // Lets say we have a single backend with cap=800 + // Somehow reserves of all rooms that were allocated to it overflow its capacity + // We should allow users to connect to rooms with reserves if reserve and cap allows them + // But not allow to connect to room with no reserve + + // Setup three rooms with 500, 600 and none. + let room1 = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(now), + Bound::Excluded(now + Duration::hours(1)), + )) + .backend(RoomBackend::Janus) + .reserve(500) + .insert(&conn); + + let room2 = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(now), + Bound::Excluded(now + Duration::hours(1)), + )) + .reserve(600) + .backend(RoomBackend::Janus) + .insert(&conn); + + let room3 = factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(now), + Bound::Excluded(now + Duration::hours(1)), + )) + .backend(RoomBackend::Janus) + .insert(&conn); + + // Insert rtcs for each room. + let rtc1 = factory::Rtc::new(room1.id()).insert(&conn); + let rtc2 = factory::Rtc::new(room2.id()).insert(&conn); + let rtc3 = factory::Rtc::new(room3.id()).insert(&conn); + + // Insert alpha backend. + let backend1 = { + let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE); + let id = agent.agent_id().to_owned(); + factory::JanusBackend::new(id, rng.gen(), rng.gen()) + .balancer_capacity(700) + .capacity(800) + .insert(&conn) + }; + + // Insert writer for room 1 + factory::Agent::new() + .agent_id(TestAgent::new("web", "writer1", USR_AUDIENCE).agent_id()) + .room_id(room1.id()) + .status(AgentStatus::Connected) + .insert(&conn); + + // Insert 450 readers for room 1 + for i in 0..450 { + factory::Agent::new() + .agent_id( + TestAgent::new("web", &format!("reader1-{}", i), USR_AUDIENCE) + .agent_id(), + ) + .room_id(room1.id()) + .status(AgentStatus::Connected) + .insert(&conn); + } + + // Insert writer for room 3 + factory::Agent::new() + .agent_id(TestAgent::new("web", "writer3", USR_AUDIENCE).agent_id()) + .room_id(room2.id()) + .status(AgentStatus::Connected) + .insert(&conn); + + // Insert reader for room 3 + factory::Agent::new() + .agent_id(TestAgent::new("web", "reader3", USR_AUDIENCE).agent_id()) + .room_id(room3.id()) + .status(AgentStatus::Connected) + .insert(&conn); + + // We need these in_progress recordings since they bind each room to its respective backend + shared_helpers::insert_recording(&conn, &rtc1, &backend1); + shared_helpers::insert_recording(&conn, &rtc2, &backend1); + shared_helpers::insert_recording(&conn, &rtc3, &backend1); + + ([rtc1, rtc2, rtc3], backend1) + }) + .unwrap(); + + // Allow user to read the rtcs. + for rtc in rtcs.iter() { + let room_id = rtc.room_id().to_string(); + let rtc_id = rtc.id().to_string(); + let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; + authz.allow(new_reader.account_id(), object, "read"); + } + + let mut context = TestContext::new(db, authz); + + // First two rooms have reserves AND there is free capacity so we can connect to them + for rtc in rtcs.iter().take(2) { + let payload = ConnectRequest { + id: rtc.id(), + intent: ConnectIntent::Read, + }; + + // Make an rtc.connect request. + let messages = + handle_request::(&mut context, &new_reader, payload) + .await + .expect("RTC connect failed"); + + // Assert outgoing request goes to the expected backend. + let (_req, _reqp, topic) = + find_request::(messages.as_slice()); + + let expected_topic = format!( + "agents/{}/api/{}/in/{}", + backend.id(), + janus::JANUS_API_VERSION, + context.config().id, + ); + + assert_eq!(topic, &expected_topic); + } + + let payload = ConnectRequest { + id: rtcs[2].id(), + intent: ConnectIntent::Read, + }; + + // Last room has NO reserve AND there is free capacity BUT it was exhausted by first two rooms + // So we cant connect to this room + let err = handle_request::(&mut context, &new_reader, payload) + .await + .expect_err("Connected to RTC while expected capacity exceeded error"); + + assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE); + assert_eq!(err.kind(), "capacity_exceeded"); + }); + } + #[test] fn connect_to_rtc_not_authorized() { async_std::task::block_on(async { diff --git a/src/db/janus_backend.rs b/src/db/janus_backend.rs index 080ee252..5d332662 100644 --- a/src/db/janus_backend.rs +++ b/src/db/janus_backend.rs @@ -337,6 +337,8 @@ const FREE_CAPACITY_SQL: &str = r#" janus_backend_load AS ( SELECT backend_id, + SUM(taken) AS total_taken, + SUM(reserve) AS total_reserve, SUM(GREATEST(taken, reserve)) AS load FROM ( SELECT DISTINCT ON(backend_id, room_id) @@ -356,13 +358,25 @@ const FREE_CAPACITY_SQL: &str = r#" GROUP BY backend_id ) SELECT - GREATEST( - COALESCE(jb.capacity, 2147483647) - - COALESCE(jbl.load, 0) - + GREATEST(COALESCE(rl.reserve, 0) - COALESCE(rl.taken, 0), 0), - 0 + ( + CASE + WHEN COALESCE(jb.capacity, 2147483647) <= COALESCE(jbl.total_taken, 0) THEN 0 + ELSE ( + CASE + WHEN COALESCE(ar.reserve, 0) > COALESCE(rl.taken, 0) + THEN LEAST( + COALESCE(ar.reserve, 0) - COALESCE(rl.taken, 0), + COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.total_taken, 0) + ) + ELSE + GREATEST(COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.load, 0), 0) + END + ) + END )::INT AS free_capacity FROM rtc + LEFT JOIN active_room AS ar + ON ar.id = rtc.room_id LEFT JOIN room_load as rl ON rl.room_id = rtc.room_id LEFT JOIN recording AS rec