diff --git a/Cargo.lock b/Cargo.lock index a7d4a908..44d94419 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1948,7 +1948,7 @@ dependencies = [ [[package]] name = "svc-agent" version = "0.9.6" -source = "git+https://github.com/netology-group/svc-agent-rs#991f2b484ea0082a5fee770f3ebd0dd4ddc870ef" +source = "git+https://github.com/netology-group/svc-agent-rs#6feb43b12c8d26d62f7de4978b468483d7807a5f" dependencies = [ "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/app/endpoint/message.rs b/src/app/endpoint/message.rs index 9662ac09..c876f408 100644 --- a/src/app/endpoint/message.rs +++ b/src/app/endpoint/message.rs @@ -89,8 +89,8 @@ impl State { let to = &inreq.payload().agent_id; let payload = &inreq.payload().data; - let response_topic = Subscription::multicast_requests_from(to) - .subscription_topic(&self.me) + let response_topic = Subscription::multicast_requests_from(to, Some("v1")) + .subscription_topic(&self.me, "v2") .map_err(|_| { SvcError::builder() .status(ResponseStatus::UNPROCESSABLE_ENTITY) @@ -112,7 +112,7 @@ impl State { ShortTermTimingProperties::until_now(start_timestamp), ); - OutgoingRequest::unicast(payload.to_owned(), props, to).into() + OutgoingRequest::unicast(payload.to_owned(), props, to, "v1").into() } pub(crate) async fn callback( @@ -139,8 +139,13 @@ impl State { inresp.properties().tracking().clone(), ); - let payload = inresp.payload(); - let message = OutgoingResponse::unicast(payload.to_owned(), props, &reqp); + let message = OutgoingResponse::unicast( + inresp.payload().to_owned(), + props, + &reqp, + reqp.to_connection().version(), + ); + Ok(vec![Box::new(message) as Box]) } } @@ -221,7 +226,10 @@ mod test { let message = result.remove(0); match message.destination() { - &Destination::Unicast(ref agent_id) => assert_eq!(agent_id, receiver.agent_id()), + &Destination::Unicast(ref agent_id, ref version) => { + assert_eq!(agent_id, receiver.agent_id()); + assert_eq!(version, "v1"); + } _ => panic!("Expected unicast destination"), } diff --git a/src/app/endpoint/mod.rs b/src/app/endpoint/mod.rs index b634d79f..8f5d8d54 100644 --- a/src/app/endpoint/mod.rs +++ b/src/app/endpoint/mod.rs @@ -120,7 +120,14 @@ where .build(); let timing = ShortTermTimingProperties::until_now(start_timestamp); - let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props); + + let resp = OutgoingResponse::unicast( + err, + props.to_response(status, timing), + props, + props.to_connection().version(), + ); + Ok(vec![Box::new(resp) as Box]) } } @@ -145,7 +152,14 @@ pub(crate) fn handle_error( } let timing = ShortTermTimingProperties::until_now(start_timestamp); - let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props); + + let resp = OutgoingResponse::unicast( + err, + props.to_response(status, timing), + props, + props.to_connection().version(), + ); + Ok(vec![Box::new(resp) as Box]) } @@ -163,7 +177,7 @@ pub(crate) fn handle_unknown_method( .build(); let timing = ShortTermTimingProperties::until_now(start_timestamp); - let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props); + let resp = OutgoingResponse::unicast(err, props.to_response(status, timing), props, "v1"); Ok(vec![Box::new(resp) as Box]) } diff --git a/src/app/endpoint/room.rs b/src/app/endpoint/room.rs index 35b3aa10..c1204740 100644 --- a/src/app/endpoint/room.rs +++ b/src/app/endpoint/room.rs @@ -311,7 +311,7 @@ impl State { // https://github.com/vernemq/vernemq/issues/1326. // Then we won't need the local state on the broker at all and will be able // to send a multicast request to the broker. - OutgoingRequest::unicast(payload, props, inreq.properties()).into() + OutgoingRequest::unicast(payload, props, inreq.properties(), "v1").into() } pub(crate) async fn leave( @@ -359,7 +359,7 @@ impl State { // https://github.com/vernemq/vernemq/issues/1326. // Then we won't need the local state on the broker at all and will be able // to send a multicast request to the broker. - OutgoingRequest::unicast(payload, props, inreq.properties()).into() + OutgoingRequest::unicast(payload, props, inreq.properties(), "v1").into() } } @@ -371,7 +371,7 @@ mod test { use diesel::prelude::*; use failure::format_err; use serde_json::{json, Value as JsonValue}; - use svc_agent::Destination; + use svc_agent::{AccountId, AgentId, Destination}; use svc_authn::Authenticable; use crate::test_helpers::{ @@ -857,10 +857,13 @@ mod test { // Assert outgoing broker request. match message.destination() { - &Destination::Unicast(ref agent_id) => { - assert_eq!(agent_id.label(), "web"); - assert_eq!(agent_id.as_account_id().label(), "user123"); - assert_eq!(agent_id.as_account_id().audience(), AUDIENCE); + &Destination::Unicast(ref agent_id, ref version) => { + assert_eq!( + agent_id.to_owned(), + AgentId::new("web", AccountId::new("user123", AUDIENCE)) + ); + + assert_eq!(version, "v1"); } _ => panic!("Expected unicast destination"), } @@ -1038,10 +1041,13 @@ mod test { // Assert outgoing broker request. match message.destination() { - &Destination::Unicast(ref agent_id) => { - assert_eq!(agent_id.label(), "web"); - assert_eq!(agent_id.as_account_id().label(), "user123"); - assert_eq!(agent_id.as_account_id().audience(), AUDIENCE); + &Destination::Unicast(ref agent_id, ref version) => { + assert_eq!( + agent_id.to_owned(), + AgentId::new("web", AccountId::new("user123", AUDIENCE)) + ); + + assert_eq!(version, "v1"); } _ => panic!("Expected unicast destination"), } diff --git a/src/app/janus.rs b/src/app/janus.rs index 30fbd941..9a23686b 100644 --- a/src/app/janus.rs +++ b/src/app/janus.rs @@ -71,7 +71,7 @@ where ); props.set_tracking(tracking.to_owned()); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -113,7 +113,7 @@ where ); props.set_tracking(tracking.to_owned()); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -173,7 +173,7 @@ where Some(&rtc_handle_id.to_string()), ); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -241,7 +241,7 @@ where Some(jsep), ); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -309,7 +309,7 @@ where Some(jsep), ); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -374,7 +374,7 @@ pub(crate) fn upload_stream_request( ); props.set_tracking(tracking.to_owned()); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -408,7 +408,7 @@ where let props = reqp.to_request("janus_trickle.create", IGNORE, IGNORE, short_term_timing); let transaction = Transaction::Trickle(TrickleTransaction::new(reqp)); let payload = TrickleRequest::new(&to_base64(&transaction)?, session_id, handle_id, jsep); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -470,7 +470,7 @@ where None, ); - Ok(OutgoingRequest::unicast(payload, props, to)) + Ok(OutgoingRequest::unicast(payload, props, to, "v1")) } //////////////////////////////////////////////////////////////////////////////// @@ -541,6 +541,7 @@ pub(crate) async fn handle_response( ShortTermTimingProperties::until_now(start_timestamp), ), &reqp, + "v1", ); Ok(vec![Box::new(resp) as Box]) @@ -568,6 +569,7 @@ pub(crate) async fn handle_response( ShortTermTimingProperties::until_now(start_timestamp), ), tn.reqp.as_agent_id(), + "v1", ); Ok(vec![Box::new(resp) as Box]) @@ -627,6 +629,7 @@ pub(crate) async fn handle_response( endpoint::rtc_signal::CreateResponseData::new(Some(jsep.clone())), tn.reqp.to_response(ResponseStatus::OK, ShortTermTimingProperties::until_now(start_timestamp)), tn.reqp.as_agent_id(), + "v1", ); Ok(vec![Box::new(resp) as Box]) @@ -688,6 +691,7 @@ pub(crate) async fn handle_response( endpoint::rtc_signal::CreateResponseData::new(Some(jsep.clone())), tn.reqp.to_response(ResponseStatus::OK, ShortTermTimingProperties::until_now(start_timestamp)), tn.reqp.as_agent_id(), + "v1", ); Ok(vec![Box::new(resp) as Box]) diff --git a/src/app/mod.rs b/src/app/mod.rs index 1fb04efd..0a79ca54 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -101,26 +101,27 @@ pub(crate) async fn run(db: &ConnectionPool) -> Result<(), Error> { // Create Subscriptions let route = Arc::new(Route { janus_status_subscription_topic: { - let subscription = Subscription::broadcast_events(&config.backend_id, "status"); + let subscription = Subscription::broadcast_events(&config.backend_id, "v1", "status"); tx.subscribe(&subscription, QoS::AtLeastOnce, Some(&group)) .expect("Error subscribing to backend events topic"); subscription - .subscription_topic(&agent_id) + .subscription_topic(&agent_id, "v1") .expect("Error building janus events subscription topic") }, janus_responses_subscription_topic: { - let subscription = Subscription::broadcast_events(&config.backend_id, "responses"); + let subscription = + Subscription::broadcast_events(&config.backend_id, "v1", "responses"); tx.subscribe(&subscription, QoS::AtLeastOnce, Some(&group)) .expect("Error subscribing to backend responses topic"); subscription - .subscription_topic(&agent_id) + .subscription_topic(&agent_id, "v1") .expect("Error building janus responses subscription topic") }, }); tx.subscribe( - &Subscription::multicast_requests(), + &Subscription::multicast_requests(Some("v1")), QoS::AtMostOnce, Some(&group), )