Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/simple_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await
{
Err(ServiceInvocationError::InvalidArgument(msg)) => {
println!("service returned expected error: {}", msg)
println!("service returned expected error: {msg}")
}
_ => panic!("expected service to return an Invalid Argument error"),
}
Expand Down
2 changes: 1 addition & 1 deletion src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Display for RegistrationError {
"the underlying transport implementation does not support the push delivery method",
),
RegistrationError::InvalidFilter(msg) => {
f.write_fmt(format_args!("invalid filter(s): {}", msg))
f.write_fmt(format_args!("invalid filter(s): {msg}"))
}
RegistrationError::Unknown(status) => f.write_fmt(format_args!(
"error un-/registering listener: {}",
Expand Down
3 changes: 1 addition & 2 deletions src/communication/default_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ impl Publisher for SimplePublisher {
.await
.map_err(PubSubError::PublishError),
Err(e) => Err(PubSubError::InvalidArgument(format!(
"failed to create Publish message from parameters: {}",
e
"failed to create Publish message from parameters: {e}"
))),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/communication/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Display for NotificationError {
match self {
NotificationError::InvalidArgument(s) => f.write_str(s.as_str()),
NotificationError::NotifyError(s) => {
f.write_fmt(format_args!("failed to send notification: {}", s))
f.write_fmt(format_args!("failed to send notification: {s}"))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/communication/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Display for PubSubError {
match self {
PubSubError::InvalidArgument(s) => f.write_str(s.as_str()),
PubSubError::PublishError(s) => {
f.write_fmt(format_args!("failed to publish message: {}", s))
f.write_fmt(format_args!("failed to publish message: {s}"))
}
}
}
Expand Down
66 changes: 62 additions & 4 deletions src/communication/usubscription_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use crate::{
core::usubscription::{
usubscription_uri, FetchSubscribersRequest, FetchSubscribersResponse,
FetchSubscriptionsRequest, FetchSubscriptionsResponse, NotificationsRequest,
NotificationsResponse, SubscriptionRequest, SubscriptionResponse, USubscription,
UnsubscribeRequest, UnsubscribeResponse, RESOURCE_ID_FETCH_SUBSCRIBERS,
RESOURCE_ID_FETCH_SUBSCRIPTIONS, RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS,
RESOURCE_ID_SUBSCRIBE, RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE,
NotificationsResponse, ResetRequest, ResetResponse, SubscriptionRequest,
SubscriptionResponse, USubscription, UnsubscribeRequest, UnsubscribeResponse,
RESOURCE_ID_FETCH_SUBSCRIBERS, RESOURCE_ID_FETCH_SUBSCRIPTIONS,
RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_RESET, RESOURCE_ID_SUBSCRIBE,
RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS, RESOURCE_ID_UNSUBSCRIBE,
},
UStatus,
};
Expand Down Expand Up @@ -136,6 +137,17 @@ impl USubscription for RpcClientUSubscription {
.await
.map_err(UStatus::from)
}

async fn reset(&self, reset_request: ResetRequest) -> Result<ResetResponse, UStatus> {
self.rpc_client
.invoke_proto_method::<_, ResetResponse>(
usubscription_uri(RESOURCE_ID_RESET),
Self::default_call_options(),
reset_request,
)
.await
.map_err(UStatus::from)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -459,4 +471,50 @@ mod tests {
.await
.is_ok());
}

#[tokio::test]
async fn test_reset_invokes_rpc_client() {
let request = ResetRequest::default();

let expected_request = request.clone();
let mut rpc_client = MockRpcClient::new();
let mut seq = Sequence::new();
rpc_client
.expect_invoke_method()
.once()
.in_sequence(&mut seq)
.withf(|method, _options, payload| {
method == &usubscription_uri(RESOURCE_ID_RESET) && payload.is_some()
})
.return_const(Err(crate::communication::ServiceInvocationError::Internal(
"internal error".to_string(),
)));
rpc_client
.expect_invoke_method()
.once()
.in_sequence(&mut seq)
.withf(move |method, _options, payload| {
let request = payload
.to_owned()
.unwrap()
.extract_protobuf::<ResetRequest>()
.unwrap();

request == expected_request && method == &usubscription_uri(RESOURCE_ID_RESET)
})
.returning(move |_method, _options, _payload| {
let response = ResetResponse {
..Default::default()
};
Ok(Some(UPayload::try_from_protobuf(response).unwrap()))
});

let usubscription_client = RpcClientUSubscription::new(Arc::new(rpc_client));

assert!(usubscription_client
.reset(request.clone())
.await
.is_err_and(|e| e.get_code() == UCode::INTERNAL));
assert!(usubscription_client.reset(request).await.is_ok());
}
}
27 changes: 18 additions & 9 deletions src/core/usubscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use mockall::automock;
pub use crate::up_core_api::usubscription::{
fetch_subscriptions_request::Request, subscription_status::State, EventDeliveryConfig,
FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest,
FetchSubscriptionsResponse, NotificationsRequest, NotificationsResponse, SubscribeAttributes,
SubscriberInfo, Subscription, SubscriptionRequest, SubscriptionResponse, SubscriptionStatus,
UnsubscribeRequest, UnsubscribeResponse, Update,
FetchSubscriptionsResponse, NotificationsRequest, NotificationsResponse, ResetRequest,
ResetResponse, SubscribeAttributes, SubscriberInfo, Subscription, SubscriptionRequest,
SubscriptionResponse, SubscriptionStatus, UnsubscribeRequest, UnsubscribeResponse, Update,
};

use crate::{UStatus, UUri};
Expand Down Expand Up @@ -130,6 +130,8 @@ pub const RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS: u16 = 0x0006;
pub const RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS: u16 = 0x0007;
/// The resource identifier of uSubscription's _fetch subscribers_ operation.
pub const RESOURCE_ID_FETCH_SUBSCRIBERS: u16 = 0x0008;
/// The resource identifier of uSubscription's _reset_ operation.
pub const RESOURCE_ID_RESET: u16 = 0x0009;

/// The resource identifier of uSubscription's _subscription change_ topic.
pub const RESOURCE_ID_SUBSCRIPTION_CHANGE: u16 = 0x8000;
Expand Down Expand Up @@ -161,7 +163,7 @@ pub fn usubscription_uri(resource_id: u16) -> UUri {
#[cfg_attr(test, automock)]
#[async_trait]
pub trait USubscription: Send + Sync {
/// Subscribe to a topic, using a [`SubscriptionRequest`]
/// Subscribes to a topic, using a [`SubscriptionRequest`]
///
/// # Parameters
///
Expand All @@ -175,7 +177,7 @@ pub trait USubscription: Send + Sync {
subscription_request: SubscriptionRequest,
) -> Result<SubscriptionResponse, UStatus>;

/// Unsubscribe to a topic, using an [`UnsubscribeRequest`]
/// Unsubscribes from a topic, using an [`UnsubscribeRequest`]
///
/// # Parameters
///
Expand All @@ -186,7 +188,7 @@ pub trait USubscription: Send + Sync {
/// * [`UStatus`] detailing if unsubscription was successful and if not why not
async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus>;

/// Fetch all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`]
/// Fetches all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`]
///
/// # Parameters
///
Expand All @@ -200,7 +202,7 @@ pub trait USubscription: Send + Sync {
fetch_subscriptions_request: FetchSubscriptionsRequest,
) -> Result<FetchSubscriptionsResponse, UStatus>;

/// Register for notifications relevant to a given topic inside a [`NotificationsRequest`]
/// Registers for notifications relevant to a given topic inside a [`NotificationsRequest`]
/// changing in subscription status.
///
/// # Parameters
Expand All @@ -215,7 +217,7 @@ pub trait USubscription: Send + Sync {
notifications_register_request: NotificationsRequest,
) -> Result<(), UStatus>;

/// Unregister for notifications relevant to a given topic inside a [`NotificationsRequest`]
/// Unregisters from notifications relevant to a given topic inside a [`NotificationsRequest`]
/// changing in subscription status.
///
/// # Parameters
Expand All @@ -230,7 +232,7 @@ pub trait USubscription: Send + Sync {
notifications_unregister_request: NotificationsRequest,
) -> Result<(), UStatus>;

/// Fetch a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`]
/// Fetches a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`]
///
/// # Parameters
///
Expand All @@ -243,4 +245,11 @@ pub trait USubscription: Send + Sync {
&self,
fetch_subscribers_request: FetchSubscribersRequest,
) -> Result<FetchSubscribersResponse, UStatus>;

/// Flushes all stored subscription information, including any persistently stored subscriptions
///
/// # Returns
///
/// * [`ResetResponse`] with result of the operation
async fn reset(&self, reset_request: ResetRequest) -> Result<ResetResponse, UStatus>;
}
4 changes: 2 additions & 2 deletions src/uattributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ impl UAttributesError {
impl std::fmt::Display for UAttributesError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ValidationError(e) => f.write_fmt(format_args!("Validation failure: {}", e)),
Self::ParsingError(e) => f.write_fmt(format_args!("Parsing error: {}", e)),
Self::ValidationError(e) => f.write_fmt(format_args!("Validation failure: {e}")),
Self::ParsingError(e) => f.write_fmt(format_args!("Parsing error: {e}")),
}
}
}
Expand Down
30 changes: 13 additions & 17 deletions src/uattributes/uattributesvalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ pub trait UAttributesValidator: Send {
mt.to_cloudevent_type()
))),
Err(unknown_code) => Err(UAttributesError::validation_error(format!(
"Unknown Message Type code [{}]",
unknown_code
"Unknown Message Type code [{unknown_code}]"
))),
}
}
Expand Down Expand Up @@ -98,8 +97,7 @@ pub fn validate_rpc_priority(attributes: &UAttributes) -> Result<(), UAttributes
.enum_value()
.map_err(|unknown_code| {
UAttributesError::ValidationError(format!(
"RPC message must have a valid priority [{}]",
unknown_code
"RPC message must have a valid priority [{unknown_code}]"
))
})
.and_then(|prio| {
Expand Down Expand Up @@ -256,9 +254,9 @@ impl UAttributesValidator for PublishValidator {
fn validate_source(&self, attributes: &UAttributes) -> Result<(), UAttributesError> {
// [impl->dsn~up-attributes-publish-source~1]
if let Some(source) = attributes.source.as_ref() {
source.verify_event().map_err(|e| {
UAttributesError::validation_error(format!("Invalid source URI: {}", e))
})
source
.verify_event()
.map_err(|e| UAttributesError::validation_error(format!("Invalid source URI: {e}")))
} else {
Err(UAttributesError::validation_error(
"Attributes for a publish message must contain a source URI",
Expand Down Expand Up @@ -340,7 +338,7 @@ impl UAttributesValidator for NotificationValidator {
))
} else {
source.verify_no_wildcards().map_err(|e| {
UAttributesError::validation_error(format!("Invalid source URI: {}", e))
UAttributesError::validation_error(format!("Invalid source URI: {e}"))
})
}
} else {
Expand Down Expand Up @@ -368,7 +366,7 @@ impl UAttributesValidator for NotificationValidator {
))
} else {
sink.verify_no_wildcards().map_err(|e| {
UAttributesError::validation_error(format!("Invalid sink URI: {}", e))
UAttributesError::validation_error(format!("Invalid sink URI: {e}"))
})
}
} else {
Expand Down Expand Up @@ -451,9 +449,8 @@ impl UAttributesValidator for RequestValidator {
fn validate_source(&self, attributes: &UAttributes) -> Result<(), UAttributesError> {
// [impl->dsn~up-attributes-request-source~1]
if let Some(source) = attributes.source.as_ref() {
UUri::verify_rpc_response(source).map_err(|e| {
UAttributesError::validation_error(format!("Invalid source URI: {}", e))
})
UUri::verify_rpc_response(source)
.map_err(|e| UAttributesError::validation_error(format!("Invalid source URI: {e}")))
} else {
Err(UAttributesError::validation_error("Attributes for a request message must contain a reply-to address in the source property"))
}
Expand All @@ -469,7 +466,7 @@ impl UAttributesValidator for RequestValidator {
// [impl->dsn~up-attributes-request-sink~1]
if let Some(sink) = attributes.sink.as_ref() {
UUri::verify_rpc_method(sink)
.map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {}", e)))
.map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {e}")))
} else {
Err(UAttributesError::validation_error("Attributes for a request message must contain a method-to-invoke in the sink property"))
}
Expand Down Expand Up @@ -574,9 +571,8 @@ impl UAttributesValidator for ResponseValidator {
fn validate_source(&self, attributes: &UAttributes) -> Result<(), UAttributesError> {
// [impl->dsn~up-attributes-response-source~1]
if let Some(source) = attributes.source.as_ref() {
UUri::verify_rpc_method(source).map_err(|e| {
UAttributesError::validation_error(format!("Invalid source URI: {}", e))
})
UUri::verify_rpc_method(source)
.map_err(|e| UAttributesError::validation_error(format!("Invalid source URI: {e}")))
} else {
Err(UAttributesError::validation_error("Missing Source"))
}
Expand All @@ -593,7 +589,7 @@ impl UAttributesValidator for ResponseValidator {
// [impl->dsn~up-attributes-response-sink~1]
if let Some(sink) = &attributes.sink.as_ref() {
UUri::verify_rpc_response(sink)
.map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {}", e)))
.map_err(|e| UAttributesError::validation_error(format!("Invalid sink URI: {e}")))
} else {
Err(UAttributesError::validation_error("Missing Sink"))
}
Expand Down
2 changes: 1 addition & 1 deletion src/uattributes/upayloadformat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl UPayloadError {
impl std::fmt::Display for UPayloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {}", e)),
Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {e}")),
Self::MediatypeProblem => {
f.write_fmt(format_args!("Mediatype problem unsupported or malformed"))
}
Expand Down
2 changes: 1 addition & 1 deletion src/uattributes/upriority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ impl UPriority {
}
})
})
.ok_or_else(|| UAttributesError::parsing_error(format!("unknown priority [{}]", prio)))
.ok_or_else(|| UAttributesError::parsing_error(format!("unknown priority [{prio}]")))
}
}
7 changes: 3 additions & 4 deletions src/umessage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ impl std::fmt::Display for UMessageError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AttributesValidationError(e) => f.write_fmt(format_args!(
"Builder state is not consistent with message type: {}",
e
"Builder state is not consistent with message type: {e}"
)),
Self::DataSerializationError(e) => {
f.write_fmt(format_args!("Failed to serialize payload: {}", e))
f.write_fmt(format_args!("Failed to serialize payload: {e}"))
}
Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {}", e)),
Self::PayloadError(e) => f.write_fmt(format_args!("UMessage payload error: {e}")),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/umessage/umessagetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl UMessageType {
})
})
.ok_or_else(|| {
UAttributesError::parsing_error(format!("unknown message type: {}", type_string))
UAttributesError::parsing_error(format!("unknown message type: {type_string}"))
})
}
}
Expand Down
Loading