From 8c3e29b4ee80088d827b9d753dd00d530c0aedfc Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Mon, 10 Aug 2020 22:04:52 +0300 Subject: [PATCH] Revert publisher migration (#126) --- src/app/endpoint/rtc.rs | 82 ++++---------------------------------- src/db/janus_rtc_stream.rs | 51 ++++++++---------------- 2 files changed, 25 insertions(+), 108 deletions(-) diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index 46a9465c..c716a74e 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -279,13 +279,16 @@ impl RequestHandler for ConnectHandler { let backend = { let conn = context.db().get()?; - // If there is an active stream choose its backend since Janus doesn't support - // clustering so all agents within one rtc must be sent to the same node. If there's no - // active stream it means we're connecting as publisher and going to create it. - // Then select the least loaded node: the one with the least active rtc streams count. + // There are 3 cases: + // 1. Connecting as publisher with no previous stream. Select the least loaded backend + // that is capable to host the room's reservation. + // 2. Connecting as subscriber with existing stream. Choose the backend of the active + // stream because Janus doesn't support clustering and it must be the same server + // that the stream's publisher is connected to. + // 3. Reconnecting as publisher with previous stream. Select the backend of the previous + // stream to avoid partitioning the record across multiple servers. let maybe_rtc_stream = db::janus_rtc_stream::FindQuery::new() .rtc_id(payload.id) - .active(true) .execute(&conn)?; match maybe_rtc_stream { @@ -793,75 +796,6 @@ mod test { }); } - #[test] - fn connect_to_rtc_migration_to_another_backend() { - async_std::task::block_on(async { - let db = TestDb::new(); - let mut authz = TestAuthz::new(); - - let (rtc, backend) = db - .connection_pool() - .get() - .map(|conn| { - // Insert rtcs and janus backends. - let rtc1 = shared_helpers::insert_rtc(&conn); - let rtc2 = shared_helpers::insert_rtc(&conn); - let backend1 = shared_helpers::insert_janus_backend(&conn); - let backend2 = shared_helpers::insert_janus_backend(&conn); - - // The first backend has a finished stream for the first rtc… - let stream1 = factory::JanusRtcStream::new(USR_AUDIENCE) - .backend(&backend1) - .rtc(&rtc1) - .insert(&conn); - - crate::db::janus_rtc_stream::start(stream1.id(), &conn).unwrap(); - crate::db::janus_rtc_stream::stop_by_agent_id(stream1.sent_by(), &conn) - .unwrap(); - - // …and an active stream for the second rtc. - let stream2 = factory::JanusRtcStream::new(USR_AUDIENCE) - .backend(&backend1) - .rtc(&rtc2) - .insert(&conn); - - crate::db::janus_rtc_stream::start(stream2.id(), &conn).unwrap(); - - // A new stream for the first rtc should start on the second backend. - (rtc1, backend2) - }) - .unwrap(); - - // Allow user to read the rtc. - let agent = TestAgent::new("web", "user123", USR_AUDIENCE); - let room_id = rtc.room_id().to_string(); - let rtc_id = rtc.id().to_string(); - let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; - authz.allow(agent.account_id(), object, "read"); - - // Make rtc.connect request. - let context = TestContext::new(db, authz); - let payload = ConnectRequest { id: rtc.id() }; - - let messages = handle_request::(&context, &agent, payload) - .await - .expect("RTC connect failed"); - - // Ensure we're balanced to the least loaded backend. - let (req, _reqp, topic) = find_request::(messages.as_slice()); - - let expected_topic = format!( - "agents/{}/api/{}/in/{}", - backend.id(), - janus::JANUS_API_VERSION, - context.config().id, - ); - - assert_eq!(topic, expected_topic); - assert_eq!(req.session_id, backend.session_id()); - }); - } - #[test] fn connect_to_rtc_with_reservation() { async_std::task::block_on(async { diff --git a/src/db/janus_rtc_stream.rs b/src/db/janus_rtc_stream.rs index 246ea0d4..24545b61 100644 --- a/src/db/janus_rtc_stream.rs +++ b/src/db/janus_rtc_stream.rs @@ -95,23 +95,16 @@ impl Object { //////////////////////////////////////////////////////////////////////////////// -const ACTIVE_SQL: &str = r#" -( - lower("janus_rtc_stream"."time") is not null - and upper("janus_rtc_stream"."time") is null -) -"#; - pub(crate) struct FindQuery { + id: Option, rtc_id: Option, - active: Option, } impl FindQuery { pub(crate) fn new() -> Self { Self { + id: None, rtc_id: None, - active: None, } } @@ -122,42 +115,32 @@ impl FindQuery { } } - pub(crate) fn active(self, active: bool) -> Self { - Self { - active: Some(active), - ..self - } - } - pub(crate) fn execute(&self, conn: &PgConnection) -> Result, Error> { - use diesel::dsl::sql; use diesel::prelude::*; - let rtc_id = match self.rtc_id { - Some(rtc_id) => rtc_id, - None => { + let query = match (self.id, self.rtc_id) { + (Some(ref id), None) => janus_rtc_stream::table.find(id.to_owned()).into_boxed(), + (None, Some(ref rtc_id)) => janus_rtc_stream::table + .filter(janus_rtc_stream::rtc_id.eq(rtc_id.to_owned())) + .into_boxed(), + _ => { return Err(Error::QueryBuilderError( - "rtc_id is required parameter of the query".into(), + "id either rtc_id is required parameter of the query".into(), )) } }; - let mut q = janus_rtc_stream::table - .filter(janus_rtc_stream::rtc_id.eq(rtc_id)) - .into_boxed(); - - match self.active { - None => (), - Some(true) => q = q.filter(sql(ACTIVE_SQL)), - Some(false) => q = q.filter(sql(&format!("not {}", ACTIVE_SQL))), - } - - q.get_result(conn).optional() + query.get_result(conn).optional() } } //////////////////////////////////////////////////////////////////////////////// +const ACTIVE_SQL: &str = r#"( + lower("janus_rtc_stream"."time") is not null + and upper("janus_rtc_stream"."time") is null +)"#; + pub(crate) struct ListQuery { room_id: Option, rtc_id: Option, @@ -311,7 +294,7 @@ pub(crate) fn start(id: Uuid, conn: &PgConnection) -> Result, Err update janus_rtc_stream \ set time = tstzrange(now(), null, '[)') \ where id = $1 \ - returning * \ + returning *\ ", ) .bind::(id) @@ -328,7 +311,7 @@ pub(crate) fn stop(id: Uuid, conn: &PgConnection) -> Result, Erro update janus_rtc_stream \ set time = case when time is not null then tstzrange(lower(time), now(), '[)') end \ where id = $1 \ - returning * \ + returning *\ ", ) .bind::(id)