diff --git a/migrations/2020-09-11-163557_move_backend_id_from_stream_to_recording/down.sql b/migrations/2020-09-11-163557_move_backend_id_from_stream_to_recording/down.sql new file mode 100644 index 00000000..62f16256 --- /dev/null +++ b/migrations/2020-09-11-163557_move_backend_id_from_stream_to_recording/down.sql @@ -0,0 +1,6 @@ +DROP INDEX recording_rtc_id_idx; + +ALTER TABLE janus_rtc_stream +DROP CONSTRAINT janus_rtc_stream_backend_id_fkey; + +ALTER TABLE recording DROP COLUMN backend_id; diff --git a/migrations/2020-09-11-163557_move_backend_id_from_stream_to_recording/up.sql b/migrations/2020-09-11-163557_move_backend_id_from_stream_to_recording/up.sql new file mode 100644 index 00000000..8913b47e --- /dev/null +++ b/migrations/2020-09-11-163557_move_backend_id_from_stream_to_recording/up.sql @@ -0,0 +1,16 @@ +ALTER TABLE recording ADD COLUMN backend_id agent_id NULL; +UPDATE recording SET backend_id = '("(janus,svc.netology-group.services)",unknown)'::agent_id; +ALTER TABLE recording ALTER COLUMN backend_id SET NOT NULL; + +UPDATE recording AS rec +SET backend_id = jrs.backend_id +FROM janus_rtc_stream AS jrs +WHERE jrs.rtc_id = rec.rtc_id; + +ALTER TABLE janus_rtc_stream +ADD CONSTRAINT janus_rtc_stream_backend_id_fkey +FOREIGN KEY (backend_id) +REFERENCES janus_backend(id) +ON DELETE CASCADE; + +CREATE UNIQUE INDEX recording_rtc_id_idx ON recording (rtc_id); diff --git a/src/app/endpoint/metric.rs b/src/app/endpoint/metric.rs index 523334f2..7c341bb1 100644 --- a/src/app/endpoint/metric.rs +++ b/src/app/endpoint/metric.rs @@ -199,3 +199,32 @@ fn append_janus_stats( Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Deserialize)] + struct DynamicMetric { + metric: String, + value: u64, + timestamp: DateTime, + } + + #[test] + fn serialize_dynamic_metric() { + let now = Utc::now(); + + let json = serde_json::json!(Metric::Dynamic { + key: String::from("example"), + value: MetricValue::new(123, now), + }); + + let parsed: DynamicMetric = + serde_json::from_str(&json.to_string()).expect("Failed to parse json"); + + assert_eq!(&parsed.metric, "apps.conference.example_total"); + assert_eq!(parsed.value, 123); + assert_eq!(parsed.timestamp, now); + } +} diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index e2b01d67..5537b365 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -324,6 +324,12 @@ impl RequestHandler for ConnectHandler { .status(ResponseStatus::SERVICE_UNAVAILABLE)?, }; + // Create recording if a writer connects for the first time. + if payload.intent == ConnectIntent::Write && maybe_rtc_stream.is_none() { + db::recording::InsertQuery::new(payload.id, backend.id()) + .execute(&conn)?; + } + // Check that the backend's capacity is not exceeded for readers. if payload.intent == ConnectIntent::Read && db::janus_backend::free_capacity(payload.id, &conn)? == 0 diff --git a/src/app/endpoint/system.rs b/src/app/endpoint/system.rs index 83e23499..f2853143 100644 --- a/src/app/endpoint/system.rs +++ b/src/app/endpoint/system.rs @@ -74,24 +74,22 @@ impl RequestHandler for VacuumHandler { let mut requests = Vec::new(); let conn = context.db().get()?; - let rooms = db::room::finished_without_recordings(&conn)?; + let rooms = db::room::finished_with_in_progress_recordings(&conn)?; - for (room, rtc, backend) in rooms.into_iter() { + for (room, recording, backend) in rooms.into_iter() { db::agent::DeleteQuery::new() .room_id(room.id()) .execute(&conn)?; - db::recording::InsertQuery::new(rtc.id()).execute(&conn)?; - // TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic let backreq = janus::upload_stream_request( reqp, backend.session_id(), backend.handle_id(), janus::UploadStreamRequestBody::new( - rtc.id(), + recording.rtc_id(), &bucket_name(&room), - &record_name(&rtc), + &record_name(&recording), ), backend.id(), context.agent_id(), @@ -111,28 +109,30 @@ impl RequestHandler for VacuumHandler { pub(crate) fn upload_event( room: &db::room::Object, - rtcs_and_recordings: I, + recordings: I, start_timestamp: DateTime, tracking: &TrackingProperties, ) -> anyhow::Result where - I: Iterator, + I: Iterator, { let mut event_entries = Vec::new(); - for (rtc, recording) in rtcs_and_recordings { + for recording in recordings { let uri = match recording.status() { RecordingStatus::InProgress => bail!( "Unexpected recording in in_progress status, rtc_id = '{}'", recording.rtc_id() ), RecordingStatus::Missing => None, - RecordingStatus::Ready => { - Some(format!("s3://{}/{}", bucket_name(&room), record_name(&rtc))) - } + RecordingStatus::Ready => Some(format!( + "s3://{}/{}", + bucket_name(&room), + record_name(&recording) + )), }; let entry = RtcUploadEventData { - id: rtc.id(), + id: recording.rtc_id(), status: recording.status().to_owned(), uri, segments: recording.segments().to_owned(), @@ -159,8 +159,8 @@ fn bucket_name(room: &db::room::Object) -> String { format!("origin.webinar.{}", room.audience()) } -fn record_name(rtc: &db::rtc::Object) -> String { - format!("{}.source.webm", rtc.id()) +fn record_name(recording: &db::recording::Object) -> String { + format!("{}.source.webm", recording.rtc_id()) } /////////////////////////////////////////////////////////////////////////////// diff --git a/src/app/janus.rs b/src/app/janus.rs index 9555e165..229f3edf 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -946,7 +946,7 @@ async fn handle_response_impl( .collect()) })?; - let (room, rtcs, recs) = { + let (room, rtcs, recs): (room::Object, Vec, Vec) = { let conn = context.db().get()?; recording::UpdateQuery::new(rtc_id) @@ -980,52 +980,40 @@ async fn handle_response_impl( // TODO: move to db module use diesel::prelude::*; let rtcs = rtc::Object::belonging_to(&room).load(&conn)?; - let recs = recording::Object::belonging_to(&rtcs).load(&conn)?.grouped_by(&rtcs); + let recs = recording::Object::belonging_to(&rtcs).load(&conn)?; (room, rtcs, recs) }; - let maybe_rtcs_and_recordings: Option> = rtcs - .into_iter() - .zip(recs) - .map(|(rtc, rtc_recs)| { - if rtc_recs.len() > 1 { - warn!( - "there must be at most 1 recording for an rtc, got {} for the room = '{}', rtc = '{}'; using the first one, ignoring the rest", - rtc_recs.len(), - room.id(), - rtc.id(), - ); - } - - rtc_recs.into_iter().next().map(|rec| (rtc, rec)) - }) - .collect(); - - match maybe_rtcs_and_recordings { - Some(rtcs_and_recordings) => { - let event = endpoint::system::upload_event( - &room, - rtcs_and_recordings.into_iter(), - start_timestamp, - respp.tracking(), - ) - .map_err(|e| format!("error creating a system event, {}", e)) - .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; + // Ensure that all rtcs have a recording. + let rtc_ids_with_recs = recs + .iter() + .map(|rec| rec.rtc_id()) + .collect::>(); - let event_box = Box::new(event) as Box; - Ok(Box::new(stream::once(event_box)) as MessageStream) - } - None => { - // Waiting for all the room's rtc being uploaded + for rtc in rtcs { + if !rtc_ids_with_recs.contains(&rtc.id()) { info!( "postpone 'room.upload' event because still waiting for rtcs being uploaded for the room = '{}'", room.id(), ); - Ok(Box::new(stream::empty()) as MessageStream) + return Ok(Box::new(stream::empty()) as MessageStream); } } + + // Send room.upload event. + let event = endpoint::system::upload_event( + &room, + recs.into_iter(), + start_timestamp, + respp.tracking(), + ) + .map_err(|e| format!("error creating a system event, {}", e)) + .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; + + let event_box = Box::new(event) as Box; + Ok(Box::new(stream::once(event_box)) as MessageStream) }) } // An unsupported incoming Event message has been received diff --git a/src/db/janus_backend.rs b/src/db/janus_backend.rs index 53632e12..c8b877cd 100644 --- a/src/db/janus_backend.rs +++ b/src/db/janus_backend.rs @@ -6,14 +6,15 @@ use uuid::Uuid; use crate::schema::janus_backend; -type AllColumns = ( +pub(crate) type AllColumns = ( janus_backend::id, janus_backend::handle_id, janus_backend::session_id, janus_backend::created_at, janus_backend::capacity, ); -pub const ALL_COLUMNS: AllColumns = ( + +pub(crate) const ALL_COLUMNS: AllColumns = ( janus_backend::id, janus_backend::handle_id, janus_backend::session_id, diff --git a/src/db/recording.rs b/src/db/recording.rs index e684867e..cfb46935 100644 --- a/src/db/recording.rs +++ b/src/db/recording.rs @@ -4,13 +4,33 @@ use std::ops::Bound; use chrono::{DateTime, Utc}; use diesel::{pg::PgConnection, result::Error}; use serde_derive::{Deserialize, Serialize}; +use svc_agent::AgentId; use uuid::Uuid; +use super::janus_backend::Object as JanusBackend; use super::rtc::Object as Rtc; use crate::schema::recording; //////////////////////////////////////////////////////////////////////////////// +pub(crate) type AllColumns = ( + recording::rtc_id, + recording::started_at, + recording::segments, + recording::status, + recording::backend_id, +); + +pub(crate) const ALL_COLUMNS: AllColumns = ( + recording::rtc_id, + recording::started_at, + recording::segments, + recording::status, + recording::backend_id, +); + +//////////////////////////////////////////////////////////////////////////////// + pub(crate) type Segment = (Bound, Bound); #[derive(Clone, Copy, Debug, DbEnum, Deserialize, Serialize, PartialEq)] @@ -35,6 +55,7 @@ impl fmt::Display for Status { #[derive(Debug, Serialize, Identifiable, Associations, Queryable)] #[belongs_to(Rtc, foreign_key = "rtc_id")] +#[belongs_to(JanusBackend, foreign_key = "backend_id")] #[primary_key(rtc_id)] #[table_name = "recording"] pub(crate) struct Object { @@ -43,6 +64,7 @@ pub(crate) struct Object { started_at: Option>, segments: Option>, status: Status, + backend_id: AgentId, } impl Object { @@ -67,13 +89,14 @@ impl Object { #[derive(Debug, Insertable)] #[table_name = "recording"] -pub(crate) struct InsertQuery { +pub(crate) struct InsertQuery<'a> { rtc_id: Uuid, + backend_id: &'a AgentId, } -impl InsertQuery { - pub(crate) fn new(rtc_id: Uuid) -> Self { - Self { rtc_id } +impl<'a> InsertQuery<'a> { + pub(crate) fn new(rtc_id: Uuid, backend_id: &'a AgentId) -> Self { + Self { rtc_id, backend_id } } pub(crate) fn execute(self, conn: &PgConnection) -> Result { diff --git a/src/db/room.rs b/src/db/room.rs index d207faf7..6975fae5 100644 --- a/src/db/room.rs +++ b/src/db/room.rs @@ -8,6 +8,8 @@ use diesel::result::Error; use serde_derive::{Deserialize, Serialize}; use uuid::Uuid; +use crate::db::janus_backend::Object as JanusBackend; +use crate::db::recording::{Object as Recording, Status as RecordingStatus}; use crate::schema::{room, rtc}; //////////////////////////////////////////////////////////////////////////////// @@ -184,40 +186,29 @@ impl FindQuery { // room1 | rtc2 | recording1 -> room1 | rtc2 | recording1 // room2 | rtc3 | recording2 room2 | rtc3 | recording2 // room3 | rtc4 | null room3 | null | null -pub(crate) fn finished_without_recordings( +pub(crate) fn finished_with_in_progress_recordings( conn: &PgConnection, -) -> Result< - Vec<( - self::Object, - super::rtc::Object, - super::janus_backend::Object, - )>, - Error, -> { +) -> Result, Error> { use crate::schema; use diesel::{dsl::sql, prelude::*}; schema::room::table .inner_join( - schema::rtc::table - .left_join(schema::recording::table) - .left_join( - schema::janus_rtc_stream::table - .inner_join(schema::janus_backend::table.on( - schema::janus_backend::id.eq(schema::janus_rtc_stream::backend_id), - )), + schema::rtc::table.inner_join( + schema::recording::table.inner_join( + schema::janus_backend::table + .on(schema::janus_backend::id.eq(schema::recording::backend_id)), ), + ), ) - .filter(room::backend.ne(RoomBackend::None)) - .filter(schema::recording::rtc_id.is_null()) + .filter(room::backend.eq(RoomBackend::Janus)) .filter(sql("upper(\"room\".\"time\") < now()")) - .filter(schema::janus_backend::id.is_not_null()) + .filter(schema::recording::status.eq(RecordingStatus::InProgress)) .select(( self::ALL_COLUMNS, - super::rtc::ALL_COLUMNS, + super::recording::ALL_COLUMNS, super::janus_backend::ALL_COLUMNS, )) - .distinct_on(room::id) .load(conn) } @@ -330,7 +321,7 @@ impl UpdateQuery { #[cfg(test)] mod tests { - mod finished_without_recordings { + mod finished_with_in_progress_recordings { use super::super::*; use crate::test_helpers::prelude::*; @@ -356,15 +347,14 @@ mod tests { let rtc2 = shared_helpers::insert_rtc_with_room(&conn, &room2); let _rtc3 = shared_helpers::insert_rtc_with_room(&conn, &room3); - shared_helpers::insert_janus_rtc_stream(&conn, &backend1, &rtc1); - // we insert two rtc_streams to simulate stream stop & start - // this should not affect number of returned rooms (there was a bug when it did) - shared_helpers::insert_janus_rtc_stream(&conn, &backend2, &rtc2); - shared_helpers::insert_janus_rtc_stream(&conn, &backend2, &rtc2); + shared_helpers::insert_recording(&conn, &rtc1, &backend1); + shared_helpers::insert_recording(&conn, &rtc2, &backend2); + + let rooms = finished_with_in_progress_recordings(&conn) + .expect("finished_with_in_progress_recordings call failed"); - let rooms = finished_without_recordings(&conn) - .expect("finished_without_recordings call failed"); assert_eq!(rooms.len(), 2); + // order of rooms is not specified so we check that its [(room1, _, backend1), (room2, _, backend2)] in any order if rooms[0].0.id() == room1.id() { assert_eq!(rooms[0].0.id(), room1.id()); diff --git a/src/db/rtc.rs b/src/db/rtc.rs index 6566f0a2..f4b20012 100644 --- a/src/db/rtc.rs +++ b/src/db/rtc.rs @@ -9,12 +9,6 @@ use crate::schema::rtc; //////////////////////////////////////////////////////////////////////////////// -pub(crate) type AllColumns = (rtc::id, rtc::room_id, rtc::created_at); - -pub(crate) const ALL_COLUMNS: AllColumns = (rtc::id, rtc::room_id, rtc::created_at); - -//////////////////////////////////////////////////////////////////////////////// - #[derive( Clone, Debug, Serialize, Deserialize, Identifiable, Queryable, QueryableByName, Associations, )] diff --git a/src/schema.rs b/src/schema.rs index 331f13c8..2ef51c00 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -61,6 +61,7 @@ table! { started_at -> Nullable, segments -> Nullable>, status -> Recording_status, + backend_id -> Agent_id, } } @@ -91,6 +92,7 @@ table! { joinable!(agent -> room (room_id)); joinable!(agent_stream -> agent (sent_by)); +joinable!(janus_rtc_stream -> janus_backend (backend_id)); joinable!(janus_rtc_stream -> rtc (rtc_id)); joinable!(recording -> rtc (rtc_id)); joinable!(rtc -> room (room_id)); diff --git a/src/test_helpers/factory.rs b/src/test_helpers/factory.rs index aa212c99..6a6b1bb6 100644 --- a/src/test_helpers/factory.rs +++ b/src/test_helpers/factory.rs @@ -262,3 +262,59 @@ impl<'a> JanusRtcStream<'a> { .expect("Failed to insert janus_rtc_stream") } } + +/////////////////////////////////////////////////////////////////////////////// + +pub(crate) struct Recording<'a> { + rtc: Option<&'a db::rtc::Object>, + backend: Option<&'a db::janus_backend::Object>, +} + +impl<'a> Recording<'a> { + pub(crate) fn new() -> Self { + Self { + rtc: None, + backend: None, + } + } + + pub(crate) fn rtc(self, rtc: &'a db::rtc::Object) -> Self { + Self { + rtc: Some(rtc), + ..self + } + } + + pub(crate) fn backend(self, backend: &'a db::janus_backend::Object) -> Self { + Self { + backend: Some(backend), + ..self + } + } + + pub(crate) fn insert(&self, conn: &PgConnection) -> db::recording::Object { + let default_rtc; + + let rtc = match self.rtc { + Some(value) => value, + None => { + default_rtc = insert_rtc(conn); + &default_rtc + } + }; + + let default_backend; + + let backend = match self.backend { + Some(value) => value, + None => { + default_backend = insert_janus_backend(conn); + &default_backend + } + }; + + db::recording::InsertQuery::new(rtc.id(), backend.id()) + .execute(conn) + .expect("Failed to insert recording") + } +} diff --git a/src/test_helpers/shared_helpers.rs b/src/test_helpers/shared_helpers.rs index 4329e08c..77c434ef 100644 --- a/src/test_helpers/shared_helpers.rs +++ b/src/test_helpers/shared_helpers.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use crate::db::agent::{Object as Agent, Status as AgentStatus}; use crate::db::janus_backend::Object as JanusBackend; -use crate::db::janus_rtc_stream::Object as JanusRtcStream; +use crate::db::recording::Object as Recording; use crate::db::room::{Object as Room, RoomBackend}; use crate::db::rtc::Object as Rtc; @@ -69,13 +69,13 @@ pub(crate) fn insert_rtc_with_room(conn: &PgConnection, room: &Room) -> Rtc { factory::Rtc::new(room.id()).insert(conn) } -pub(crate) fn insert_janus_rtc_stream( +pub(crate) fn insert_recording( conn: &PgConnection, - backend: &JanusBackend, rtc: &Rtc, -) -> JanusRtcStream { - factory::JanusRtcStream::new(USR_AUDIENCE) - .backend(backend) + backend: &JanusBackend, +) -> Recording { + factory::Recording::new() .rtc(rtc) + .backend(backend) .insert(conn) }