diff --git a/Cargo.lock b/Cargo.lock index f1737751..df8a0481 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,7 +185,7 @@ dependencies = [ "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", - "svc-agent 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", + "svc-agent 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authn 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authz 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "svc-error 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1947,7 +1947,7 @@ dependencies = [ [[package]] name = "svc-agent" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1959,6 +1959,7 @@ dependencies = [ "serde_derive 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authn 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2006,7 +2007,7 @@ dependencies = [ "sentry 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)", - "svc-agent 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", + "svc-agent 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authn 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authz 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2430,6 +2431,15 @@ dependencies = [ "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "uuid" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "vcpkg" version = "0.2.6" @@ -2861,7 +2871,7 @@ dependencies = [ "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum string 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d24114bfcceb867ca7f71a0d3fe45d45619ec47a6fbfa98cb14e14250bfa5d6d" "checksum surf 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "741a8008f8a833ef16f47df94a30754478fb2c2bf822b9c2e6f7f09203b97ace" -"checksum svc-agent 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b62577cf1c5930bc7d7d2328592c78956968999f7771ee1ec3dd9c2fbbde4e6c" +"checksum svc-agent 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ad8e3a6772cb9a49701e00169a1332d35351cf68398fff8b429e2ec009968d2e" "checksum svc-authn 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "155857a0822913ba103a75f043e2e8594e78d071fe4440b443870e7559681664" "checksum svc-authz 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "437d2f44d68fe60946c17de439fafa10831c67f1ae0ce7bd1a6b98e76160adc3" "checksum svc-error 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "a698f5401ae58a79c3e8c7345a43427bb7f53c9a9dac094f8c0e2e7b2374fd16" @@ -2910,6 +2920,7 @@ dependencies = [ "checksum utf8-ranges 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9d50aa7650df78abf942826607c62468ce18d9019673d4a2ebe1865dbb96ffde" "checksum uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e1436e58182935dcd9ce0add9ea0b558e8a87befe01c1a301e6020aeb0876363" "checksum uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)" = "90dbc611eb48397705a6b0f6e917da23ae517e4d127123d2cf7674206627d32a" +"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" "checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" diff --git a/Cargo.toml b/Cargo.toml index 359c15f2..12f35899 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,4 @@ svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-a [dev-dependencies] rand = "0.7" + diff --git a/src/app/endpoint/message.rs b/src/app/endpoint/message.rs index e78d96fa..9662ac09 100644 --- a/src/app/endpoint/message.rs +++ b/src/app/endpoint/message.rs @@ -136,6 +136,7 @@ impl State { reqp.correlation_data(), long_term_timing, short_term_timing, + inresp.properties().tracking().clone(), ); let payload = inresp.payload(); diff --git a/src/app/endpoint/rtc_stream.rs b/src/app/endpoint/rtc_stream.rs index 3484b76c..73b22577 100644 --- a/src/app/endpoint/rtc_stream.rs +++ b/src/app/endpoint/rtc_stream.rs @@ -2,7 +2,7 @@ use chrono::{DateTime, Utc}; use serde_derive::Deserialize; use svc_agent::mqtt::{ IncomingRequest, OutgoingEvent, OutgoingEventProperties, ResponseStatus, - ShortTermTimingProperties, + ShortTermTimingProperties, TrackingProperties, }; use svc_error::Error as SvcError; use uuid::Uuid; @@ -117,11 +117,13 @@ pub(crate) fn update_event( room_id: Uuid, object: janus_rtc_stream::Object, start_timestamp: DateTime, + tracking: &TrackingProperties, ) -> Result { let uri = format!("rooms/{}/events", room_id); let timing = ShortTermTimingProperties::until_now(start_timestamp); - let props = OutgoingEventProperties::new("rtc_stream.update", timing); + let mut props = OutgoingEventProperties::new("rtc_stream.update", timing); + props.set_tracking(tracking.to_owned()); Ok(OutgoingEvent::broadcast(object, props, &uri)) } diff --git a/src/app/endpoint/shared.rs b/src/app/endpoint/shared.rs index 57d5a178..c21a0c40 100644 --- a/src/app/endpoint/shared.rs +++ b/src/app/endpoint/shared.rs @@ -30,7 +30,8 @@ pub(crate) fn respond( let mut messages: Vec> = vec![Box::new(resp)]; if let Some((label, topic)) = notification { - let props = OutgoingEventProperties::new(label, short_term_timing); + let mut props = OutgoingEventProperties::new(label, short_term_timing); + props.set_tracking(inreq.properties().tracking().to_owned()); messages.push(Box::new(OutgoingEvent::broadcast(object, props, topic))); } diff --git a/src/app/endpoint/system.rs b/src/app/endpoint/system.rs index 1548cc2d..31ace2ff 100644 --- a/src/app/endpoint/system.rs +++ b/src/app/endpoint/system.rs @@ -5,7 +5,7 @@ use failure::Error; use serde_derive::{Deserialize, Serialize}; use svc_agent::mqtt::{ IncomingRequest, OutgoingEvent, OutgoingEventProperties, Publishable, ResponseStatus, - ShortTermTimingProperties, + ShortTermTimingProperties, TrackingProperties, }; use svc_authn::AccountId; use svc_error::Error as SvcError; @@ -103,6 +103,7 @@ impl State { ), backend.id(), start_timestamp, + inreq.properties().tracking(), ) .map_err(|_| { // TODO: Send the error as an event to "app/${APP}/audiences/${AUD}" topic @@ -126,6 +127,7 @@ pub(crate) fn upload_event( room: &room::Object, rtcs_and_recordings: I, start_timestamp: DateTime, + tracking: &TrackingProperties, ) -> Result where I: Iterator, @@ -152,7 +154,8 @@ where let uri = format!("audiences/{}/events", room.audience()); let timing = ShortTermTimingProperties::until_now(start_timestamp); - let props = OutgoingEventProperties::new("room.upload", timing); + let mut props = OutgoingEventProperties::new("room.upload", timing); + props.set_tracking(tracking.to_owned()); let event = RoomUploadEventData { id: room.id(), diff --git a/src/app/janus.rs b/src/app/janus.rs index 92596c1e..8b017b9e 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use svc_agent::mqtt::{ compat::{into_event, IncomingEnvelope}, IncomingRequestProperties, OutgoingRequest, OutgoingRequestProperties, Publishable, - ResponseStatus, ShortTermTimingProperties, + ResponseStatus, ShortTermTimingProperties, TrackingProperties, }; use svc_agent::{Addressable, AgentId}; use svc_error::Error as SvcError; @@ -54,6 +54,7 @@ impl CreateSessionTransaction { pub(crate) fn create_session_request( to: &A, start_timestamp: DateTime, + tracking: &TrackingProperties, ) -> Result, Error> where A: Addressable, @@ -61,13 +62,14 @@ where let transaction = Transaction::CreateSession(CreateSessionTransaction::new()); let payload = CreateSessionRequest::new(&to_base64(&transaction)?); - let props = OutgoingRequestProperties::new( + let mut props = OutgoingRequestProperties::new( "janus_session.create", IGNORE, IGNORE, ShortTermTimingProperties::until_now(start_timestamp), ); + props.set_tracking(tracking.to_owned()); Ok(OutgoingRequest::unicast(payload, props, to)) } @@ -88,6 +90,7 @@ pub(crate) fn create_handle_request( session_id: i64, to: &A, start_timestamp: DateTime, + tracking: &TrackingProperties, ) -> Result, Error> where A: Addressable, @@ -101,12 +104,14 @@ where None, ); - let props = OutgoingRequestProperties::new( + let mut props = OutgoingRequestProperties::new( "janus_handle.create", IGNORE, IGNORE, ShortTermTimingProperties::until_now(start_timestamp), ); + + props.set_tracking(tracking.to_owned()); Ok(OutgoingRequest::unicast(payload, props, to)) } @@ -342,6 +347,7 @@ pub(crate) fn upload_stream_request( body: UploadStreamRequestBody, to: &AgentId, start_timestamp: DateTime, + tracking: &TrackingProperties, ) -> Result, Error> { let transaction = Transaction::UploadStream(UploadStreamTransaction::new(body.id)); @@ -353,13 +359,14 @@ pub(crate) fn upload_stream_request( None, ); - let props = OutgoingRequestProperties::new( + let mut props = OutgoingRequestProperties::new( "janus_conference_stream.upload", IGNORE, IGNORE, ShortTermTimingProperties::until_now(start_timestamp), ); + props.set_tracking(tracking.to_owned()); Ok(OutgoingRequest::unicast(payload, props, to)) } @@ -423,11 +430,14 @@ pub(crate) async fn handle_response( match from_base64::(&inresp.transaction())? { // Session has been created Transaction::CreateSession(_tn) => { - let session_id = inresp.data().id(); - // Creating Handle - let backreq = - create_handle_request(session_id, message.properties(), start_timestamp)?; + let backreq = create_handle_request( + inresp.data().id(), + message.properties(), + start_timestamp, + message.properties().tracking(), + )?; + Ok(vec![Box::new(backreq) as Box]) } // Handle has been created @@ -776,8 +786,14 @@ pub(crate) async fn handle_response( match maybe_rtcs_and_recordings { Some(rtcs_and_recordings) => { - let event = endpoint::system::upload_event(&room, rtcs_and_recordings.into_iter(), start_timestamp) - .map_err(|e| format_err!("error creating a system event, {}", e))?; + let event = endpoint::system::upload_event( + &room, + rtcs_and_recordings.into_iter(), + start_timestamp, + message.properties().tracking(), + ).map_err(|e| { + format_err!("error creating a system event, {}", e) + })?; Ok(vec![Box::new(event) as Box]) } @@ -824,8 +840,12 @@ pub(crate) async fn handle_response( .execute(&conn)? .ok_or_else(|| format_err!("a room for rtc = '{}' is not found", &rtc_id))?; - let event = - endpoint::rtc_stream::update_event(room.id(), rtc_stream, start_timestamp)?; + let event = endpoint::rtc_stream::update_event( + room.id(), + rtc_stream, + start_timestamp, + message.properties().tracking(), + )?; Ok(vec![Box::new(event) as Box]) } else { @@ -851,8 +871,12 @@ pub(crate) async fn handle_response( // Publish the update event if only stream object has been changed // (if there was't any actual media stream, the object won't contain its start time) if let Some(_) = rtc_stream.time() { - let event = - endpoint::rtc_stream::update_event(room.id(), rtc_stream, start_timestamp)?; + let event = endpoint::rtc_stream::update_event( + room.id(), + rtc_stream, + start_timestamp, + message.properties().tracking(), + )?; return Ok(vec![Box::new(event) as Box]); } @@ -887,7 +911,8 @@ pub(crate) async fn handle_status( let agent_id = inev.properties().as_agent_id(); if let true = inev.payload().online() { - let event = create_session_request(agent_id, start_timestamp)?; + let tracking = inev.properties().tracking(); + let event = create_session_request(agent_id, start_timestamp, tracking)?; Ok(vec![Box::new(event) as Box]) } else { let conn = janus.db.get()?; diff --git a/src/test_helpers/agent.rs b/src/test_helpers/agent.rs index 508dc8c6..b6dfad89 100644 --- a/src/test_helpers/agent.rs +++ b/src/test_helpers/agent.rs @@ -67,6 +67,8 @@ impl TestAgent { "broker_timestamp": now, "broker_processing_timestamp": now, "broker_initial_processing_timestamp": now, + "tracking_id": "16911d40-0b13-11ea-8171-60f81db6d53e", + "session_tracking_label": "16cc4294-0b13-11ea-91ae-60f81db6d53e.16ee876e-0b13-11ea-8c32-60f81db6d53e 2565f962-0b13-11ea-9359-60f81db6d53e.25c2b97c-0b13-11ea-9f20-60f81db6d53e", } }); @@ -100,6 +102,8 @@ impl TestAgent { "broker_timestamp": now, "broker_processing_timestamp": now, "broker_initial_processing_timestamp": now, + "tracking_id": "16911d40-0b13-11ea-8171-60f81db6d53e", + "session_tracking_label": "16cc4294-0b13-11ea-91ae-60f81db6d53e.16ee876e-0b13-11ea-8c32-60f81db6d53e 2565f962-0b13-11ea-9359-60f81db6d53e.25c2b97c-0b13-11ea-9f20-60f81db6d53e", } });