From 2b601a95ab02ed96b3c96accf6e9f92534b6d958 Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Wed, 30 Sep 2020 14:40:30 +0300 Subject: [PATCH] Add balancer capacity (#160) --- .../down.sql | 1 + .../up.sql | 1 + src/app/janus.rs | 40 ++++++++++++++++++- src/backend/janus.rs | 5 +++ src/db/janus_backend.rs | 16 +++++++- src/schema.rs | 1 + 6 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/down.sql create mode 100644 migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/up.sql diff --git a/migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/down.sql b/migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/down.sql new file mode 100644 index 00000000..7a9be7a7 --- /dev/null +++ b/migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/down.sql @@ -0,0 +1 @@ +ALTER TABLE janus_backend DROP COLUMN balancer_capacity; diff --git a/migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/up.sql b/migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/up.sql new file mode 100644 index 00000000..24b6e2dd --- /dev/null +++ b/migrations/2020-09-28-204546_add_balancer_capacity_to_janus_backend/up.sql @@ -0,0 +1 @@ +ALTER TABLE janus_backend ADD COLUMN balancer_capacity INT; diff --git a/src/app/janus.rs b/src/app/janus.rs index e788e88f..abc2f5aa 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -54,21 +54,34 @@ pub(crate) enum Transaction { #[derive(Debug, Deserialize, Serialize)] pub(crate) struct CreateSessionTransaction { capacity: Option, + balancer_capacity: Option, } impl CreateSessionTransaction { pub(crate) fn new() -> Self { - Self { capacity: None } + Self { + capacity: None, + balancer_capacity: None, + } } pub(crate) fn capacity(&self) -> Option { self.capacity } + pub(crate) fn balancer_capacity(&self) -> Option { + self.balancer_capacity + } + pub(crate) fn set_capacity(&mut self, capacity: i32) -> &mut Self { self.capacity = Some(capacity); self } + + pub(crate) fn set_balancer_capacity(&mut self, balancer_capacity: i32) -> &mut Self { + self.balancer_capacity = Some(balancer_capacity); + self + } } pub(crate) fn create_session_request( @@ -87,6 +100,10 @@ where tn_data.set_capacity(capacity); } + if let Some(balancer_capacity) = payload.balancer_capacity() { + tn_data.set_balancer_capacity(balancer_capacity); + } + let transaction = Transaction::CreateSession(tn_data); let payload = CreateSessionRequest::new(&to_base64(&transaction)?); @@ -112,6 +129,7 @@ where pub(crate) struct CreateHandleTransaction { session_id: i64, capacity: Option, + balancer_capacity: Option, } impl CreateHandleTransaction { @@ -119,6 +137,7 @@ impl CreateHandleTransaction { Self { session_id, capacity: None, + balancer_capacity: None, } } @@ -126,16 +145,26 @@ impl CreateHandleTransaction { self.capacity } + pub(crate) fn balancer_capacity(&self) -> Option { + self.balancer_capacity + } + pub(crate) fn set_capacity(&mut self, capacity: i32) -> &mut Self { self.capacity = Some(capacity); self } + + pub(crate) fn set_balancer_capacity(&mut self, balancer_capacity: i32) -> &mut Self { + self.balancer_capacity = Some(balancer_capacity); + self + } } pub(crate) fn create_handle_request( respp: &IncomingResponseProperties, session_id: i64, capacity: Option, + balancer_capacity: Option, me: &M, start_timestamp: DateTime, ) -> Result> @@ -149,6 +178,10 @@ where tn_data.set_capacity(capacity); } + if let Some(balancer_capacity) = balancer_capacity { + tn_data.set_balancer_capacity(balancer_capacity); + } + let transaction = Transaction::CreateHandle(tn_data); let payload = CreateHandleRequest::new( @@ -644,6 +677,7 @@ async fn handle_response_impl( respp, inresp.data().id(), tn.capacity(), + tn.balancer_capacity(), context.agent_id(), start_timestamp, ) @@ -666,6 +700,10 @@ async fn handle_response_impl( q = q.capacity(capacity); } + if let Some(balancer_capacity) = tn.balancer_capacity() { + q = q.balancer_capacity(balancer_capacity); + } + q.execute(&conn)?; Ok(Box::new(stream::empty())) } diff --git a/src/backend/janus.rs b/src/backend/janus.rs index 74f7fb69..4f7f736a 100644 --- a/src/backend/janus.rs +++ b/src/backend/janus.rs @@ -413,6 +413,7 @@ impl OpaqueId for DetachedEvent { pub(crate) struct StatusEvent { online: bool, capacity: Option, + balancer_capacity: Option, } impl StatusEvent { @@ -423,4 +424,8 @@ impl StatusEvent { pub(crate) fn capacity(&self) -> Option { self.capacity } + + pub(crate) fn balancer_capacity(&self) -> Option { + self.balancer_capacity + } } diff --git a/src/db/janus_backend.rs b/src/db/janus_backend.rs index c8b877cd..a2bfcb44 100644 --- a/src/db/janus_backend.rs +++ b/src/db/janus_backend.rs @@ -12,6 +12,7 @@ pub(crate) type AllColumns = ( janus_backend::session_id, janus_backend::created_at, janus_backend::capacity, + janus_backend::balancer_capacity, ); pub(crate) const ALL_COLUMNS: AllColumns = ( @@ -20,6 +21,7 @@ pub(crate) const ALL_COLUMNS: AllColumns = ( janus_backend::session_id, janus_backend::created_at, janus_backend::capacity, + janus_backend::balancer_capacity, ); //////////////////////////////////////////////////////////////////////////////// @@ -32,6 +34,7 @@ pub(crate) struct Object { session_id: i64, created_at: DateTime, capacity: Option, + balancer_capacity: Option, } impl Object { @@ -125,6 +128,7 @@ pub(crate) struct UpsertQuery<'a> { handle_id: i64, session_id: i64, capacity: Option, + balancer_capacity: Option, } impl<'a> UpsertQuery<'a> { @@ -134,6 +138,7 @@ impl<'a> UpsertQuery<'a> { handle_id, session_id, capacity: None, + balancer_capacity: None, } } @@ -144,6 +149,13 @@ impl<'a> UpsertQuery<'a> { } } + pub(crate) fn balancer_capacity(self, balancer_capacity: i32) -> Self { + Self { + balancer_capacity: Some(balancer_capacity), + ..self + } + } + pub(crate) fn execute(&self, conn: &PgConnection) -> Result { use crate::schema::janus_backend::dsl::janus_backend; use diesel::RunQueryDsl; @@ -226,8 +238,8 @@ const LEAST_LOADED_SQL: &str = r#" LEFT JOIN room AS r2 ON 1 = 1 WHERE r2.id = $1 - AND COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.load, 0) > COALESCE(r2.reserve, 0) - ORDER BY COALESCE(jb.capacity, 2147483647) - COALESCE(jbl.load, 0) DESC + AND COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0) > COALESCE(r2.reserve, 0) + ORDER BY COALESCE(jb.balancer_capacity, jb.capacity, 2147483647) - COALESCE(jbl.load, 0) DESC LIMIT 1 "#; diff --git a/src/schema.rs b/src/schema.rs index 72cb8187..6350bab5 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -33,6 +33,7 @@ table! { session_id -> Int8, created_at -> Timestamptz, capacity -> Nullable, + balancer_capacity -> Nullable, } }