Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Add falling back to the least loaded backend #163

Merged
merged 1 commit into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ http = "0.1"
lazy_static = "1.4"
openssl = "*"
rand = "0.7"
sentry = "0.18"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
Expand Down
108 changes: 76 additions & 32 deletions src/app/endpoint/rtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ impl RequestHandler for ConnectHandler {
// There are 3 cases:
// 1. Connecting as writer for the first time. There's no recording in that case.
// Select the most loaded backend that is capable to host the room's reservation.
// If there's no capable backend then select the least loaded and send a warning
// to Sentry. If there are no backends at all then return `no available backends`
// error and also send it to Sentry.
// 2. Connecting as reader with existing recording. Choose the backend of the active
// recording because Janus doesn't support clustering and it must be the same server
// that the writer is connected to.
Expand All @@ -311,9 +314,34 @@ impl RequestHandler for ConnectHandler {
.execute(&conn)?
.ok_or("no backend found for stream")
.status(ResponseStatus::UNPROCESSABLE_ENTITY)?,
None => db::janus_backend::most_loaded(room.id(), &conn)?
.ok_or("no available backends")
.status(ResponseStatus::SERVICE_UNAVAILABLE)?,
None => match db::janus_backend::most_loaded(room.id(), &conn)? {
Some(backend) => backend,
None => db::janus_backend::least_loaded(room.id(), &conn)?
.map(|backend| {
use sentry::protocol::{value::Value, Event, Level};

let mut extra = std::collections::BTreeMap::new();
extra.insert(String::from("room_id"), Value::from(room_id));
extra.insert(String::from("rtc_id"), Value::from(rtc_id));
let backend_id = backend.id().to_string();
extra.insert(String::from("backend_id"), Value::from(backend_id));

if let Some(reserve) = room.reserve() {
extra.insert(String::from("reserve"), Value::from(reserve));
}

sentry::capture_event(Event {
message: Some(String::from("No capable backends to host the reserve; falling back to the least loaded backend")),
level: Level::Warning,
extra,
..Default::default()
});

backend
})
.ok_or("no available backends")
.status(ResponseStatus::SERVICE_UNAVAILABLE)?,
},
};

// Create recording if a writer connects for the first time.
Expand Down Expand Up @@ -1326,28 +1354,29 @@ mod test {
}

#[test]
fn connect_to_rtc_reservation_with_stopped_stream() {
fn connect_to_rtc_too_big_reserve() {
async_std::task::block_on(async {
let mut rng = rand::thread_rng();
let db = TestDb::new();
let mut authz = TestAuthz::new();
let writer = TestAgent::new("web", "writer", USR_AUDIENCE);
let writer1 = TestAgent::new("web", "writer1", USR_AUDIENCE);
let writer2 = TestAgent::new("web", "writer2", USR_AUDIENCE);

let rtc = db
let (rtc, backend) = db
.connection_pool()
.get()
.map(|conn| {
let now = Utc::now();

// Insert rooms with reserves.
// Insert room with big reserve and another one for load.
let room1 = factory::Room::new()
.audience(USR_AUDIENCE)
.time((
Bound::Included(now),
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(15)
.reserve(100)
.insert(&conn);

let room2 = factory::Room::new()
Expand All @@ -1357,62 +1386,77 @@ mod test {
Bound::Excluded(now + Duration::hours(1)),
))
.backend(RoomBackend::Janus)
.reserve(15)
.insert(&conn);

// Insert rtcs.
let rtc1 = factory::Rtc::new(room1.id()).insert(&conn);
let rtc2 = factory::Rtc::new(room2.id()).insert(&conn);
let _rtc2 = factory::Rtc::new(room2.id()).insert(&conn);

// Insert a backend with capacity less than the sum of reserves.
let backend_id = {
// Insert backends with low balancer capacity.
let backend_id1 = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
};

let backend = factory::JanusBackend::new(backend_id, rng.gen(), rng.gen())
.capacity(20)
.insert(&conn);
let backend1 =
factory::JanusBackend::new(backend_id1, rng.gen(), rng.gen())
.balancer_capacity(20)
.capacity(200)
.insert(&conn);

// Insert a stopped stream in the first room.
let stream = factory::JanusRtcStream::new(USR_AUDIENCE)
.backend(&backend)
.rtc(&rtc1)
.insert(&conn);
let backend_id2 = {
let agent = TestAgent::new("alpha", "janus", SVC_AUDIENCE);
agent.agent_id().to_owned()
};

crate::db::janus_rtc_stream::start(stream.id(), &conn).unwrap();
crate::db::janus_rtc_stream::stop(stream.id(), &conn).unwrap();
let _backend2 =
factory::JanusBackend::new(backend_id2, rng.gen(), rng.gen())
.balancer_capacity(50)
.capacity(200)
.insert(&conn);

factory::Recording::new()
.rtc(&rtc1)
.backend(&backend)
// Add some load on the biggest one.
factory::Agent::new()
.agent_id(writer2.agent_id())
.room_id(room2.id())
.status(AgentStatus::Ready)
.insert(&conn);

rtc2
(rtc1, backend1)
})
.unwrap();

// Allow user to update the rtc.
let room_id = rtc.room_id().to_string();
let rtc_id = rtc.id().to_string();
let object = vec!["rooms", &room_id, "rtcs", &rtc_id];
authz.allow(writer.account_id(), object, "update");
authz.allow(writer1.account_id(), object, "update");

// Make an rtc.connect request.
// The reserve of the first room must be taken into account despited of the
// stream has been already stopped.
// Despite of none of the backends are capable to host the reserve it should
// select the least loaded one.
let mut context = TestContext::new(db, authz);

let payload = ConnectRequest {
id: rtc.id(),
intent: ConnectIntent::Write,
};

let err = handle_request::<ConnectHandler>(&mut context, &writer, payload)
let messages = handle_request::<ConnectHandler>(&mut context, &writer1, payload)
.await
.expect_err("Unexpected success on rtc connecting");
.expect("RTC connect failed");

assert_eq!(err.status_code(), ResponseStatus::SERVICE_UNAVAILABLE);
// Assert outgoing request goes to the expected backend.
let (_req, _reqp, topic) = find_request::<JanusAttachRequest>(messages.as_slice());

let expected_topic = format!(
"agents/{}/api/{}/in/{}",
backend.id(),
janus::JANUS_API_VERSION,
context.config().id,
);

assert_eq!(topic, &expected_topic);
});
}

Expand Down
58 changes: 58 additions & 0 deletions src/db/janus_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,64 @@ pub(crate) fn most_loaded(room_id: Uuid, conn: &PgConnection) -> Result<Option<O
.optional()
}

// The same as above but finds the least loaded backend instead without considering the reserve.
const LEAST_LOADED_SQL: &str = r#"
WITH
room_load AS (
SELECT
room_id,
COUNT(id) AS taken
FROM agent
WHERE status = 'connected'
GROUP BY room_id
),
active_room AS (
SELECT *
FROM room
WHERE backend = 'janus'
AND NOW() < UPPER(time)
),
janus_backend_load AS (
SELECT
backend_id,
SUM(taken) AS load
FROM (
SELECT DISTINCT ON(backend_id, room_id)
jrs.backend_id,
rtc.room_id,
COALESCE(rl.taken, 0) AS taken
FROM janus_rtc_stream AS jrs
INNER JOIN rtc
ON rtc.id = jrs.rtc_id
LEFT JOIN active_room AS ar
ON ar.id = rtc.room_id
LEFT JOIN room_load AS rl
ON rl.room_id = rtc.room_id
WHERE LOWER(jrs.time) IS NOT NULL
) AS sub
GROUP BY backend_id
)
SELECT jb.*
FROM janus_backend AS jb
LEFT JOIN janus_backend_load AS jbl
ON jbl.backend_id = jb.id
LEFT JOIN room AS r2
ON 1 = 1
WHERE r2.id = $1
ORDER BY COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0)
LIMIT 1
"#;

pub(crate) fn least_loaded(room_id: Uuid, conn: &PgConnection) -> Result<Option<Object>, Error> {
use diesel::prelude::*;
use diesel::sql_types::Uuid;

diesel::sql_query(LEAST_LOADED_SQL)
.bind::<Uuid, _>(room_id)
.get_result(conn)
.optional()
}

////////////////////////////////////////////////////////////////////////////////

// Similar to the previous one but returns the number of free slots for the room on the backend
Expand Down
1 change: 0 additions & 1 deletion src/db/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ impl Object {
self.backend
}

#[cfg(test)]
pub(crate) fn reserve(&self) -> Option<i32> {
self.reserve
}
Expand Down
13 changes: 13 additions & 0 deletions src/test_helpers/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub(crate) struct JanusBackend {
handle_id: i64,
session_id: i64,
capacity: Option<i32>,
balancer_capacity: Option<i32>,
}

impl JanusBackend {
Expand All @@ -158,6 +159,7 @@ impl JanusBackend {
handle_id,
session_id,
capacity: None,
balancer_capacity: None,
}
}

Expand All @@ -168,13 +170,24 @@ impl JanusBackend {
}
}

pub(crate) fn balancer_capacity(self, balancer_capacity: i32) -> Self {
Self {
balancer_capacity: Some(balancer_capacity),
..self
}
}

pub(crate) fn insert(&self, conn: &PgConnection) -> db::janus_backend::Object {
let mut q = db::janus_backend::UpsertQuery::new(&self.id, self.handle_id, self.session_id);

if let Some(capacity) = self.capacity {
q = q.capacity(capacity);
}

if let Some(balancer_capacity) = self.balancer_capacity {
q = q.balancer_capacity(balancer_capacity);
}

q.execute(conn).expect("Failed to insert janus_backend")
}
}
Expand Down