From 2bf9225d2adad2492e723ee9c331095501c2b5de Mon Sep 17 00:00:00 2001 From: Timofey Martynov Date: Mon, 18 May 2020 14:24:08 +0300 Subject: [PATCH] Optimize DB connection usage (#105) --- src/app/endpoint/agent.rs | 40 +++++---- src/app/endpoint/message.rs | 5 +- src/app/endpoint/room.rs | 113 +++++++++++++++----------- src/app/endpoint/rtc.rs | 134 +++++++++++++++++-------------- src/app/endpoint/rtc_stream.rs | 41 +++++----- src/app/endpoint/subscription.rs | 27 ++++--- 6 files changed, 203 insertions(+), 157 deletions(-) diff --git a/src/app/endpoint/agent.rs b/src/app/endpoint/agent.rs index 42a2dd40..2813cb95 100644 --- a/src/app/endpoint/agent.rs +++ b/src/app/endpoint/agent.rs @@ -33,15 +33,17 @@ impl RequestHandler for ListHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; - // Check whether the room exists and open. - let room = db::room::FindQuery::new() - .id(payload.room_id) - .time(db::room::now()) - .execute(&conn)? - .ok_or_else(|| format!("the room = '{}' is not found or closed", payload.room_id)) - .status(ResponseStatus::NOT_FOUND)?; + let room = { + let conn = context.db().get()?; + + db::room::FindQuery::new() + .id(payload.room_id) + .time(db::room::now()) + .execute(&conn)? + .ok_or_else(|| format!("the room = '{}' is not found or closed", payload.room_id)) + .status(ResponseStatus::NOT_FOUND)? + }; // Authorize agents listing in the room. let room_id = room.id().to_string(); @@ -53,15 +55,19 @@ impl RequestHandler for ListHandler { .await?; // Get agents list in the room. - let agents = db::agent::ListQuery::new() - .room_id(payload.room_id) - .status(db::agent::Status::Ready) - .offset(payload.offset.unwrap_or_else(|| 0)) - .limit(std::cmp::min( - payload.limit.unwrap_or_else(|| MAX_LIMIT), - MAX_LIMIT, - )) - .execute(&conn)?; + let agents = { + let conn = context.db().get()?; + + db::agent::ListQuery::new() + .room_id(payload.room_id) + .status(db::agent::Status::Ready) + .offset(payload.offset.unwrap_or_else(|| 0)) + .limit(std::cmp::min( + payload.limit.unwrap_or_else(|| MAX_LIMIT), + MAX_LIMIT, + )) + .execute(&conn)? + }; // Respond with agents list. Ok(Box::new(stream::once(shared::build_response( diff --git a/src/app/endpoint/message.rs b/src/app/endpoint/message.rs index 964e2c9e..fafd3a0a 100644 --- a/src/app/endpoint/message.rs +++ b/src/app/endpoint/message.rs @@ -74,9 +74,8 @@ impl RequestHandler for UnicastHandler { API_VERSION, ); - Ok(Box::new(stream::once( - Box::new(req) as Box - ))) + let boxed_req = Box::new(req) as Box; + Ok(Box::new(stream::once(boxed_req))) } } diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index e7d50d57..c64365cc 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -119,13 +119,15 @@ impl RequestHandler for ReadHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - let room = db::room::FindQuery::new() - .id(payload.id) - .execute(&conn)? - .ok_or_else(|| format!("Room not found, id = '{}'", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; + db::room::FindQuery::new() + .id(payload.id) + .execute(&conn)? + .ok_or_else(|| format!("Room not found, id = '{}'", payload.id)) + .status(ResponseStatus::NOT_FOUND)? + }; // Authorize room reading on the tenant. let room_id = room.id().to_string(); @@ -162,14 +164,16 @@ impl RequestHandler for UpdateHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - let room = db::room::FindQuery::new() - .time(db::room::since_now()) - .id(payload.id()) - .execute(&conn)? - .ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id())) - .status(ResponseStatus::NOT_FOUND)?; + db::room::FindQuery::new() + .time(db::room::since_now()) + .id(payload.id()) + .execute(&conn)? + .ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id())) + .status(ResponseStatus::NOT_FOUND)? + }; // Authorize room updating on the tenant. let room_id = room.id().to_string(); @@ -181,7 +185,10 @@ impl RequestHandler for UpdateHandler { .await?; // Update room. - let room = payload.execute(&conn)?; + let room = { + let conn = context.db().get()?; + payload.execute(&conn)? + }; // Respond and broadcast to the audience topic. let response = shared::build_response( @@ -220,14 +227,16 @@ impl RequestHandler for DeleteHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - let room = db::room::FindQuery::new() - .time(db::room::since_now()) - .id(payload.id) - .execute(&conn)? - .ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; + db::room::FindQuery::new() + .time(db::room::since_now()) + .id(payload.id) + .execute(&conn)? + .ok_or_else(|| format!("Room not found, id = '{}' or closed", payload.id)) + .status(ResponseStatus::NOT_FOUND)? + }; // Authorize room deletion on the tenant. let room_id = room.id().to_string(); @@ -239,7 +248,10 @@ impl RequestHandler for DeleteHandler { .await?; // Delete room. - db::room::DeleteQuery::new(room.id()).execute(&conn)?; + { + let conn = context.db().get()?; + db::room::DeleteQuery::new(room.id()).execute(&conn)?; + } // Respond and broadcast to the audience topic. let response = shared::build_response( @@ -278,14 +290,16 @@ impl RequestHandler for EnterHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - let room = db::room::FindQuery::new() - .id(payload.id) - .time(db::room::now()) - .execute(&conn)? - .ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; + db::room::FindQuery::new() + .id(payload.id) + .time(db::room::now()) + .execute(&conn)? + .ok_or_else(|| format!("Room not found or closed, id = '{}'", payload.id)) + .status(ResponseStatus::NOT_FOUND)? + }; // Authorize subscribing to the room's events. let room_id = room.id().to_string(); @@ -297,7 +311,10 @@ impl RequestHandler for EnterHandler { .await?; // Register agent in `in_progress` state. - db::agent::InsertQuery::new(reqp.as_agent_id(), room.id()).execute(&conn)?; + { + let conn = context.db().get()?; + db::agent::InsertQuery::new(reqp.as_agent_id(), room.id()).execute(&conn)?; + } // Send dynamic subscription creation request to the broker. let payload = SubscriptionRequest::new(reqp.as_agent_id().to_owned(), object); @@ -343,22 +360,26 @@ impl RequestHandler for LeaveHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; - - let room = db::room::FindQuery::new() - .id(payload.id) - .execute(&conn)? - .ok_or_else(|| format!("Room not found, id = '{}'", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; - - // Check room presence. - let results = db::agent::ListQuery::new() - .room_id(room.id()) - .agent_id(reqp.as_agent_id()) - .status(db::agent::Status::Ready) - .execute(&conn)?; - - if results.is_empty() { + let (room, presence) = { + let conn = context.db().get()?; + + let room = db::room::FindQuery::new() + .id(payload.id) + .execute(&conn)? + .ok_or_else(|| format!("Room not found, id = '{}'", payload.id)) + .status(ResponseStatus::NOT_FOUND)?; + + // Check room presence. + let presence = db::agent::ListQuery::new() + .room_id(room.id()) + .agent_id(reqp.as_agent_id()) + .status(db::agent::Status::Ready) + .execute(&conn)?; + + (room, presence) + }; + + if presence.is_empty() { return Err(format!( "agent = '{}' is not online in the room = '{}'", reqp.as_agent_id(), diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index 0d450446..966a7ddd 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -48,14 +48,16 @@ impl RequestHandler for CreateHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - let room = db::room::FindQuery::new() - .time(db::room::now()) - .id(payload.room_id) - .execute(&conn)? - .ok_or_else(|| format!("the room = '{}' is not found", payload.room_id)) - .status(ResponseStatus::NOT_FOUND)?; + db::room::FindQuery::new() + .time(db::room::now()) + .id(payload.room_id) + .execute(&conn)? + .ok_or_else(|| format!("the room = '{}' is not found", payload.room_id)) + .status(ResponseStatus::NOT_FOUND)? + }; // Authorize room creation. let room_id = room.id().to_string(); @@ -67,7 +69,10 @@ impl RequestHandler for CreateHandler { .await?; // Create an rtc. - let rtc = db::rtc::InsertQuery::new(room.id()).execute(&conn)?; + let rtc = { + let conn = context.db().get()?; + db::rtc::InsertQuery::new(room.id()).execute(&conn)? + }; // Respond and broadcast to the room topic. let response = shared::build_response( @@ -110,33 +115,37 @@ impl RequestHandler for ReadHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - // Authorize rtc reading. - let authz_time = { - let room = db::room::FindQuery::new() + db::room::FindQuery::new() .time(db::room::now()) .rtc_id(payload.id) .execute(&conn)? .ok_or_else(|| format!("a room for the rtc = '{}' is not found", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; + .status(ResponseStatus::NOT_FOUND)? + }; - let rtc_id = payload.id.to_string(); - let room_id = room.id().to_string(); - let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; + // Authorize rtc reading. + let rtc_id = payload.id.to_string(); + let room_id = room.id().to_string(); + let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; - context - .authz() - .authorize(room.audience(), reqp, object, "read") - .await? - }; + let authz_time = context + .authz() + .authorize(room.audience(), reqp, object, "read") + .await?; // Return rtc. - let rtc = db::rtc::FindQuery::new() - .id(payload.id) - .execute(&conn)? - .ok_or_else(|| format!("RTC not found, id = '{}'", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; + let rtc = { + let conn = context.db().get()?; + + db::rtc::FindQuery::new() + .id(payload.id) + .execute(&conn)? + .ok_or_else(|| format!("RTC not found, id = '{}'", payload.id)) + .status(ResponseStatus::NOT_FOUND)? + }; Ok(Box::new(stream::once(shared::build_response( ResponseStatus::OK, @@ -172,25 +181,25 @@ impl RequestHandler for ListHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - // Authorize rtc listing. - let authz_time = { - let room = db::room::FindQuery::new() + db::room::FindQuery::new() .time(db::room::now()) .id(payload.room_id) .execute(&conn)? .ok_or_else(|| format!("the room = '{}' is not found", payload.room_id)) - .status(ResponseStatus::NOT_FOUND)?; + .status(ResponseStatus::NOT_FOUND)? + }; - let room_id = room.id().to_string(); - let object = vec!["rooms", &room_id, "rtcs"]; + // Authorize rtc listing. + let room_id = room.id().to_string(); + let object = vec!["rooms", &room_id, "rtcs"]; - context - .authz() - .authorize(room.audience(), reqp, object, "list") - .await? - }; + let authz_time = context + .authz() + .authorize(room.audience(), reqp, object, "list") + .await?; // Return rtc list. let mut query = db::rtc::ListQuery::new().room_id(payload.room_id); @@ -202,7 +211,10 @@ impl RequestHandler for ListHandler { let limit = std::cmp::min(payload.limit.unwrap_or_else(|| MAX_LIMIT), MAX_LIMIT); query = query.limit(limit); - let rtcs = query.execute(&conn)?; + let rtcs = { + let conn = context.db().get()?; + query.execute(&conn)? + }; Ok(Box::new(stream::once(shared::build_response( ResponseStatus::OK, @@ -234,36 +246,38 @@ impl RequestHandler for ConnectHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - // Authorize connecting to the rtc. - let authz_time = { - let room = db::room::FindQuery::new() + db::room::FindQuery::new() .time(db::room::now()) .rtc_id(payload.id) .execute(&conn)? .ok_or_else(|| format!("a room for the rtc = '{}' is not found", payload.id)) - .status(ResponseStatus::NOT_FOUND)?; - - if room.backend() != db::room::RoomBackend::Janus { - return Err(format!( - "'rtc.connect' is not implemented for the backend = '{}'.", - room.backend(), - )) - .status(ResponseStatus::NOT_IMPLEMENTED); - } + .status(ResponseStatus::NOT_FOUND)? + }; - let rtc_id = payload.id.to_string(); - let room_id = room.id().to_string(); - let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; + // Authorize connecting to the rtc. + if room.backend() != db::room::RoomBackend::Janus { + return Err(format!( + "'rtc.connect' is not implemented for the backend = '{}'.", + room.backend(), + )) + .status(ResponseStatus::NOT_IMPLEMENTED); + } - context - .authz() - .authorize(room.audience(), reqp, object, "read") - .await? - }; + let rtc_id = payload.id.to_string(); + let room_id = room.id().to_string(); + let object = vec!["rooms", &room_id, "rtcs", &rtc_id]; + + let authz_time = context + .authz() + .authorize(room.audience(), reqp, object, "read") + .await?; let backend = { + let conn = context.db().get()?; + // If there is an active stream choose its backend since Janus doesn't support // clustering so all agents within one rtc must be sent to the same node. If there's no // active stream it means we're connecting as publisher and going to create it. diff --git a/src/app/endpoint/rtc_stream.rs b/src/app/endpoint/rtc_stream.rs index 5271189d..b080e837 100644 --- a/src/app/endpoint/rtc_stream.rs +++ b/src/app/endpoint/rtc_stream.rs @@ -43,33 +43,33 @@ impl RequestHandler for ListHandler { reqp: &IncomingRequestProperties, start_timestamp: DateTime, ) -> Result { - let conn = context.db().get()?; + let room = { + let conn = context.db().get()?; - let authz_time = { - let room = db::room::FindQuery::new() + db::room::FindQuery::new() .time(db::room::now()) .id(payload.room_id) .execute(&conn)? .ok_or_else(|| format!("the room = '{}' is not found", payload.room_id)) - .status(ResponseStatus::NOT_FOUND)?; + .status(ResponseStatus::NOT_FOUND)? + }; - if room.backend() != db::room::RoomBackend::Janus { - let err = format!( - "'rtc_stream.list' is not implemented for the backend = '{}'", - room.backend() - ); + if room.backend() != db::room::RoomBackend::Janus { + let err = format!( + "'rtc_stream.list' is not implemented for the backend = '{}'", + room.backend() + ); - return Err(err).status(ResponseStatus::NOT_IMPLEMENTED)?; - } + return Err(err).status(ResponseStatus::NOT_IMPLEMENTED)?; + } - let room_id = room.id().to_string(); - let object = vec!["rooms", &room_id, "rtcs"]; + let room_id = room.id().to_string(); + let object = vec!["rooms", &room_id, "rtcs"]; - context - .authz() - .authorize(room.audience(), reqp, object, "list") - .await? - }; + let authz_time = context + .authz() + .authorize(room.audience(), reqp, object, "list") + .await?; let mut query = db::janus_rtc_stream::ListQuery::new().room_id(payload.room_id); @@ -87,7 +87,10 @@ impl RequestHandler for ListHandler { query = query.limit(std::cmp::min(payload.limit.unwrap_or(MAX_LIMIT), MAX_LIMIT)); - let rtc_streams = query.execute(&conn)?; + let rtc_streams = { + let conn = context.db().get()?; + query.execute(&conn)? + }; Ok(Box::new(stream::once(shared::build_response( ResponseStatus::OK, diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index def93b52..fd00d9e1 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -74,19 +74,22 @@ impl EventHandler for CreateHandler { // Find room. let room_id = payload.try_room_id()?; - let conn = context.db().get()?; - db::room::FindQuery::new() - .id(room_id) - .time(db::room::now()) - .execute(&conn)? - .ok_or_else(|| format!("the room = '{}' is not found or closed", room_id)) - .status(ResponseStatus::NOT_FOUND)?; - - // Update agent state to `ready`. - db::agent::UpdateQuery::new(&payload.subject, room_id) - .status(db::agent::Status::Ready) - .execute(&conn)?; + { + let conn = context.db().get()?; + + db::room::FindQuery::new() + .id(room_id) + .time(db::room::now()) + .execute(&conn)? + .ok_or_else(|| format!("the room = '{}' is not found or closed", room_id)) + .status(ResponseStatus::NOT_FOUND)?; + + // Update agent state to `ready`. + db::agent::UpdateQuery::new(&payload.subject, room_id) + .status(db::agent::Status::Ready) + .execute(&conn)?; + } // Send broadcast notification that the agent has entered the room. let outgoing_event_payload = RoomEnterLeaveEvent {