From f797bbf243f23f4ba9f183425ed26296ef7ab24a Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Thu, 8 Oct 2020 16:27:28 +0300 Subject: [PATCH] Add falling back to the least loaded backend (#163) --- Cargo.lock | 1 + Cargo.toml | 1 + src/app/endpoint/rtc.rs | 108 +++++++++++++++++++++++++----------- src/db/janus_backend.rs | 58 +++++++++++++++++++ src/db/room.rs | 1 - src/test_helpers/factory.rs | 13 +++++ 6 files changed, 149 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e345d5bc..6db6bf9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -371,6 +371,7 @@ dependencies = [ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "openssl 0.10.29 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "sentry 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.115 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.53 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 1ca23e90..b8a3ba74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ http = "0.1" lazy_static = "1.4" openssl = "*" rand = "0.7" +sentry = "0.18" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index 9b188d97..5961f151 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -298,6 +298,9 @@ impl RequestHandler for ConnectHandler { // There are 3 cases: // 1. Connecting as writer for the first time. There's no recording in that case. // Select the most loaded backend that is capable to host the room's reservation. + // If there's no capable backend then select the least loaded and send a warning + // to Sentry. If there are no backends at all then return `no available backends` + // error and also send it to Sentry. // 2. Connecting as reader with existing recording. Choose the backend of the active // recording because Janus doesn't support clustering and it must be the same server // that the writer is connected to. @@ -311,9 +314,34 @@ impl RequestHandler for ConnectHandler { .execute(&conn)? .ok_or("no backend found for stream") .status(ResponseStatus::UNPROCESSABLE_ENTITY)?, - None => db::janus_backend::most_loaded(room.id(), &conn)? - .ok_or("no available backends") - .status(ResponseStatus::SERVICE_UNAVAILABLE)?, + None => match db::janus_backend::most_loaded(room.id(), &conn)? { + Some(backend) => backend, + None => db::janus_backend::least_loaded(room.id(), &conn)? + .map(|backend| { + use sentry::protocol::{value::Value, Event, Level}; + + let mut extra = std::collections::BTreeMap::new(); + extra.insert(String::from("room_id"), Value::from(room_id)); + extra.insert(String::from("rtc_id"), Value::from(rtc_id)); + let backend_id = backend.id().to_string(); + extra.insert(String::from("backend_id"), Value::from(backend_id)); + + if let Some(reserve) = room.reserve() { + extra.insert(String::from("reserve"), Value::from(reserve)); + } + + sentry::capture_event(Event { + message: Some(String::from("No capable backends to host the reserve; falling back to the least loaded backend")), + level: Level::Warning, + extra, + ..Default::default() + }); + + backend + }) + .ok_or("no available backends") + .status(ResponseStatus::SERVICE_UNAVAILABLE)?, + }, }; // Create recording if a writer connects for the first time. @@ -1326,20 +1354,21 @@ mod test { } #[test] - fn connect_to_rtc_reservation_with_stopped_stream() { + fn connect_to_rtc_too_big_reserve() { async_std::task::block_on(async { let mut rng = rand::thread_rng(); let db = TestDb::new(); let mut authz = TestAuthz::new(); - let writer = TestAgent::new("web", "writer", USR_AUDIENCE); + let writer1 = TestAgent::new("web", "writer1", USR_AUDIENCE); + let writer2 = TestAgent::new("web", "writer2", USR_AUDIENCE); - let rtc = db + let (rtc, backend) = db .connection_pool() .get() .map(|conn| { let now = Utc::now(); - // Insert rooms with reserves. + // Insert room with big reserve and another one for load. let room1 = factory::Room::new() .audience(USR_AUDIENCE) .time(( @@ -1347,7 +1376,7 @@ mod test { Bound::Excluded(now + Duration::hours(1)), )) .backend(RoomBackend::Janus) - .reserve(15) + .reserve(100) .insert(&conn); let room2 = factory::Room::new() @@ -1357,38 +1386,43 @@ mod test { Bound::Excluded(now + Duration::hours(1)), )) .backend(RoomBackend::Janus) - .reserve(15) .insert(&conn); // Insert rtcs. let rtc1 = factory::Rtc::new(room1.id()).insert(&conn); - let rtc2 = factory::Rtc::new(room2.id()).insert(&conn); + let _rtc2 = factory::Rtc::new(room2.id()).insert(&conn); - // Insert a backend with capacity less than the sum of reserves. - let backend_id = { + // Insert backends with low balancer capacity. + let backend_id1 = { let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE); agent.agent_id().to_owned() }; - let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen()) - .capacity(20) - .insert(&conn); + let backend1 = + factory::JanusBackend::new(backend_id1, rng.gen(), rng.gen()) + .balancer_capacity(20) + .capacity(200) + .insert(&conn); - // Insert a stopped stream in the first room. - let stream = factory::JanusRtcStream::new(USR_AUDIENCE) - .backend(&backend) - .rtc(&rtc1) - .insert(&conn); + let backend_id2 = { + let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE); + agent.agent_id().to_owned() + }; - crate::db::janus_rtc_stream::start(stream.id(), &conn).unwrap(); - crate::db::janus_rtc_stream::stop(stream.id(), &conn).unwrap(); + let _backend2 = + factory::JanusBackend::new(backend_id2, rng.gen(), rng.gen()) + .balancer_capacity(50) + .capacity(200) + .insert(&conn); - factory::Recording::new() - .rtc(&rtc1) - .backend(&backend) + // Add some load on the biggest one. + factory::Agent::new() + .agent_id(writer2.agent_id()) + .room_id(room2.id()) + .status(AgentStatus::Ready) .insert(&conn); - rtc2 + (rtc1, backend1) }) .unwrap(); @@ -1396,11 +1430,11 @@ mod test { let room_id = rtc.room_id().to_string(); let rtc_id = rtc.id().to_string(); let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; - authz.allow(writer.account_id(), object, "update"); + authz.allow(writer1.account_id(), object, "update"); // Make an rtc.connect request. - // The reserve of the first room must be taken into account despited of the - // stream has been already stopped. + // Despite of none of the backends are capable to host the reserve it should + // select the least loaded one. let mut context = TestContext::new(db, authz); let payload = ConnectRequest { @@ -1408,11 +1442,21 @@ mod test { intent: ConnectIntent::Write, }; - let err = handle_request::(&mut context, &writer, payload) + let messages = handle_request::(&mut context, &writer1, payload) .await - .expect_err("Unexpected success on rtc connecting"); + .expect("RTC connect failed"); - assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE); + // Assert outgoing request goes to the expected backend. + let (_req, _reqp, topic) = find_request::(messages.as_slice()); + + let expected_topic = format!( + "agents/{}/api/{}/in/{}", + backend.id(), + janus::JANUS_API_VERSION, + context.config().id, + ); + + assert_eq!(topic, &expected_topic); }); } diff --git a/src/db/janus_backend.rs b/src/db/janus_backend.rs index e2133628..7db51007 100644 --- a/src/db/janus_backend.rs +++ b/src/db/janus_backend.rs @@ -253,6 +253,64 @@ pub(crate) fn most_loaded(room_id: Uuid, conn: &PgConnection) -> Result Result, Error> { + use diesel::prelude::*; + use diesel::sql_types::Uuid; + + diesel::sql_query(LEAST_LOADED_SQL) + .bind::(room_id) + .get_result(conn) + .optional() +} + //////////////////////////////////////////////////////////////////////////////// // Similar to the previous one but returns the number of free slots for the room on the backend diff --git a/src/db/room.rs b/src/db/room.rs index d14fe509..aed14425 100644 --- a/src/db/room.rs +++ b/src/db/room.rs @@ -113,7 +113,6 @@ impl Object { self.backend } - #[cfg(test)] pub(crate) fn reserve(&self) -> Option { self.reserve } diff --git a/src/test_helpers/factory.rs b/src/test_helpers/factory.rs index 6a6b1bb6..f78270f1 100644 --- a/src/test_helpers/factory.rs +++ b/src/test_helpers/factory.rs @@ -149,6 +149,7 @@ pub(crate) struct JanusBackend { handle_id: i64, session_id: i64, capacity: Option, + balancer_capacity: Option, } impl JanusBackend { @@ -158,6 +159,7 @@ impl JanusBackend { handle_id, session_id, capacity: None, + balancer_capacity: None, } } @@ -168,6 +170,13 @@ impl JanusBackend { } } + pub(crate) fn balancer_capacity(self, balancer_capacity: i32) -> Self { + Self { + balancer_capacity: Some(balancer_capacity), + ..self + } + } + pub(crate) fn insert(&self, conn: &PgConnection) -> db::janus_backend::Object { let mut q = db::janus_backend::UpsertQuery::new(&self.id, self.handle_id, self.session_id); @@ -175,6 +184,10 @@ impl JanusBackend { q = q.capacity(capacity); } + if let Some(balancer_capacity) = self.balancer_capacity { + q = q.balancer_capacity(balancer_capacity); + } + q.execute(conn).expect("Failed to insert janus_backend") } }