Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Delete agents of closed rooms on vacuum
Browse files Browse the repository at this point in the history
  • Loading branch information
feymartynov committed Aug 20, 2020
1 parent f560d45 commit 8797737
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
6 changes: 5 additions & 1 deletion src/app/endpoint/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ impl EventHandler for DeleteHandler {
// Delete agent from the DB.
let room_id = payload.try_room_id()?;
let conn = context.db().get()?;
let row_count = db::agent::DeleteQuery::new(&payload.subject, room_id).execute(&conn)?;

let row_count = db::agent::DeleteQuery::new()
.agent_id(&payload.subject)
.room_id(room_id)
.execute(&conn)?;

if row_count == 1 {
// Send broadcast notification that the agent has left the room.
Expand Down
27 changes: 20 additions & 7 deletions src/app/endpoint/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ impl RequestHandler for VacuumHandler {
.await?;

let mut requests = Vec::new();

let rooms = {
let conn = context.db().get()?;
db::room::finished_without_recordings(&conn)?
};
let conn = context.db().get()?;
let rooms = db::room::finished_without_recordings(&conn)?;

for (room, rtc, backend) in rooms.into_iter() {
db::agent::DeleteQuery::new()
.room_id(room.id())
.execute(&conn)?;

// TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic
let backreq = janus::upload_stream_request(
reqp,
Expand Down Expand Up @@ -160,6 +161,7 @@ fn record_name(rtc: &db::rtc::Object) -> String {
mod test {
mod vacuum {
use chrono::{Duration, Utc};
use diesel::prelude::*;

use crate::backend::janus::JANUS_API_VERSION;
use crate::db;
Expand Down Expand Up @@ -203,12 +205,15 @@ mod test {
let _other_rtc = shared_helpers::insert_rtc(&conn);
let backend = shared_helpers::insert_janus_backend(&conn);

// Close rooms.
// Insert active agents and close rooms.
let start = Utc::now() - Duration::hours(2);
let finish = start + Duration::hours(1);
let time = (Bound::Included(start), Bound::Excluded(finish));
let agent = TestAgent::new("web", "user123", USR_AUDIENCE);

for rtc in rtcs.iter() {
shared_helpers::insert_agent(&conn, agent.agent_id(), rtc.room_id());

db::room::UpdateQuery::new(rtc.room_id().to_owned())
.time(time)
.execute(&conn)
Expand All @@ -231,8 +236,10 @@ mod test {
.await
.expect("System vacuum failed");

// Assert outgoing Janus stream.upload requests.
let conn = context.db().get().unwrap();

for (message, rtc) in messages.into_iter().zip(rtcs.iter()) {
// Assert outgoing Janus stream.upload requests.
match message.properties() {
OutgoingEnvelopeProperties::Request(_) => (),
_ => panic!("Expected outgoing request"),
Expand Down Expand Up @@ -262,6 +269,12 @@ mod test {
}
}
);

// Assert deleted active agents.
let query = crate::schema::agent::table
.filter(crate::schema::agent::room_id.eq(rtc.room_id()));

assert_eq!(query.execute(&conn).unwrap(), 0);
}
});
}
Expand Down
39 changes: 31 additions & 8 deletions src/db/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,45 @@ impl<'a> UpdateQuery<'a> {
///////////////////////////////////////////////////////////////////////////////

pub(crate) struct DeleteQuery<'a> {
agent_id: &'a AgentId,
room_id: Uuid,
agent_id: Option<&'a AgentId>,
room_id: Option<Uuid>,
}

impl<'a> DeleteQuery<'a> {
pub(crate) fn new(agent_id: &'a AgentId, room_id: Uuid) -> Self {
Self { agent_id, room_id }
pub(crate) fn new() -> Self {
Self {
agent_id: None,
room_id: None,
}
}

pub(crate) fn agent_id(self, agent_id: &'a AgentId) -> Self {
Self {
agent_id: Some(agent_id),
..self
}
}

pub(crate) fn room_id(self, room_id: Uuid) -> Self {
Self {
room_id: Some(room_id),
..self
}
}

pub(crate) fn execute(&self, conn: &PgConnection) -> Result<usize, Error> {
use diesel::prelude::*;

let query = agent::table
.filter(agent::agent_id.eq(self.agent_id))
.filter(agent::room_id.eq(self.room_id));
let mut query = diesel::delete(agent::table).into_boxed();

if let Some(agent_id) = self.agent_id {
query = query.filter(agent::agent_id.eq(agent_id));
}

if let Some(room_id) = self.room_id {
query = query.filter(agent::room_id.eq(room_id));
}

diesel::delete(query).execute(conn)
query.execute(conn)
}
}

0 comments on commit 8797737

Please sign in to comment.