From 477d78df0f93b2b79b6537c463959a4f707a59ab Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Thu, 28 Jan 2021 21:07:19 +0300 Subject: [PATCH] Allow unbounded rooms (#211) --- src/app/endpoint/helpers.rs | 30 +++- src/app/endpoint/room.rs | 306 +++++++++++++++++++++++++----------- src/app/endpoint/rtc.rs | 6 +- src/app/error.rs | 7 + src/db/janus_backend.rs | 12 +- 5 files changed, 262 insertions(+), 99 deletions(-) diff --git a/src/app/endpoint/helpers.rs b/src/app/endpoint/helpers.rs index 0a15cb21..65d387f7 100644 --- a/src/app/endpoint/helpers.rs +++ b/src/app/endpoint/helpers.rs @@ -13,6 +13,7 @@ use crate::app::context::Context; use crate::app::error::{Error as AppError, ErrorExt, ErrorKind as AppErrorKind}; use crate::app::API_VERSION; use crate::db; +use crate::db::room::Object as Room; /////////////////////////////////////////////////////////////////////////////// @@ -51,6 +52,7 @@ pub(crate) fn build_notification( pub(crate) enum RoomTimeRequirement { Any, NotClosed, + NotClosedOrUnboundedOpen, Open, } @@ -78,7 +80,7 @@ fn find_room( context: &mut C, query: Q, opening_requirement: RoomTimeRequirement, -) -> Result +) -> Result where C: Context, Q: db::room::FindQueryable, @@ -96,12 +98,29 @@ where // Room time doesn't matter. RoomTimeRequirement::Any => Ok(room), // Current time must be before room closing, including not yet opened rooms. + // Rooms without closing time are fine. + // Rooms without opening time are forbidden. RoomTimeRequirement::NotClosed => { let now = Utc::now(); - let (_opened_at, closed_at) = room.time(); - match closed_at { - Bound::Included(dt) | Bound::Excluded(dt) if *dt < now => { + match room.time() { + (Bound::Unbounded, _) => { + Err(anyhow!("Room has no opening time")).error(AppErrorKind::RoomClosed) + } + (_, Bound::Included(dt)) | (_, Bound::Excluded(dt)) if *dt < now => { + Err(anyhow!("Room closed")).error(AppErrorKind::RoomClosed) + } + _ => Ok(room), + } + } + // Current time must be before room closing, including not yet opened rooms. + // Rooms without closing time are fine. + // Rooms without opening time are fine. + RoomTimeRequirement::NotClosedOrUnboundedOpen => { + let now = Utc::now(); + + match room.time() { + (_, Bound::Included(dt)) | (_, Bound::Excluded(dt)) if *dt < now => { Err(anyhow!("Room closed")).error(AppErrorKind::RoomClosed) } _ => Ok(room), @@ -113,6 +132,9 @@ where let (opened_at, closed_at) = room.time(); match opened_at { + Bound::Unbounded => { + Err(anyhow!("Room has no opening time")).error(AppErrorKind::RoomClosed) + } Bound::Included(dt) | Bound::Excluded(dt) if *dt >= now => { Err(anyhow!("Room not opened")).error(AppErrorKind::RoomClosed) } diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index 7a29ca87..f8c273b0 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -15,6 +15,7 @@ use uuid::Uuid; use crate::app::context::Context; use crate::app::endpoint::prelude::*; use crate::db; +use crate::db::room::RoomBackend; /////////////////////////////////////////////////////////////////////////////// @@ -43,14 +44,14 @@ pub(crate) struct CreateRequest { time: (Bound>, Bound>), audience: String, #[serde(default = "CreateRequest::default_backend")] - backend: db::room::RoomBackend, + backend: RoomBackend, reserve: Option, tags: Option, } impl CreateRequest { - fn default_backend() -> db::room::RoomBackend { - db::room::RoomBackend::None + fn default_backend() -> RoomBackend { + RoomBackend::None } } @@ -179,8 +180,11 @@ impl RequestHandler for UpdateHandler { payload: Self::Payload, reqp: &IncomingRequestProperties, ) -> Result { - let room = - helpers::find_room_by_id(context, payload.id, helpers::RoomTimeRequirement::NotClosed)?; + let room = helpers::find_room_by_id( + context, + payload.id, + helpers::RoomTimeRequirement::NotClosedOrUnboundedOpen, + )?; // Authorize room updating on the tenant. let room_id = room.id().to_string(); @@ -192,63 +196,54 @@ impl RequestHandler for UpdateHandler { .await?; let room_was_open = !room.is_closed(); - let mut room_closed_by_update = false; - - let mut time = payload.time; - if let Some(new_time) = time { - match new_time { - (Bound::Included(new_opened_at), Bound::Excluded(new_closed_at)) - if new_closed_at > new_opened_at => - { - if let (Bound::Included(opened_at), _) = room.time() { - if *opened_at <= Utc::now() { - let new_closed_at = if new_closed_at <= Utc::now() { - room_closed_by_update = true; - Utc::now() - } else { - new_closed_at - }; - - time = - Some((Bound::Included(*opened_at), Bound::Excluded(new_closed_at))); - } - } - } - // the case when new end is unbouned is special - // we must take care not to erase max webinar duration timeout - (new_opened_at, Bound::Unbounded) => { - let (start, end) = room.time(); - let new_start = match start { - Bound::Included(opened_at) if *opened_at <= Utc::now() => { - Bound::Included(*opened_at) - } - Bound::Included(_) => new_opened_at, + + // Update room. + let room = { + let conn = context.get_conn()?; + + let time = match payload.time { + None => None, + Some(new_time) => { + match new_time { + (Bound::Included(o), Bound::Excluded(c)) if o < c => (), + (Bound::Included(_), Bound::Unbounded) => (), _ => { return Err(anyhow!("Invalid room time")) .error(AppErrorKind::InvalidRoomTime) } }; - // if new end should be unbounded there are two cases - // old end is unbounded => end doesnt change - // old end is bounded => it means there already has been an attempt to stream or the room was bounded from creation - // thus we cant make the end unbounded again - time = Some((new_start, *end)) + + match room.time() { + // Allow any change when no closing date specified. + (_, Bound::Unbounded) => Some(new_time), + (Bound::Included(o), Bound::Excluded(c)) if *c > Utc::now() => { + match new_time { + // Allow reschedule future closing. + (_, Bound::Excluded(nc)) => { + let nc = std::cmp::max(nc, Utc::now()); + Some((Bound::Included(*o), Bound::Excluded(nc))) + } + _ => { + return Err(anyhow!("Setting unbounded closing time is not allowed in this room anymore")) + .error(AppErrorKind::RoomTimeChangingForbidden); + } + } + } + _ => { + return Err(anyhow!("Room has been already closed")) + .error(AppErrorKind::RoomTimeChangingForbidden); + } + } } - _ => return Err(anyhow!("Invalid room time")).error(AppErrorKind::InvalidRoomTime), - } - } + }; - // Update room. - let room = { - let query = db::room::UpdateQuery::new(room.id()) + db::room::UpdateQuery::new(room.id()) .time(time) .audience(payload.audience) .backend(payload.backend) .reserve(payload.reserve) - .tags(payload.tags); - - let conn = context.get_conn()?; - query.execute(&conn)? + .tags(payload.tags) + .execute(&conn)? }; // Respond and broadcast to the audience topic. @@ -270,27 +265,25 @@ impl RequestHandler for UpdateHandler { let mut responses = vec![response, notification]; - let append_closed_notification = || { - responses.push(helpers::build_notification( - "room.close", - &format!("rooms/{}/events", room.id()), - room.clone(), - reqp, - context.start_timestamp(), - )); - - responses.push(helpers::build_notification( - "room.close", - &format!("audiences/{}/events", room.audience()), - room, - reqp, - context.start_timestamp(), - )); - }; - - // Publish room closed notification - if room_was_open && room_closed_by_update { - append_closed_notification(); + // Publish room closed notification. + if let (_, Bound::Excluded(closed_at)) = room.time() { + if room_was_open && *closed_at <= Utc::now() { + responses.push(helpers::build_notification( + "room.close", + &format!("rooms/{}/events", room.id()), + room.clone(), + reqp, + context.start_timestamp(), + )); + + responses.push(helpers::build_notification( + "room.close", + &format!("audiences/{}/events", room.audience()), + room.clone(), + reqp, + context.start_timestamp(), + )); + } } Ok(Box::new(stream::from_iter(responses))) @@ -312,8 +305,11 @@ impl RequestHandler for DeleteHandler { payload: Self::Payload, reqp: &IncomingRequestProperties, ) -> Result { - let room = - helpers::find_room_by_id(context, payload.id, helpers::RoomTimeRequirement::NotClosed)?; + let room = helpers::find_room_by_id( + context, + payload.id, + helpers::RoomTimeRequirement::NotClosedOrUnboundedOpen, + )?; // Authorize room deletion on the tenant. let room_id = room.id().to_string(); @@ -340,7 +336,7 @@ impl RequestHandler for DeleteHandler { ); let notification = helpers::build_notification( - "room.update", + "room.delete", &format!("audiences/{}/events", room.audience()), room, reqp, @@ -367,7 +363,7 @@ impl RequestHandler for EnterHandler { reqp: &IncomingRequestProperties, ) -> Result { let room = - helpers::find_room_by_id(context, payload.id, helpers::RoomTimeRequirement::Open)?; + helpers::find_room_by_id(context, payload.id, helpers::RoomTimeRequirement::NotClosed)?; // Authorize subscribing to the room's events. let room_id = room.id().to_string(); @@ -494,7 +490,7 @@ mod test { mod create { use std::ops::Bound; - use chrono::{SubsecRound, Utc}; + use chrono::Utc; use serde_json::json; use crate::db::room::Object as Room; @@ -505,15 +501,16 @@ mod test { #[test] fn create() { async_std::task::block_on(async { + let db = TestDb::new(); + // Allow user to create rooms. let mut authz = TestAuthz::new(); let agent = TestAgent::new("web", "user123", USR_AUDIENCE); authz.allow(agent.account_id(), vec!["rooms"], "create"); // Make room.create request. - let mut context = TestContext::new(TestDb::new(), authz); - let now = Utc::now().trunc_subsecs(0); - let time = (Bound::Included(now), Bound::Unbounded); + let mut context = TestContext::new(db.clone(), authz); + let time = (Bound::Unbounded, Bound::Unbounded); let payload = CreateRequest { time: time.clone(), @@ -666,6 +663,7 @@ mod test { use chrono::{Duration, SubsecRound, Utc}; use serde_json::json; + use uuid::Uuid; use crate::db::room::Object as Room; use crate::test_helpers::find_event_by_predicate; @@ -688,10 +686,7 @@ mod test { // Create room. factory::Room::new() .audience(USR_AUDIENCE) - .time(( - Bound::Included(now + Duration::hours(1)), - Bound::Excluded(now + Duration::hours(2)), - )) + .time((Bound::Unbounded, Bound::Unbounded)) .backend(db::room::RoomBackend::Janus) .insert(&conn) }; @@ -706,8 +701,8 @@ mod test { let mut context = TestContext::new(db, authz); let time = ( - Bound::Included(now + Duration::hours(2)), - Bound::Excluded(now + Duration::hours(3)), + Bound::Included(now + Duration::minutes(50)), + Bound::Unbounded, ); let payload = UpdateRequest { @@ -804,7 +799,7 @@ mod test { .audience(USR_AUDIENCE) .time(( Bound::Included(now - Duration::hours(1)), - Bound::Excluded(now + Duration::hours(2)), + Bound::Excluded(now + Duration::hours(5)), )) .backend(db::room::RoomBackend::Janus) .insert(&conn) @@ -865,6 +860,55 @@ mod test { }); } + #[test] + fn update_and_close_unbounded_room() { + async_std::task::block_on(async { + let db = TestDb::new(); + let now = Utc::now().trunc_subsecs(0); + + let room = { + let conn = db + .connection_pool() + .get() + .expect("Failed to get DB connection"); + + // Create room. + factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Included(now - Duration::hours(1)), Bound::Unbounded)) + .backend(db::room::RoomBackend::Janus) + .insert(&conn) + }; + + // Allow agent to update the room. + let agent = TestAgent::new("web", "user123", USR_AUDIENCE); + let mut authz = TestAuthz::new(); + let room_id = room.id().to_string(); + authz.allow(agent.account_id(), vec!["rooms", &room_id], "update"); + + // Make room.update request. + let mut context = TestContext::new(db, authz); + + let time = ( + Bound::Included(now - Duration::hours(1)), + Bound::Excluded(now - Duration::seconds(5)), + ); + + let payload = UpdateRequest { + id: room.id(), + time: Some(time), + reserve: None, + audience: None, + backend: None, + tags: None, + }; + + handle_request::(&mut context, &agent, payload) + .await + .expect("Room update failed"); + }) + } + #[test] fn update_room_missing() { async_std::task::block_on(async { @@ -1014,6 +1058,8 @@ mod test { } mod enter { + use chrono::{Duration, Utc}; + use crate::test_helpers::prelude::*; use super::super::*; @@ -1140,7 +1186,49 @@ mod test { ); // Make room.enter request. - let mut context = TestContext::new(db, TestAuthz::new()); + let mut context = TestContext::new(db, authz); + let payload = EnterRequest { id: room.id() }; + + let err = handle_request::(&mut context, &agent, payload) + .await + .expect_err("Unexpected success on room entering"); + + assert_eq!(err.status(), ResponseStatus::NOT_FOUND); + assert_eq!(err.kind(), "room_closed"); + }); + } + + #[test] + fn enter_room_with_no_opening_time() { + async_std::task::block_on(async { + let db = TestDb::new(); + + let room = { + let conn = db + .connection_pool() + .get() + .expect("Failed to get DB connection"); + + // Create room without time. + factory::Room::new() + .audience(USR_AUDIENCE) + .time((Bound::Unbounded, Bound::Unbounded)) + .insert(&conn) + }; + + // Allow agent to subscribe to the rooms' events. + let agent = TestAgent::new("web", "user123", USR_AUDIENCE); + let mut authz = TestAuthz::new(); + let room_id = room.id().to_string(); + + authz.allow( + agent.account_id(), + vec!["rooms", &room_id, "events"], + "subscribe", + ); + + // Make room.enter request. + let mut context = TestContext::new(db, authz); let payload = EnterRequest { id: room.id() }; let err = handle_request::(&mut context, &agent, payload) @@ -1151,6 +1239,48 @@ mod test { assert_eq!(err.kind(), "room_closed"); }); } + + #[test] + fn enter_room_that_opens_in_the_future() { + async_std::task::block_on(async { + let db = TestDb::new(); + + let room = { + let conn = db + .connection_pool() + .get() + .expect("Failed to get DB connection"); + + // Create room without time. + factory::Room::new() + .audience(USR_AUDIENCE) + .time(( + Bound::Included(Utc::now() + Duration::hours(1)), + Bound::Unbounded, + )) + .insert(&conn) + }; + + // Allow agent to subscribe to the rooms' events. + let agent = TestAgent::new("web", "user123", USR_AUDIENCE); + let mut authz = TestAuthz::new(); + let room_id = room.id().to_string(); + + authz.allow( + agent.account_id(), + vec!["rooms", &room_id, "events"], + "subscribe", + ); + + // Make room.enter request. + let mut context = TestContext::new(db, authz); + let payload = EnterRequest { id: room.id() }; + + handle_request::(&mut context, &agent, payload) + .await + .expect("Room entrance failed"); + }); + } } mod leave { diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index 3b4c7c90..024b8845 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -1427,7 +1427,7 @@ mod test { let room1 = factory::Room::new() .audience(USR_AUDIENCE) .time(( - Bound::Included(now), + Bound::Included(now - Duration::minutes(1)), Bound::Excluded(now + Duration::hours(1)), )) .backend(RoomBackend::Janus) @@ -1438,7 +1438,7 @@ mod test { let room2 = factory::Room::new() .audience(USR_AUDIENCE) .time(( - Bound::Included(now), + Bound::Included(now - Duration::minutes(1)), Bound::Excluded(now + Duration::hours(1)), )) .reserve(600) @@ -1449,7 +1449,7 @@ mod test { let room3 = factory::Room::new() .audience(USR_AUDIENCE) .time(( - Bound::Included(now), + Bound::Included(now - Duration::minutes(1)), Bound::Excluded(now + Duration::hours(1)), )) .reserve(400) diff --git a/src/app/error.rs b/src/app/error.rs index d12677c8..e5b2e325 100644 --- a/src/app/error.rs +++ b/src/app/error.rs @@ -40,6 +40,7 @@ pub(crate) enum ErrorKind { ResubscriptionFailed, RoomClosed, RoomNotFound, + RoomTimeChangingForbidden, RtcNotFound, StatsCollectionFailed, } @@ -220,6 +221,12 @@ impl Into for ErrorKind { title: "Room not found", is_notify_sentry: false, }, + Self::RoomTimeChangingForbidden => ErrorKindProperties { + status: ResponseStatus::UNPROCESSABLE_ENTITY, + kind: "room_time_changing_forbidden", + title: "Room time changing forbidden", + is_notify_sentry: false, + }, Self::RtcNotFound => ErrorKindProperties { status: ResponseStatus::NOT_FOUND, kind: "rtc_not_found", diff --git a/src/db/janus_backend.rs b/src/db/janus_backend.rs index a7f486be..f55cc4e7 100644 --- a/src/db/janus_backend.rs +++ b/src/db/janus_backend.rs @@ -209,7 +209,8 @@ const MOST_LOADED_SQL: &str = r#" FROM room WHERE backend = 'janus' AND backend_id IS NOT NULL - AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day' + AND LOWER(time) <= NOW() + AND (UPPER(time) IS NULL OR UPPER(time) > NOW()) ), janus_backend_load AS ( SELECT @@ -265,7 +266,8 @@ const LEAST_LOADED_SQL: &str = r#" FROM room WHERE backend = 'janus' AND backend_id IS NOT NULL - AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day' + AND LOWER(time) <= NOW() + AND (UPPER(time) IS NULL OR UPPER(time) > NOW()) ), janus_backend_load AS ( SELECT @@ -325,7 +327,8 @@ const FREE_CAPACITY_SQL: &str = r#" FROM room WHERE backend = 'janus' AND backend_id IS NOT NULL - AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day' + AND LOWER(time) <= NOW() + AND (UPPER(time) IS NULL OR UPPER(time) > NOW()) ), janus_backend_load AS ( SELECT @@ -448,7 +451,8 @@ WITH FROM room WHERE backend = 'janus' AND backend_id IS NOT NULL - AND UPPER(time) BETWEEN NOW() AND NOW() + INTERVAL '1 day' + AND LOWER(time) <= NOW() + AND (UPPER(time) IS NULL OR UPPER(time) > NOW()) ), janus_backend_load AS ( SELECT