diff --git a/examples/simple_rpc.rs b/examples/simple_rpc.rs index 54ffdc0b..09169c98 100644 --- a/examples/simple_rpc.rs +++ b/examples/simple_rpc.rs @@ -74,7 +74,7 @@ pub async fn main() -> Result<(), Box> { .await { Err(ServiceInvocationError::InvalidArgument(msg)) => { - println!("service returned expected error: {}", msg) + println!("service returned expected error: {msg}") } _ => panic!("expected service to return an Invalid Argument error"), } diff --git a/src/communication.rs b/src/communication.rs index e8f09bc0..ab26b169 100644 --- a/src/communication.rs +++ b/src/communication.rs @@ -88,7 +88,7 @@ impl Display for RegistrationError { "the underlying transport implementation does not support the push delivery method", ), RegistrationError::InvalidFilter(msg) => { - f.write_fmt(format_args!("invalid filter(s): {}", msg)) + f.write_fmt(format_args!("invalid filter(s): {msg}")) } RegistrationError::Unknown(status) => f.write_fmt(format_args!( "error un-/registering listener: {}", diff --git a/src/communication/default_pubsub.rs b/src/communication/default_pubsub.rs index 146bc1b1..84691675 100644 --- a/src/communication/default_pubsub.rs +++ b/src/communication/default_pubsub.rs @@ -221,8 +221,7 @@ impl Publisher for SimplePublisher { .await .map_err(PubSubError::PublishError), Err(e) => Err(PubSubError::InvalidArgument(format!( - "failed to create Publish message from parameters: {}", - e + "failed to create Publish message from parameters: {e}" ))), } } diff --git a/src/communication/notification.rs b/src/communication/notification.rs index 6faca745..38b42bed 100644 --- a/src/communication/notification.rs +++ b/src/communication/notification.rs @@ -36,7 +36,7 @@ impl Display for NotificationError { match self { NotificationError::InvalidArgument(s) => f.write_str(s.as_str()), NotificationError::NotifyError(s) => { - f.write_fmt(format_args!("failed to send notification: {}", s)) + f.write_fmt(format_args!("failed to send notification: {s}")) } } } diff --git a/src/communication/pubsub.rs b/src/communication/pubsub.rs index 86ab70ce..5a737e8c 100644 --- a/src/communication/pubsub.rs +++ b/src/communication/pubsub.rs @@ -37,7 +37,7 @@ impl Display for PubSubError { match self { PubSubError::InvalidArgument(s) => f.write_str(s.as_str()), PubSubError::PublishError(s) => { - f.write_fmt(format_args!("failed to publish message: {}", s)) + f.write_fmt(format_args!("failed to publish message: {s}")) } } } diff --git a/src/communication/usubscription_client.rs b/src/communication/usubscription_client.rs index b9765ae2..6b4d62d9 100644 --- a/src/communication/usubscription_client.rs +++ b/src/communication/usubscription_client.rs @@ -19,10 +19,11 @@ use crate::{ core::usubscription::{ usubscription_uri, FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest, FetchSubscriptionsResponse, NotificationsRequest, - NotificationsResponse, SubscriptionRequest, SubscriptionResponse, USubscription, - UnsubscribeRequest, UnsubscribeResponse, RESOURCE_ID_FETCH_SUBSCRIBERS, - RESOURCE_ID_FETCH_SUBSCRIPTIONS, RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, - RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE, + NotificationsResponse, ResetRequest, ResetResponse, SubscriptionRequest, + SubscriptionResponse, USubscription, UnsubscribeRequest, UnsubscribeResponse, + RESOURCE_ID_FETCH_SUBSCRIBERS, RESOURCE_ID_FETCH_SUBSCRIPTIONS, + RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_RESET, RESOURCE_ID_SUBSCRIBE, + RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE, }, UStatus, }; @@ -136,6 +137,17 @@ impl USubscription for RpcClientUSubscription { .await .map_err(UStatus::from) } + + async fn reset(&self, reset_request: ResetRequest) -> Result { + self.rpc_client + .invoke_proto_method::<_, ResetResponse>( + usubscription_uri(RESOURCE_ID_RESET), + Self::default_call_options(), + reset_request, + ) + .await + .map_err(UStatus::from) + } } #[cfg(test)] @@ -459,4 +471,50 @@ mod tests { .await .is_ok()); } + + #[tokio::test] + async fn test_reset_invokes_rpc_client() { + let request = ResetRequest::default(); + + let expected_request = request.clone(); + let mut rpc_client = MockRpcClient::new(); + let mut seq = Sequence::new(); + rpc_client + .expect_invoke_method() + .once() + .in_sequence(&mut seq) + .withf(|method, _options, payload| { + method == &usubscription_uri(RESOURCE_ID_RESET) && payload.is_some() + }) + .return_const(Err(crate::communication::ServiceInvocationError::Internal( + "internal error".to_string(), + ))); + rpc_client + .expect_invoke_method() + .once() + .in_sequence(&mut seq) + .withf(move |method, _options, payload| { + let request = payload + .to_owned() + .unwrap() + .extract_protobuf::() + .unwrap(); + + request == expected_request && method == &usubscription_uri(RESOURCE_ID_RESET) + }) + .returning(move |_method, _options, _payload| { + let response = ResetResponse { + ..Default::default() + }; + Ok(Some(UPayload::try_from_protobuf(response).unwrap())) + }); + + let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client)); + + assert!(usubscription_client + .reset(request.clone()) + .await + .is_err_and(|e| e.get_code() == UCode::INTERNAL)); + assert!(usubscription_client.reset(request).await.is_ok()); + } } diff --git a/src/core/usubscription.rs b/src/core/usubscription.rs index a041f11e..d7ae42bf 100644 --- a/src/core/usubscription.rs +++ b/src/core/usubscription.rs @@ -19,9 +19,9 @@ use mockall::automock; pub use crate::up_core_api::usubscription::{ fetch_subscriptions_request::Request, subscription_status::State, EventDeliveryConfig, FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest, - FetchSubscriptionsResponse, NotificationsRequest, NotificationsResponse, SubscribeAttributes, - SubscriberInfo, Subscription, SubscriptionRequest, SubscriptionResponse, SubscriptionStatus, - UnsubscribeRequest, UnsubscribeResponse, Update, + FetchSubscriptionsResponse, NotificationsRequest, NotificationsResponse, ResetRequest, + ResetResponse, SubscribeAttributes, SubscriberInfo, Subscription, SubscriptionRequest, + SubscriptionResponse, SubscriptionStatus, UnsubscribeRequest, UnsubscribeResponse, Update, }; use crate::{UStatus, UUri}; @@ -130,6 +130,8 @@ pub const RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS: u16 = 0x0006; pub const RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS: u16 = 0x0007; /// The resource identifier of uSubscription's _fetch subscribers_ operation. pub const RESOURCE_ID_FETCH_SUBSCRIBERS: u16 = 0x0008; +/// The resource identifier of uSubscription's _reset_ operation. +pub const RESOURCE_ID_RESET: u16 = 0x0009; /// The resource identifier of uSubscription's _subscription change_ topic. pub const RESOURCE_ID_SUBSCRIPTION_CHANGE: u16 = 0x8000; @@ -161,7 +163,7 @@ pub fn usubscription_uri(resource_id: u16) -> UUri { #[cfg_attr(test, automock)] #[async_trait] pub trait USubscription: Send + Sync { - /// Subscribe to a topic, using a [`SubscriptionRequest`] + /// Subscribes to a topic, using a [`SubscriptionRequest`] /// /// # Parameters /// @@ -175,7 +177,7 @@ pub trait USubscription: Send + Sync { subscription_request: SubscriptionRequest, ) -> Result; - /// Unsubscribe to a topic, using an [`UnsubscribeRequest`] + /// Unsubscribes from a topic, using an [`UnsubscribeRequest`] /// /// # Parameters /// @@ -186,7 +188,7 @@ pub trait USubscription: Send + Sync { /// * [`UStatus`] detailing if unsubscription was successful and if not why not async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus>; - /// Fetch all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`] + /// Fetches all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`] /// /// # Parameters /// @@ -200,7 +202,7 @@ pub trait USubscription: Send + Sync { fetch_subscriptions_request: FetchSubscriptionsRequest, ) -> Result; - /// Register for notifications relevant to a given topic inside a [`NotificationsRequest`] + /// Registers for notifications relevant to a given topic inside a [`NotificationsRequest`] /// changing in subscription status. /// /// # Parameters @@ -215,7 +217,7 @@ pub trait USubscription: Send + Sync { notifications_register_request: NotificationsRequest, ) -> Result<(), UStatus>; - /// Unregister for notifications relevant to a given topic inside a [`NotificationsRequest`] + /// Unregisters from notifications relevant to a given topic inside a [`NotificationsRequest`] /// changing in subscription status. /// /// # Parameters @@ -230,7 +232,7 @@ pub trait USubscription: Send + Sync { notifications_unregister_request: NotificationsRequest, ) -> Result<(), UStatus>; - /// Fetch a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`] + /// Fetches a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`] /// /// # Parameters /// @@ -243,4 +245,11 @@ pub trait USubscription: Send + Sync { &self, fetch_subscribers_request: FetchSubscribersRequest, ) -> Result; + + /// Flushes all stored subscription information, including any persistently stored subscriptions + /// + /// # Returns + /// + /// * [`ResetResponse`] with result of the operation + async fn reset(&self, reset_request: ResetRequest) -> Result; } diff --git a/src/uattributes.rs b/src/uattributes.rs index 6db20874..0b724a24 100644 --- a/src/uattributes.rs +++ b/src/uattributes.rs @@ -50,8 +50,8 @@ impl UAttributesError { impl std::fmt::Display for UAttributesError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::ValidationError(e) => f.write_fmt(format_args!("Validation failure: {}", e)), - Self::ParsingError(e) => f.write_fmt(format_args!("Parsing error: {}", e)), + Self::ValidationError(e) => f.write_fmt(format_args!("Validation failure: {e}")), + Self::ParsingError(e) => f.write_fmt(format_args!("Parsing error: {e}")), } } } diff --git a/src/uattributes/uattributesvalidator.rs b/src/uattributes/uattributesvalidator.rs index 9d30cf1a..9fcb1df2 100644 --- a/src/uattributes/uattributesvalidator.rs +++ b/src/uattributes/uattributesvalidator.rs @@ -47,8 +47,7 @@ pub trait UAttributesValidator: Send { mt.to_cloudevent_type() ))), Err(unknown_code) => Err(UAttributesError::validation_error(format!( - "Unknown Message Type code [{}]", - unknown_code + "Unknown Message Type code [{unknown_code}]" ))), } } @@ -98,8 +97,7 @@ pub fn validate_rpc_priority(attributes: &UAttributes) -> Result<(), UAttributes .enum_value() .map_err(|unknown_code| { UAttributesError::ValidationError(format!( - "RPC message must have a valid priority [{}]", - unknown_code + "RPC message must have a valid priority [{unknown_code}]" )) }) .and_then(|prio| { @@ -256,9 +254,9 @@ impl UAttributesValidator for PublishValidator { fn validate_source(&self, attributes: &UAttributes) -> Result<(), UAttributesError> { // [impl->dsn~up-attributes-publish-source~1] if let Some(source) = attributes.source.as_ref() { - source.verify_event().map_err(|e| { - UAttributesError::validation_error(format!("Invalid source URI: {}", e)) - }) + source + .verify_event() + .map_err(|e| UAttributesError::validation_error(format!("Invalid source URI: {e}"))) } else { Err(UAttributesError::validation_error( "Attributes for a publish message must contain a source URI", @@ -340,7 +338,7 @@ impl UAttributesValidator for NotificationValidator { )) } else { source.verify_no_wildcards().map_err(|e| { - UAttributesError::validation_error(format!("Invalid source URI: {}", e)) + UAttributesError::validation_error(format!("Invalid source URI: {e}")) }) } } else { @@ -368,7 +366,7 @@ impl UAttributesValidator for NotificationValidator { )) } else { sink.verify_no_wildcards().map_err(|e| { - UAttributesError::validation_error(format!("Invalid sink URI: {}", e)) + UAttributesError::validation_error(format!("Invalid sink URI: {e}")) }) } } else { @@ -451,9 +449,8 @@ impl UAttributesValidator for RequestValidator { fn validate_source(&self, attributes: &UAttributes) -> Result<(), UAttributesError> { // [impl->dsn~up-attributes-request-source~1] if let Some(source) = attributes.source.as_ref() { - UUri::verify_rpc_response(source).map_err(|e| { - UAttributesError::validation_error(format!("Invalid source URI: {}", e)) - }) + UUri::verify_rpc_response(source) + .map_err(|e| UAttributesError::validation_error(format!("Invalid source URI: {e}"))) } else { Err(UAttributesError::validation_error("Attributes for a request message must contain a reply-to address in the source property")) } @@ -469,7 +466,7 @@ impl UAttributesValidator for RequestValidator { // [impl->dsn~up-attributes-request-sink~1] if let Some(sink) = attributes.sink.as_ref() { UUri::verify_rpc_method(sink) - .map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {}", e))) + .map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {e}"))) } else { Err(UAttributesError::validation_error("Attributes for a request message must contain a method-to-invoke in the sink property")) } @@ -574,9 +571,8 @@ impl UAttributesValidator for ResponseValidator { fn validate_source(&self, attributes: &UAttributes) -> Result<(), UAttributesError> { // [impl->dsn~up-attributes-response-source~1] if let Some(source) = attributes.source.as_ref() { - UUri::verify_rpc_method(source).map_err(|e| { - UAttributesError::validation_error(format!("Invalid source URI: {}", e)) - }) + UUri::verify_rpc_method(source) + .map_err(|e| UAttributesError::validation_error(format!("Invalid source URI: {e}"))) } else { Err(UAttributesError::validation_error("Missing Source")) } @@ -593,7 +589,7 @@ impl UAttributesValidator for ResponseValidator { // [impl->dsn~up-attributes-response-sink~1] if let Some(sink) = &attributes.sink.as_ref() { UUri::verify_rpc_response(sink) - .map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {}", e))) + .map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {e}"))) } else { Err(UAttributesError::validation_error("Missing Sink")) } diff --git a/src/uattributes/upayloadformat.rs b/src/uattributes/upayloadformat.rs index 1ce753f4..8d8c1a07 100644 --- a/src/uattributes/upayloadformat.rs +++ b/src/uattributes/upayloadformat.rs @@ -37,7 +37,7 @@ impl UPayloadError { impl std::fmt::Display for UPayloadError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {}", e)), + Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {e}")), Self::MediatypeProblem => { f.write_fmt(format_args!("Mediatype problem unsupported or malformed")) } diff --git a/src/uattributes/upriority.rs b/src/uattributes/upriority.rs index 1bf87531..fa7c5f51 100644 --- a/src/uattributes/upriority.rs +++ b/src/uattributes/upriority.rs @@ -77,6 +77,6 @@ impl UPriority { } }) }) - .ok_or_else(|| UAttributesError::parsing_error(format!("unknown priority [{}]", prio))) + .ok_or_else(|| UAttributesError::parsing_error(format!("unknown priority [{prio}]"))) } } diff --git a/src/umessage.rs b/src/umessage.rs index 303a171a..286c9cfd 100644 --- a/src/umessage.rs +++ b/src/umessage.rs @@ -34,13 +34,12 @@ impl std::fmt::Display for UMessageError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::AttributesValidationError(e) => f.write_fmt(format_args!( - "Builder state is not consistent with message type: {}", - e + "Builder state is not consistent with message type: {e}" )), Self::DataSerializationError(e) => { - f.write_fmt(format_args!("Failed to serialize payload: {}", e)) + f.write_fmt(format_args!("Failed to serialize payload: {e}")) } - Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)), + Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {e}")), } } } diff --git a/src/umessage/umessagetype.rs b/src/umessage/umessagetype.rs index b0494f5b..ada91222 100644 --- a/src/umessage/umessagetype.rs +++ b/src/umessage/umessagetype.rs @@ -56,7 +56,7 @@ impl UMessageType { }) }) .ok_or_else(|| { - UAttributesError::parsing_error(format!("unknown message type: {}", type_string)) + UAttributesError::parsing_error(format!("unknown message type: {type_string}")) }) } } diff --git a/src/uri.rs b/src/uri.rs index 4144ba87..a7f3cefa 100644 --- a/src/uri.rs +++ b/src/uri.rs @@ -55,8 +55,8 @@ impl UUriError { impl std::fmt::Display for UUriError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {}", e)), - Self::ValidationError(e) => f.write_fmt(format_args!("Validation error: {}", e)), + Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {e}")), + Self::ValidationError(e) => f.write_fmt(format_args!("Validation error: {e}")), } } } @@ -177,9 +177,8 @@ impl FromStr for UUri { "URI must contain non-empty entity ID", )); } - let ue_id = u32::from_str_radix(entity, 16).map_err(|e| { - UUriError::serialization_error(format!("Cannot parse entity ID: {}", e)) - })?; + let ue_id = u32::from_str_radix(entity, 16) + .map_err(|e| UUriError::serialization_error(format!("Cannot parse entity ID: {e}")))?; let version = path_segments[1].as_str(); if version.is_empty() { return Err(UUriError::serialization_error( @@ -187,7 +186,7 @@ impl FromStr for UUri { )); } let ue_version_major = u8::from_str_radix(version, 16).map_err(|e| { - UUriError::serialization_error(format!("Cannot parse entity version: {}", e)) + UUriError::serialization_error(format!("Cannot parse entity version: {e}")) })?; let resource = path_segments[2].as_str(); if resource.is_empty() { @@ -196,7 +195,7 @@ impl FromStr for UUri { )); } let resource_id = u16::from_str_radix(resource, 16).map_err(|e| { - UUriError::serialization_error(format!("Cannot parse resource ID: {}", e)) + UUriError::serialization_error(format!("Cannot parse resource ID: {e}")) })?; Ok(UUri { @@ -458,7 +457,7 @@ impl UUri { // [impl->dsn~uri-host-only~2] fn verify_authority(authority: &str) -> Result { Authority::try_from(authority) - .map_err(|e| UUriError::validation_error(format!("invalid authority: {}", e))) + .map_err(|e| UUriError::validation_error(format!("invalid authority: {e}"))) .and_then(|auth| Self::verify_parsed_authority(&auth)) } @@ -720,28 +719,22 @@ impl UUri { pub fn verify_no_wildcards(&self) -> Result<(), UUriError> { if self.has_wildcard_authority() { Err(UUriError::validation_error(format!( - "Authority must not contain wildcard character [{}]", - WILDCARD_AUTHORITY + "Authority must not contain wildcard character [{WILDCARD_AUTHORITY}]" ))) } else if self.has_wildcard_entity_instance() { Err(UUriError::validation_error(format!( - "Entity instance ID must not be set to wildcard value [{:#X}]", - WILDCARD_ENTITY_INSTANCE - ))) + "Entity instance ID must not be set to wildcard value [{WILDCARD_ENTITY_INSTANCE:#X}]"))) } else if self.has_wildcard_entity_type() { Err(UUriError::validation_error(format!( - "Entity type ID must not be set to wildcard value [{:#X}]", - WILDCARD_ENTITY_TYPE + "Entity type ID must not be set to wildcard value [{WILDCARD_ENTITY_TYPE:#X}]" ))) } else if self.has_wildcard_version() { Err(UUriError::validation_error(format!( - "Entity version must not be set to wildcard value [{:#X}]", - WILDCARD_ENTITY_VERSION + "Entity version must not be set to wildcard value [{WILDCARD_ENTITY_VERSION:#X}]" ))) } else if self.has_wildcard_resource_id() { Err(UUriError::validation_error(format!( - "Resource ID must not be set to wildcard value [{:#X}]", - WILDCARD_RESOURCE_ID + "Resource ID must not be set to wildcard value [{WILDCARD_RESOURCE_ID:#X}]" ))) } else { Ok(()) @@ -794,9 +787,7 @@ impl UUri { pub fn verify_rpc_method(&self) -> Result<(), UUriError> { if !self.is_rpc_method() { Err(UUriError::validation_error(format!( - "Resource ID must be a value from ]{:#X}, {:#X}[", - RESOURCE_ID_RESPONSE, RESOURCE_ID_MIN_EVENT - ))) + "Resource ID must be a value from ]{RESOURCE_ID_RESPONSE:#X}, {RESOURCE_ID_MIN_EVENT:#X}["))) } else { self.verify_no_wildcards() } @@ -861,8 +852,7 @@ impl UUri { pub fn verify_rpc_response(&self) -> Result<(), UUriError> { if !self.is_rpc_response() { Err(UUriError::validation_error(format!( - "Resource ID must be {:#X}", - RESOURCE_ID_RESPONSE + "Resource ID must be {RESOURCE_ID_RESPONSE:#X}" ))) } else { self.verify_no_wildcards() @@ -909,8 +899,7 @@ impl UUri { pub fn verify_event(&self) -> Result<(), UUriError> { if !self.is_event() { Err(UUriError::validation_error(format!( - "Resource ID must be >= {:#X}", - RESOURCE_ID_MIN_EVENT + "Resource ID must be >= {RESOURCE_ID_MIN_EVENT:#X}" ))) } else { self.verify_no_wildcards() diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 37c08367..3f1134b4 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -33,7 +33,7 @@ impl CustomTckOpts { #[allow(dead_code)] pub(crate) fn get_junit_out_file(&self, tck_test_name: &str) -> Option { self.junit_out_folder.as_ref().map(|path| { - fs::File::create(format!("{}/tck-{}-results.xml", path, tck_test_name)) + fs::File::create(format!("{path}/tck-{tck_test_name}-results.xml")) .expect("failed to create JUnit report file") }) }