diff --git a/Cargo.lock b/Cargo.lock index b31ceaf3..545b9300 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,7 +234,7 @@ dependencies = [ "serde 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.53 (registry+https://github.com/rust-lang/crates.io-index)", - "svc-agent 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", + "svc-agent 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authn 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "svc-authz 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "svc-error 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1943,10 +1943,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-channel 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "rumqtt 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.53 (registry+https://github.com/rust-lang/crates.io-index)", + "svc-authn 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "svc-agent" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures-channel 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rumqtt 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.110 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2838,6 +2858,7 @@ dependencies = [ "checksum spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" "checksum surf 2.0.0-alpha.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1640fa223ab85a28abd533ad229cc840336acb1d65a5e3f8d86daa9d8f930061" "checksum svc-agent 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8abcffbd2a13078e3cf90474756e9af6a267f804e93eef5e0885863b155d5860" +"checksum svc-agent 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b12ed3427b18b7b46f3e411a5c283936ec6c035e07e09c918b24f52fdbf5a73b" "checksum svc-authn 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a9c668ebf3bf8af0ca69d0e3453d6ddb526d80ac4af0d10d7e074518136e1c89" "checksum svc-authz 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bde07cb880c587a1271d075d4fa82b60cf3a9d5377eacdadc312c18a7c428d1e" "checksum svc-error 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "5ca1419d26952fbb917f2481b0697bc77768c0ad9bde5618fd7cefdf18fd1738" diff --git a/Cargo.toml b/Cargo.toml index 39725039..d88a797b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ rand = "0.7" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -svc-agent = { version = "0.12", features = ["diesel"] } +svc-agent = { version = "0.13", features = ["diesel"] } svc-authn = { version = "0.5", features = ["jose", "diesel"] } svc-authz = "0.10" svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] } diff --git a/src/app/endpoint/message.rs b/src/app/endpoint/message.rs index fafd3a0a..d8f9dd02 100644 --- a/src/app/endpoint/message.rs +++ b/src/app/endpoint/message.rs @@ -7,7 +7,7 @@ use diesel::pg::PgConnection; use serde_derive::Deserialize; use serde_json::{json, Value as JsonValue}; use svc_agent::mqtt::{ - IncomingRequestProperties, IncomingResponseProperties, IntoPublishableDump, OutgoingRequest, + IncomingRequestProperties, IncomingResponseProperties, IntoPublishableMessage, OutgoingRequest, OutgoingResponse, OutgoingResponseProperties, ResponseStatus, ShortTermTimingProperties, SubscriptionTopic, }; @@ -74,7 +74,7 @@ impl RequestHandler for UnicastHandler { API_VERSION, ); - let boxed_req = Box::new(req) as Box; + let boxed_req = Box::new(req) as Box; Ok(Box::new(stream::once(boxed_req))) } } @@ -157,7 +157,7 @@ impl ResponseHandler for CallbackHandler { ); let resp = OutgoingResponse::unicast(payload.to_owned(), props, &reqp, API_VERSION); - let boxed_resp = Box::new(resp) as Box; + let boxed_resp = Box::new(resp) as Box; Ok(Box::new(stream::once(boxed_resp))) } } diff --git a/src/app/endpoint/metric.rs b/src/app/endpoint/metric.rs index 3bfa3f7e..68681e7d 100644 --- a/src/app/endpoint/metric.rs +++ b/src/app/endpoint/metric.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use chrono::{serde::ts_seconds, DateTime, Utc}; use serde_derive::{Deserialize, Serialize}; use svc_agent::mqtt::{ - IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ShortTermTimingProperties, + IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ShortTermTimingProperties, }; use crate::app::context::Context; @@ -54,7 +54,8 @@ impl EventHandler for PullHandler { let props = evp.to_event("metric.create", short_term_timing); let outgoing_event = OutgoingEvent::multicast(outgoing_event_payload, props, account_id); - let boxed_event = Box::new(outgoing_event) as Box; + let boxed_event = + Box::new(outgoing_event) as Box; Ok(Box::new(stream::once(boxed_event))) } diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index 179febd8..878d584b 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -4,8 +4,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde::de::DeserializeOwned; use svc_agent::mqtt::{ - compat::IncomingEnvelope, IncomingEventProperties, IncomingRequestProperties, - IncomingResponseProperties, + IncomingEvent, IncomingEventProperties, IncomingRequest, IncomingRequestProperties, + IncomingResponse, IncomingResponseProperties, }; use svc_error::Error as SvcError; @@ -37,15 +37,14 @@ macro_rules! request_routes { ($($m: pat => $h: ty),*) => { pub(crate) async fn route_request( context: &C, - envelope: IncomingEnvelope, - reqp: &IncomingRequestProperties, + request: &IncomingRequest, _topic: &str, start_timestamp: DateTime, ) -> Option { - match reqp.method() { + match request.properties().method() { $( $m => Some( - <$h>::handle_envelope::(context, envelope, reqp, start_timestamp).await + <$h>::handle_envelope::(context, request, start_timestamp).await ), )* _ => None, @@ -90,23 +89,14 @@ pub(crate) trait ResponseHandler { pub(crate) async fn route_response( context: &C, - envelope: IncomingEnvelope, - respp: &IncomingResponseProperties, + resp: &IncomingResponse, topic: &str, start_timestamp: DateTime, ) -> Option { if topic == context.janus_topics().responses_topic() { - Some(janus::handle_response::(context, envelope, respp, start_timestamp).await) + Some(janus::handle_response::(context, resp, start_timestamp).await) } else { - Some( - message::CallbackHandler::handle_envelope::( - context, - envelope, - respp, - start_timestamp, - ) - .await, - ) + Some(message::CallbackHandler::handle_envelope::(context, resp, start_timestamp).await) } } @@ -129,20 +119,19 @@ macro_rules! event_routes { #[allow(unused_variables)] pub(crate) async fn route_event( context: &C, - envelope: IncomingEnvelope, - evp: &IncomingEventProperties, + event: &IncomingEvent, topic: &str, start_timestamp: DateTime, ) -> Option { if topic == context.janus_topics().events_topic() { - Some(janus::handle_event::(context, envelope, evp, start_timestamp).await) + Some(janus::handle_event::(context, event, start_timestamp).await) } else if topic == context.janus_topics().status_events_topic() { - Some(janus::handle_status_event::(context, envelope, evp, start_timestamp).await) + Some(janus::handle_status_event::(context, event, start_timestamp).await) } else { - match evp.label() { + match event.properties().label() { $( Some($l) => Some( - <$h>::handle_envelope::(context, envelope, evp, start_timestamp).await + <$h>::handle_envelope::(context, event, start_timestamp).await ), )* _ => None, diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index c64365cc..d31a5d86 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc}; use serde_derive::{Deserialize, Serialize}; use std::ops::Bound; use svc_agent::mqtt::{ - IncomingRequestProperties, IntoPublishableDump, OutgoingRequest, ResponseStatus, + IncomingRequestProperties, IntoPublishableMessage, OutgoingRequest, ResponseStatus, ShortTermTimingProperties, }; use svc_agent::{Addressable, AgentId}; @@ -339,7 +339,7 @@ impl RequestHandler for EnterHandler { // Then we won't need the local state on the broker at all and will be able // to send a multicast request to the broker. let outgoing_request = OutgoingRequest::unicast(payload, props, reqp, MQTT_GW_API_VERSION); - let boxed_request = Box::new(outgoing_request) as Box; + let boxed_request = Box::new(outgoing_request) as Box; Ok(Box::new(stream::once(boxed_request))) } } @@ -413,7 +413,7 @@ impl RequestHandler for LeaveHandler { // Then we won't need the local state on the broker at all and will be able // to send a multicast request to the broker. let outgoing_request = OutgoingRequest::unicast(payload, props, reqp, MQTT_GW_API_VERSION); - let boxed_request = Box::new(outgoing_request) as Box; + let boxed_request = Box::new(outgoing_request) as Box; Ok(Box::new(stream::once(boxed_request))) } } diff --git a/src/app/endpoint/rtc.rs b/src/app/endpoint/rtc.rs index 966a7ddd..7da51779 100644 --- a/src/app/endpoint/rtc.rs +++ b/src/app/endpoint/rtc.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde_derive::{Deserialize, Serialize}; use svc_agent::mqtt::{ - IncomingRequestProperties, IntoPublishableDump, OutgoingResponse, ResponseStatus, + IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus, }; use uuid::Uuid; @@ -312,7 +312,7 @@ impl RequestHandler for ConnectHandler { match janus_request_result { Ok(req) => { - let boxed_request = Box::new(req) as Box; + let boxed_request = Box::new(req) as Box; Ok(Box::new(stream::once(boxed_request))) } Err(err) => Err(format!("error creating a backend request: {}", err)) diff --git a/src/app/endpoint/rtc_signal.rs b/src/app/endpoint/rtc_signal.rs index f4d49ae1..53f917ef 100644 --- a/src/app/endpoint/rtc_signal.rs +++ b/src/app/endpoint/rtc_signal.rs @@ -7,7 +7,7 @@ use chrono::{DateTime, Duration, Utc}; use serde_derive::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use svc_agent::mqtt::{ - IncomingRequestProperties, IntoPublishableDump, OutgoingResponse, ResponseStatus, + IncomingRequestProperties, IntoPublishableMessage, OutgoingResponse, ResponseStatus, }; use svc_agent::Addressable; use svc_error::Error as SvcError; @@ -85,7 +85,7 @@ impl RequestHandler for CreateHandler { start_timestamp, authz_time, ) - .map(|req| Box::new(req) as Box) + .map(|req| Box::new(req) as Box) .map_err(|err| format!("error creating a backend request: {}", err)) .status(ResponseStatus::UNPROCESSABLE_ENTITY)? } else { @@ -124,7 +124,7 @@ impl RequestHandler for CreateHandler { start_timestamp, authz_time, ) - .map(|req| Box::new(req) as Box) + .map(|req| Box::new(req) as Box) .map_err(|err| format!("error creating a backend request: {}", err)) .status(ResponseStatus::UNPROCESSABLE_ENTITY)? } @@ -145,7 +145,7 @@ impl RequestHandler for CreateHandler { start_timestamp, authz_time, ) - .map(|req| Box::new(req) as Box) + .map(|req| Box::new(req) as Box) .map_err(|err| format!("error creating a backend request: {}", err)) .status(ResponseStatus::UNPROCESSABLE_ENTITY)? } diff --git a/src/app/endpoint/rtc_stream.rs b/src/app/endpoint/rtc_stream.rs index b080e837..85b1ffb6 100644 --- a/src/app/endpoint/rtc_stream.rs +++ b/src/app/endpoint/rtc_stream.rs @@ -5,8 +5,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde_derive::Deserialize; use svc_agent::mqtt::{ - IncomingRequestProperties, OutgoingEvent, OutgoingEventProperties, ResponseStatus, - ShortTermTimingProperties, TrackingProperties, + IncomingRequestProperties, OutgoingEvent, OutgoingEventProperties, OutgoingMessage, + ResponseStatus, ShortTermTimingProperties, TrackingProperties, }; use svc_error::Error as SvcError; use uuid::Uuid; @@ -104,7 +104,7 @@ impl RequestHandler for ListHandler { //////////////////////////////////////////////////////////////////////////////// -pub(crate) type ObjectUpdateEvent = OutgoingEvent; +pub(crate) type ObjectUpdateEvent = OutgoingMessage; pub(crate) fn update_event( room_id: Uuid, diff --git a/src/app/endpoint/shared.rs b/src/app/endpoint/shared.rs index a9a12264..9d061dda 100644 --- a/src/app/endpoint/shared.rs +++ b/src/app/endpoint/shared.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Duration, Utc}; use serde::Serialize; use svc_agent::mqtt::{ - IncomingRequestProperties, IntoPublishableDump, OutgoingEvent, OutgoingEventProperties, + IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties, OutgoingResponse, ResponseStatus, ShortTermTimingProperties, }; @@ -15,7 +15,7 @@ pub(crate) fn build_response( reqp: &IncomingRequestProperties, start_timestamp: DateTime, maybe_authz_time: Option, -) -> Box { +) -> Box { let mut timing = ShortTermTimingProperties::until_now(start_timestamp); if let Some(authz_time) = maybe_authz_time { @@ -32,7 +32,7 @@ pub(crate) fn build_notification( payload: impl Serialize + Send + 'static, reqp: &IncomingRequestProperties, start_timestamp: DateTime, -) -> Box { +) -> Box { let timing = ShortTermTimingProperties::until_now(start_timestamp); let mut props = OutgoingEventProperties::new(label, timing); props.set_tracking(reqp.tracking().to_owned()); diff --git a/src/app/endpoint/subscription.rs b/src/app/endpoint/subscription.rs index fd00d9e1..dc84873b 100644 --- a/src/app/endpoint/subscription.rs +++ b/src/app/endpoint/subscription.rs @@ -6,7 +6,7 @@ use chrono::{DateTime, Utc}; use serde_derive::{Deserialize, Serialize}; use svc_agent::{ mqtt::{ - IncomingEventProperties, IntoPublishableDump, OutgoingEvent, ResponseStatus, + IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ResponseStatus, ShortTermTimingProperties, }, AgentId, Authenticable, @@ -101,7 +101,7 @@ impl EventHandler for CreateHandler { let props = evp.to_event("room.enter", short_term_timing); let to_uri = format!("rooms/{}/events", room_id); let outgoing_event = OutgoingEvent::broadcast(outgoing_event_payload, props, &to_uri); - let boxed_event = Box::new(outgoing_event) as Box; + let boxed_event = Box::new(outgoing_event) as Box; Ok(Box::new(stream::once(boxed_event))) } } @@ -145,7 +145,7 @@ impl EventHandler for DeleteHandler { let props = evp.to_event("room.leave", short_term_timing); let to_uri = format!("rooms/{}/events", room_id); let outgoing_event = OutgoingEvent::broadcast(outgoing_event_payload, props, &to_uri); - let boxed_event = Box::new(outgoing_event) as Box; + let boxed_event = Box::new(outgoing_event) as Box; let mut messages = vec![boxed_event]; // `agent.leave` requests to Janus instances that host active streams in this room. diff --git a/src/app/endpoint/system.rs b/src/app/endpoint/system.rs index eb305118..a4210295 100644 --- a/src/app/endpoint/system.rs +++ b/src/app/endpoint/system.rs @@ -5,8 +5,8 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde_derive::{Deserialize, Serialize}; use svc_agent::mqtt::{ - IncomingRequestProperties, IntoPublishableDump, OutgoingEvent, OutgoingEventProperties, - ResponseStatus, ShortTermTimingProperties, TrackingProperties, + IncomingRequestProperties, IntoPublishableMessage, OutgoingEvent, OutgoingEventProperties, + OutgoingMessage, ResponseStatus, ShortTermTimingProperties, TrackingProperties, }; use svc_authn::Authenticable; use uuid::Uuid; @@ -47,7 +47,7 @@ struct RtcUploadEventData { uri: Option, } -pub(crate) type RoomUploadEvent = OutgoingEvent; +pub(crate) type RoomUploadEvent = OutgoingMessage; //////////////////////////////////////////////////////////////////////////////// @@ -107,7 +107,7 @@ impl RequestHandler for VacuumHandler { .map_err(|err| format!("error creating a backend request: {}", err)) .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; - requests.push(Box::new(backreq) as Box); + requests.push(Box::new(backreq) as Box); } } diff --git a/src/app/janus.rs b/src/app/janus.rs index f578c67c..811ceaf8 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -8,10 +8,10 @@ use serde_derive::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use std::ops::Bound; use svc_agent::mqtt::{ - compat::{into_event, into_response, IncomingEnvelope}, - IncomingEventProperties, IncomingRequestProperties, IncomingResponseProperties, - IntoPublishableDump, OutgoingRequest, OutgoingRequestProperties, OutgoingResponse, - ResponseStatus, ShortTermTimingProperties, SubscriptionTopic, TrackingProperties, + IncomingEvent as MQTTIncomingEvent, IncomingEventProperties, IncomingRequestProperties, + IncomingResponse as MQTTIncomingResponse, IncomingResponseProperties, IntoPublishableMessage, + OutgoingMessage, OutgoingRequest, OutgoingRequestProperties, OutgoingResponse, ResponseStatus, + ShortTermTimingProperties, SubscriptionTopic, TrackingProperties, }; use svc_agent::{Addressable, AgentId, Subscription}; use svc_error::{extension::sentry, Error as SvcError}; @@ -62,7 +62,7 @@ pub(crate) fn create_session_request( evp: &IncomingEventProperties, me: &M, start_timestamp: DateTime, -) -> Result> +) -> Result> where M: Addressable, { @@ -104,7 +104,7 @@ pub(crate) fn create_handle_request( session_id: i64, me: &M, start_timestamp: DateTime, -) -> Result> +) -> Result> where M: Addressable, { @@ -169,7 +169,7 @@ pub(crate) fn create_rtc_handle_request( me: &M, start_timestamp: DateTime, authz_time: Duration, -) -> Result> +) -> Result> where A: Addressable, M: Addressable, @@ -246,7 +246,7 @@ pub(crate) fn create_stream_request( me: &M, start_timestamp: DateTime, authz_time: Duration, -) -> Result> +) -> Result> where A: Addressable, M: Addressable, @@ -321,7 +321,7 @@ pub(crate) fn read_stream_request( me: &M, start_timestamp: DateTime, authz_time: Duration, -) -> Result> +) -> Result> where A: Addressable, M: Addressable, @@ -400,7 +400,7 @@ pub(crate) fn upload_stream_request( to: &A, me: &M, start_timestamp: DateTime, -) -> Result> +) -> Result> where A: Addressable, M: Addressable, @@ -454,7 +454,7 @@ pub(crate) fn trickle_request( me: &M, start_timestamp: DateTime, authz_time: Duration, -) -> Result> +) -> Result> where A: Addressable, M: Addressable, @@ -515,7 +515,7 @@ pub(crate) fn agent_leave_request( to: &T, me: &M, tracking: &TrackingProperties, -) -> Result> +) -> Result> where T: Addressable, M: Addressable, @@ -562,11 +562,10 @@ where pub(crate) async fn handle_response( context: &C, - envelope: IncomingEnvelope, - respp: &IncomingResponseProperties, + resp: &MQTTIncomingResponse, start_timestamp: DateTime, ) -> MessageStream { - handle_response_impl(context, envelope, respp, start_timestamp) + handle_response_impl(context, resp, start_timestamp) .await .unwrap_or_else(|err| { error!("Failed to handle a response from janus: {}", err); @@ -577,15 +576,16 @@ pub(crate) async fn handle_response( async fn handle_response_impl( context: &C, - envelope: IncomingEnvelope, - respp: &IncomingResponseProperties, + resp: &MQTTIncomingResponse, start_timestamp: DateTime, ) -> Result { - let message = into_response::(envelope) + let payload = MQTTIncomingResponse::convert_payload::(&resp) .map_err(|err| format!("failed to parse response: {}", err)) .status(ResponseStatus::BAD_REQUEST)?; - match message.payload() { + let respp = resp.properties(); + + match payload { IncomingResponse::Success(ref inresp) => { let txn = from_base64::(&inresp.transaction()) .map_err(|err| format!("failed to parse transaction: {}", err)) @@ -604,7 +604,7 @@ async fn handle_response_impl( .map_err(|err| err.to_string()) .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; - let boxed_backreq = Box::new(backreq) as Box; + let boxed_backreq = Box::new(backreq) as Box; Ok(Box::new(stream::once(boxed_backreq))) } // Handle has been created @@ -640,7 +640,7 @@ async fn handle_response_impl( JANUS_API_VERSION, ); - let boxed_resp = Box::new(resp) as Box; + let boxed_resp = Box::new(resp) as Box; Ok(Box::new(stream::once(boxed_resp))) } // An unsupported incoming Success message has been received @@ -667,7 +667,7 @@ async fn handle_response_impl( JANUS_API_VERSION, ); - let boxed_resp = Box::new(resp) as Box; + let boxed_resp = Box::new(resp) as Box; Ok(Box::new(stream::once(boxed_resp))) } // An unsupported incoming Ack message has been received @@ -726,7 +726,7 @@ async fn handle_response_impl( JANUS_API_VERSION, ); - let boxed_resp = Box::new(resp) as Box; + let boxed_resp = Box::new(resp) as Box; Ok(Box::new(stream::once(boxed_resp)) as MessageStream) }) .or_else(|err| { @@ -783,7 +783,7 @@ async fn handle_response_impl( JANUS_API_VERSION, ); - let boxed_resp = Box::new(resp) as Box; + let boxed_resp = Box::new(resp) as Box; Ok(Box::new(stream::once(boxed_resp)) as MessageStream) }) .or_else(|err| { @@ -961,7 +961,7 @@ async fn handle_response_impl( .map_err(|e| format!("error creating a system event, {}", e)) .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; - let event_box = Box::new(event) as Box; + let event_box = Box::new(event) as Box; Ok(Box::new(stream::once(event_box)) as MessageStream) } None => { @@ -1016,22 +1016,21 @@ fn handle_response_error( let timing = ShortTermTimingProperties::until_now(start_timestamp); let resp = OutgoingResponse::unicast(err, reqp.to_response(status, timing), reqp, API_VERSION); - let boxed_resp = Box::new(resp) as Box; + let boxed_resp = Box::new(resp) as Box; Box::new(stream::once(boxed_resp)) } pub(crate) async fn handle_event( context: &C, - envelope: IncomingEnvelope, - evp: &IncomingEventProperties, + event: &MQTTIncomingEvent, start_timestamp: DateTime, ) -> MessageStream { - handle_event_impl(context, envelope, evp, start_timestamp) + handle_event_impl(context, event, start_timestamp) .await .unwrap_or_else(|err| { error!( "Failed to handle an event from janus, label = '{}': {}", - evp.label().unwrap_or("none"), + event.properties().label().unwrap_or("none"), err, ); @@ -1042,13 +1041,15 @@ pub(crate) async fn handle_event( async fn handle_event_impl( context: &C, - envelope: IncomingEnvelope, - evp: &IncomingEventProperties, + event: &MQTTIncomingEvent, start_timestamp: DateTime, ) -> Result { - let message = into_event::(envelope)?; + let payload = MQTTIncomingEvent::convert_payload::(&event) + .map_err(|err| format!("failed to parse event: {}", err)) + .status(ResponseStatus::BAD_REQUEST)?; - match message.payload() { + let evp = event.properties(); + match payload { IncomingEvent::WebRtcUp(ref inev) => { let rtc_stream_id = Uuid::from_str(inev.opaque_id()) .map_err(|err| format!("Failed to parse opaque id as uuid: {}", err)) @@ -1077,7 +1078,7 @@ async fn handle_event_impl( )?; Ok(Box::new(stream::once( - Box::new(event) as Box + Box::new(event) as Box ))) } else { Ok(Box::new(stream::empty())) @@ -1113,7 +1114,7 @@ async fn handle_event_impl( evp.tracking(), )?; - let boxed_event = Box::new(event) as Box; + let boxed_event = Box::new(event) as Box; return Ok(Box::new(stream::once(boxed_event))); } } @@ -1132,16 +1133,15 @@ async fn handle_event_impl( pub(crate) async fn handle_status_event( context: &C, - envelope: IncomingEnvelope, - evp: &IncomingEventProperties, + event: &MQTTIncomingEvent, start_timestamp: DateTime, ) -> MessageStream { - handle_status_event_impl(context, envelope, evp, start_timestamp) + handle_status_event_impl(context, event, start_timestamp) .await .unwrap_or_else(|err| { error!( "Failed to handle a status event from janus, label = '{}': {}", - evp.label().unwrap_or("none"), + event.properties().label().unwrap_or("none"), err, ); @@ -1152,18 +1152,21 @@ pub(crate) async fn handle_status_event( async fn handle_status_event_impl( context: &C, - envelope: IncomingEnvelope, - evp: &IncomingEventProperties, + event: &MQTTIncomingEvent, start_timestamp: DateTime, ) -> Result { - let inev = into_event::(envelope)?; + let evp = event.properties(); + + let payload = MQTTIncomingEvent::convert_payload::(&event) + .map_err(|err| format!("failed to parse event: {}", err)) + .status(ResponseStatus::BAD_REQUEST)?; - if inev.payload().online() { + if payload.online() { let event = create_session_request(evp, context.agent_id(), start_timestamp) .map_err(|err| err.to_string()) .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; - let boxed_event = Box::new(event) as Box; + let boxed_event = Box::new(event) as Box; Ok(Box::new(stream::once(boxed_event))) } else { let conn = context.db().get()?; diff --git a/src/app/message_handler.rs b/src/app/message_handler.rs index 789d22a7..caa91a2f 100644 --- a/src/app/message_handler.rs +++ b/src/app/message_handler.rs @@ -5,17 +5,18 @@ use async_std::prelude::*; use async_std::stream::{self, Stream}; use chrono::{DateTime, Utc}; use futures_util::pin_mut; -use log::{error, info, warn}; +use log::{error, warn}; use svc_agent::mqtt::{ - compat, Agent, IncomingEventProperties, IncomingRequestProperties, IncomingResponseProperties, - IntoPublishableDump, OutgoingResponse, ResponseStatus, ShortTermTimingProperties, + Agent, IncomingEvent, IncomingMessage, IncomingRequest, IncomingRequestProperties, + IncomingResponse, IntoPublishableMessage, OutgoingResponse, ResponseStatus, + ShortTermTimingProperties, }; use svc_error::{extension::sentry, Error as SvcError}; use crate::app::{context::Context, endpoint, API_VERSION}; pub(crate) type MessageStream = - Box> + Send + Unpin>; + Box> + Send + Unpin>; pub(crate) struct MessageHandler { agent: Agent, @@ -35,73 +36,68 @@ impl MessageHandler { &self.context } - pub(crate) async fn handle(&self, message_bytes: &[u8], topic: &str) { - info!( - "Incoming message = '{}'", - String::from_utf8_lossy(message_bytes) - ); - - if let Err(err) = self.handle_message(message_bytes, topic).await { - error!( - "Error processing a message = '{}': {}", - String::from_utf8_lossy(message_bytes), - err, - ); - - let svc_error = SvcError::builder() - .status(ResponseStatus::UNPROCESSABLE_ENTITY) - .kind("message_handler", "Message handling error") - .detail(&err.to_string()) - .build(); - - sentry::send(svc_error) - .unwrap_or_else(|err| warn!("Error sending error to Sentry: {}", err)); + pub(crate) async fn handle( + &self, + message: &Result, String>, + topic: &str, + ) { + match message { + Ok(ref msg) => { + if let Err(err) = self.handle_message(msg, topic).await { + self.report_error(message, &err.to_string()).await; + } + } + Err(e) => { + self.report_error(message, e).await; + } } } - async fn handle_message(&self, message_bytes: &[u8], topic: &str) -> Result<(), SvcError> { - let start_timestamp = Utc::now(); + async fn report_error(&self, message: &Result, String>, err: &str) { + error!("Error processing a message = '{:?}': {}", message, err); - let envelope = serde_json::from_slice::(message_bytes) - .map_err(|err| format!("Failed to parse incoming envelope: {}", err)) - .status(ResponseStatus::BAD_REQUEST)?; + let svc_error = SvcError::builder() + .status(ResponseStatus::UNPROCESSABLE_ENTITY) + .kind("message_handler", "Message handling error") + .detail(&err.to_string()) + .build(); - match envelope.properties() { - compat::IncomingEnvelopeProperties::Request(ref reqp) => { - let reqp = reqp.to_owned(); - self.handle_request(envelope, reqp, topic, start_timestamp) - .await - } - compat::IncomingEnvelopeProperties::Response(ref respp) => { - let respp = respp.to_owned(); - self.handle_response(envelope, respp, topic, start_timestamp) - .await - } - compat::IncomingEnvelopeProperties::Event(ref evp) => { - let evp = evp.to_owned(); - self.handle_event(envelope, evp, topic, start_timestamp) - .await + sentry::send(svc_error) + .unwrap_or_else(|err| warn!("Error sending error to Sentry: {}", err)); + } + + async fn handle_message( + &self, + message: &IncomingMessage, + topic: &str, + ) -> Result<(), SvcError> { + let start_timestamp = Utc::now(); + + match message { + IncomingMessage::Request(req) => self.handle_request(req, topic, start_timestamp).await, + IncomingMessage::Response(resp) => { + self.handle_response(resp, topic, start_timestamp).await } + IncomingMessage::Event(event) => self.handle_event(event, topic, start_timestamp).await, } } async fn handle_request( &self, - envelope: compat::IncomingEnvelope, - reqp: IncomingRequestProperties, + req: &IncomingRequest, topic: &str, start_timestamp: DateTime, ) -> Result<(), SvcError> { let outgoing_message_stream = - endpoint::route_request(&self.context, envelope, &reqp, topic, start_timestamp) + endpoint::route_request(&self.context, req, topic, start_timestamp) .await .unwrap_or_else(|| { error_response( ResponseStatus::METHOD_NOT_ALLOWED, "about:blank", "Unknown method", - &format!("Unknown method '{}'", reqp.method()), - &reqp, + &format!("Unknown method '{}'", req.properties().method()), + req.properties(), start_timestamp, ) }); @@ -112,13 +108,12 @@ impl MessageHandler { async fn handle_response( &self, - envelope: compat::IncomingEnvelope, - respp: IncomingResponseProperties, + envelope: &IncomingResponse, topic: &str, start_timestamp: DateTime, ) -> Result<(), SvcError> { let outgoing_message_stream = - endpoint::route_response(&self.context, envelope, &respp, topic, start_timestamp) + endpoint::route_response(&self.context, envelope, topic, start_timestamp) .await .unwrap_or_else(|| { warn!("Unhandled response"); @@ -131,16 +126,15 @@ impl MessageHandler { async fn handle_event( &self, - envelope: compat::IncomingEnvelope, - evp: IncomingEventProperties, + event: &IncomingEvent, topic: &str, start_timestamp: DateTime, ) -> Result<(), SvcError> { let outgoing_message_stream = - endpoint::route_event(&self.context, envelope, &evp, topic, start_timestamp) + endpoint::route_event(&self.context, event, topic, start_timestamp) .await .unwrap_or_else(|| { - let label = evp.label().unwrap_or("none"); + let label = event.properties().label().unwrap_or("none"); warn!("Unexpected event with label = '{}'", label); Box::new(stream::empty()) }); @@ -181,27 +175,16 @@ fn error_response( let timing = ShortTermTimingProperties::until_now(start_timestamp); let props = reqp.to_response(status, timing); let resp = OutgoingResponse::unicast(err, props, reqp, API_VERSION); - let boxed_resp = Box::new(resp) as Box; + let boxed_resp = Box::new(resp) as Box; Box::new(stream::once(boxed_resp)) } pub(crate) fn publish_message( agent: &mut Agent, - message: Box, + message: Box, ) -> Result<(), SvcError> { - let dump = message - .into_dump(agent.address()) - .map_err(|err| format!("Failed to dump message: {}", err)) - .status(ResponseStatus::UNPROCESSABLE_ENTITY)?; - - info!( - "Outgoing message = '{}' sending to the topic = '{}'", - dump.payload(), - dump.topic(), - ); - agent - .publish_dump(dump) + .publish_publishable(message) .map_err(|err| format!("Failed to publish message: {}", err)) .status(ResponseStatus::UNPROCESSABLE_ENTITY) } @@ -216,8 +199,7 @@ pub(crate) fn publish_message( pub(crate) trait RequestEnvelopeHandler<'async_trait> { fn handle_envelope( context: &'async_trait C, - envelope: compat::IncomingEnvelope, - reqp: &'async_trait IncomingRequestProperties, + req: &'async_trait IncomingRequest, start_timestamp: DateTime, ) -> Pin + Send + 'async_trait>>; } @@ -230,8 +212,7 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> { fn handle_envelope( context: &'async_trait C, - envelope: compat::IncomingEnvelope, - reqp: &'async_trait IncomingRequestProperties, + req: &'async_trait IncomingRequest, start_timestamp: DateTime, ) -> Pin + Send + 'async_trait>> where @@ -240,12 +221,13 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> // The actual implementation. async fn handle_envelope( context: &C, - envelope: compat::IncomingEnvelope, - reqp: &IncomingRequestProperties, + req: &IncomingRequest, start_timestamp: DateTime, ) -> MessageStream { // Parse the envelope with the payload type specified in the handler. - match envelope.payload::() { + let payload = IncomingRequest::convert_payload::(req); + let reqp = req.properties(); + match payload { // Call handler. Ok(payload) => H::handle(context, payload, reqp, start_timestamp) .await @@ -277,20 +259,14 @@ impl<'async_trait, H: 'async_trait + Sync + endpoint::RequestHandler> } } - Box::pin(handle_envelope::( - context, - envelope, - reqp, - start_timestamp, - )) + Box::pin(handle_envelope::(context, req, start_timestamp)) } } pub(crate) trait ResponseEnvelopeHandler<'async_trait> { fn handle_envelope( context: &'async_trait C, - envelope: compat::IncomingEnvelope, - respp: &'async_trait IncomingResponseProperties, + resp: &'async_trait IncomingResponse, start_timestamp: DateTime, ) -> Pin + Send + 'async_trait>>; } @@ -301,21 +277,21 @@ impl<'async_trait, H: 'async_trait + endpoint::ResponseHandler> { fn handle_envelope( context: &'async_trait C, - envelope: compat::IncomingEnvelope, - respp: &'async_trait IncomingResponseProperties, + resp: &'async_trait IncomingResponse, start_timestamp: DateTime, ) -> Pin + Send + 'async_trait>> { // The actual implementation. async fn handle_envelope( context: &C, - envelope: compat::IncomingEnvelope, - respp: &IncomingResponseProperties, + resp: &IncomingResponse, start_timestamp: DateTime, ) -> MessageStream { // Parse response envelope with the payload from the handler. - match envelope.payload::() { + let payload = IncomingResponse::convert_payload::(resp); + let resp = resp.properties(); + match payload { // Call handler. - Ok(payload) => H::handle(context, payload, respp, start_timestamp) + Ok(payload) => H::handle(context, payload, resp, start_timestamp) .await .unwrap_or_else(|mut svc_error| { // Handler returned an error. @@ -335,20 +311,14 @@ impl<'async_trait, H: 'async_trait + endpoint::ResponseHandler> } } - Box::pin(handle_envelope::( - context, - envelope, - respp, - start_timestamp, - )) + Box::pin(handle_envelope::(context, resp, start_timestamp)) } } pub(crate) trait EventEnvelopeHandler<'async_trait> { fn handle_envelope( context: &'async_trait C, - envelope: compat::IncomingEnvelope, - evp: &'async_trait IncomingEventProperties, + event: &'async_trait IncomingEvent, start_timestamp: DateTime, ) -> Pin + Send + 'async_trait>>; } @@ -359,19 +329,19 @@ impl<'async_trait, H: 'async_trait + endpoint::EventHandler> EventEnvelopeHandle { fn handle_envelope( context: &'async_trait C, - envelope: compat::IncomingEnvelope, - evp: &'async_trait IncomingEventProperties, + event: &'async_trait IncomingEvent, start_timestamp: DateTime, ) -> Pin + Send + 'async_trait>> { // The actual implementation. async fn handle_envelope( context: &C, - envelope: compat::IncomingEnvelope, - evp: &IncomingEventProperties, + event: &IncomingEvent, start_timestamp: DateTime, ) -> MessageStream { + let payload = IncomingEvent::convert_payload::(event); + let evp = event.properties(); // Parse event envelope with the payload from the handler. - match envelope.payload::() { + match payload { // Call handler. Ok(payload) => H::handle(context, payload, evp, start_timestamp) .await @@ -404,12 +374,7 @@ impl<'async_trait, H: 'async_trait + endpoint::EventHandler> EventEnvelopeHandle } } - Box::pin(handle_envelope::( - context, - envelope, - evp, - start_timestamp, - )) + Box::pin(handle_envelope::(context, event, start_timestamp)) } } diff --git a/src/app/mod.rs b/src/app/mod.rs index 0ccef37f..63caf1ab 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -9,7 +9,7 @@ use log::{error, info, warn}; use serde_json::json; use svc_agent::{ mqtt::{ - Agent, AgentBuilder, ConnectionMode, IntoPublishableDump, Notification, OutgoingRequest, + Agent, AgentBuilder, AgentNotification, ConnectionMode, OutgoingRequest, OutgoingRequestProperties, QoS, ShortTermTimingProperties, SubscriptionTopic, }, AccountId, AgentId, Authenticable, SharedGroup, Subscription, @@ -53,7 +53,7 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option) -> .context("Failed to create an agent")?; // Event loop for incoming messages of MQTT Agent - let (mq_tx, mut mq_rx) = futures_channel::mpsc::unbounded::(); + let (mq_tx, mut mq_rx) = futures_channel::mpsc::unbounded::(); thread::spawn(move || { for message in rx { @@ -87,15 +87,13 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option) -> task::spawn(async move { match message { - svc_agent::mqtt::Notification::Publish(message) => { - message_handler - .handle(&message.payload, &message.topic_name) - .await + AgentNotification::Message(message, metadata) => { + message_handler.handle(&message, &metadata.topic).await } - svc_agent::mqtt::Notification::Disconnection => { + AgentNotification::Disconnection => { error!("Disconnected from broker"); } - svc_agent::mqtt::Notification::Reconnection => { + AgentNotification::Reconnection => { error!("Reconnected to broker"); resubscribe( @@ -191,21 +189,8 @@ fn subscribe_to_kruonis(kruonis_id: &AccountId, agent: &mut Agent) -> Result<()> let props = OutgoingRequestProperties::new("kruonis.subscribe", &topic, "", timing); let event = OutgoingRequest::multicast(json!({}), props, kruonis_id); - let message = Box::new(event) as Box; - let dump = message - .into_dump(agent.address()) - .context("Failed to dump message")?; - - info!( - "Outgoing message = '{}' sending to the topic = '{}'", - dump.payload(), - dump.topic(), - ); - - agent - .publish_dump(dump) - .context("Failed to publish message")?; + agent.publish(event).context("Failed to publish message")?; Ok(()) }