diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 76a5322..c96c38b 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -17,6 +17,7 @@ ARG TARGETARCH RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ && apt-get -y install \ + cmake \ curl \ gcc \ git \ diff --git a/.env.oft-current b/.env.oft-current index 1781bcb..fbe39c2 100644 --- a/.env.oft-current +++ b/.env.oft-current @@ -17,7 +17,7 @@ UP_SPEC_FILE_PATTERNS="up-spec/up-l3/usubscription/v3/README.adoc" # The file patterns that specify this component's resources which contain specification items # that cover the requirements -COMPONENT_FILE_PATTERNS="*.adoc *.md *.rs .github up-subscription/src" +COMPONENT_FILE_PATTERNS="*.md .github up-subscription/src" OFT_FILE_PATTERNS="$UP_SPEC_FILE_PATTERNS $COMPONENT_FILE_PATTERNS" OFT_TAGS="" diff --git a/.env.oft-latest b/.env.oft-latest index 21b15bc..1649539 100644 --- a/.env.oft-latest +++ b/.env.oft-latest @@ -17,7 +17,7 @@ UP_SPEC_FILE_PATTERNS="up-spec/up-l3/usubscription/v3/README.adoc" # The file patterns that specify this component's resources which contain specification items # that cover the requirements -COMPONENT_FILE_PATTERNS="*.adoc *.md *.rs .github up-subscription/src" +COMPONENT_FILE_PATTERNS="*.md .github up-subscription/src" OFT_FILE_PATTERNS="$UP_SPEC_FILE_PATTERNS $COMPONENT_FILE_PATTERNS" OFT_TAGS="_,uSubscription" diff --git a/tools/coverage.sh b/tools/coverage.sh old mode 100644 new mode 100755 diff --git a/tools/generate_test_coverage_report.sh b/tools/generate_test_coverage_report.sh old mode 100644 new mode 100755 diff --git a/tools/startup.sh b/tools/startup.sh old mode 100644 new mode 100755 diff --git a/up-subscription-cli/src/main.rs b/up-subscription-cli/src/main.rs index 741ef8d..52a2cda 100644 --- a/up-subscription-cli/src/main.rs +++ b/up-subscription-cli/src/main.rs @@ -58,7 +58,7 @@ impl StartupError { impl std::fmt::Display for StartupError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::ConfigurationError(e) => f.write_fmt(format_args!("Configuration error: {}", e)), + Self::ConfigurationError(e) => f.write_fmt(format_args!("Configuration error: {e}")), } } } @@ -193,7 +193,7 @@ async fn main() { Ok(_) => { debug!("Success, running daemonized"); } - Err(e) => error!("Error, {}", e), + Err(e) => error!("Error, {e}"), } } diff --git a/up-subscription/src/common/helpers.rs b/up-subscription/src/common/helpers.rs index 6ade48e..40db451 100644 --- a/up-subscription/src/common/helpers.rs +++ b/up-subscription/src/common/helpers.rs @@ -36,7 +36,7 @@ where { task::spawn(async move { if let Err(e) = fut.await { - error!("{}", e) + error!("{e}") } }) } @@ -53,8 +53,7 @@ where { if resource_id != expected_resource_id { return Err(ServiceInvocationError::InvalidArgument(format!( - "Wrong resource ID (expected {}, got {})", - expected_resource_id, resource_id + "Wrong resource ID (expected {expected_resource_id}, got {resource_id})" ))); } let Some(payload) = payload else { diff --git a/up-subscription/src/configuration.rs b/up-subscription/src/configuration.rs index 2e22131..62ca347 100644 --- a/up-subscription/src/configuration.rs +++ b/up-subscription/src/configuration.rs @@ -86,13 +86,10 @@ impl USubscriptionConfiguration { let persistency_path = if let Some(path_string) = persistency_path { let p = Path::new(&path_string); p.try_exists().unwrap_or_else(|_| { - panic!("Persistency storage path does not exist {}", path_string) + panic!("Persistency storage path does not exist {path_string}") }); if !p.is_dir() { - panic!( - "Persistency storage path is not a directory {}", - path_string - ); + panic!("Persistency storage path is not a directory {path_string}"); } p.to_path_buf() } else { diff --git a/up-subscription/src/notification_manager.rs b/up-subscription/src/notification_manager.rs index e19bded..750a8c2 100644 --- a/up-subscription/src/notification_manager.rs +++ b/up-subscription/src/notification_manager.rs @@ -64,6 +64,43 @@ pub(crate) enum NotificationEvent { }, } +impl PartialEq for NotificationEvent { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + ( + NotificationEvent::StateChange { + subscriber: s1, + topic: t1, + status: st1, + .. + }, + NotificationEvent::StateChange { + subscriber: s2, + topic: t2, + status: st2, + .. + }, + ) => s1 == s2 && t1 == t2 && st1 == st2, + ( + NotificationEvent::AddNotifyee { + subscriber: s1, + topic: t1, + }, + NotificationEvent::AddNotifyee { + subscriber: s2, + topic: t2, + }, + ) => s1 == s2 && t1 == t2, + ( + NotificationEvent::RemoveNotifyee { subscriber: s1 }, + NotificationEvent::RemoveNotifyee { subscriber: s2 }, + ) => s1 == s2, + // Don't care about the test-only variants + _ => false, + } + } +} + // Keeps track of and sends subscription update notification to all registered update-notification channels. // Interfacing with this purely works via channels. pub(crate) async fn notification_engine( @@ -186,7 +223,7 @@ pub(crate) async fn notification_engine( } } - let _r = respond_to.send(()); + let _ = respond_to.send(()); } #[cfg(test)] NotificationEvent::GetNotificationTopics { respond_to } => { diff --git a/up-subscription/src/persistency.rs b/up-subscription/src/persistency.rs index 246a4be..4efca24 100644 --- a/up-subscription/src/persistency.rs +++ b/up-subscription/src/persistency.rs @@ -61,8 +61,8 @@ impl PersistencyError { impl std::fmt::Display for PersistencyError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {}", e)), - Self::InternalError(e) => f.write_fmt(format_args!("Internal error: {}", e)), + Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {e}")), + Self::InternalError(e) => f.write_fmt(format_args!("Internal error: {e}")), } } } diff --git a/up-subscription/src/subscription_manager.rs b/up-subscription/src/subscription_manager.rs index c60f402..7401d7b 100644 --- a/up-subscription/src/subscription_manager.rs +++ b/up-subscription/src/subscription_manager.rs @@ -226,6 +226,7 @@ pub(crate) async fn handle_message( expiry, ) { // [impl->req~usubscription-subscribe-notifications~1] + // [impl->dsn~usubscription-change-notification-update~1] Ok(result) => { // Send topic state change notification notification_manager::notify( @@ -262,6 +263,7 @@ pub(crate) async fn handle_message( ) { Ok(result) => { // Send topic state change notification + // [impl->dsn~usubscription-change-notification-update~1] notification_manager::notify( notification_sender.clone(), Some(subscriber), @@ -364,20 +366,44 @@ pub(crate) async fn handle_message( InternalSubscriptionEvent::TopicStateUpdate { topic, state } => { match remote_topics.set_topic_state(&topic, state) { Ok(_) => { - // Set up timed unsubscription here, in case state changes to ::PENDING + // We're supposed to send topic change notifications to any subscribers of topic + // [impl->dsn~usubscription-change-notification-update~1] + if let Ok(subscribers) = subscriptions.get_topic_subscribers(&topic) { + let topic_clone = topic.clone(); + let notification_sender_clone = notification_sender.clone(); + // Want to do this out off the main control flow + helpers::spawn_and_log_error(async move { + for subscriber in subscribers { + notification_manager::notify( + notification_sender_clone.clone(), + Some(subscriber), + topic_clone.clone(), + SubscriptionStatus { + state: state.into(), + ..Default::default() + }, + ) + .await; + } + Ok(()) + }); + } else { + warn!("Failed to send topic state change update notification to topic subscribers"); + } // Send topic state change notification - in the case of remote subscriptions, // the subscriber is usubscription service itself, so leave that field empty. - notification_manager::notify( - notification_sender.clone(), - None, - topic, - SubscriptionStatus { - state: state.into(), - ..Default::default() - }, - ) - .await; + // TODO: Let's see if this is actually covered by a requirement - otherwise it should go + // notification_manager::notify( + // notification_sender.clone(), + // None, + // topic, + // SubscriptionStatus { + // state: state.into(), + // ..Default::default() + // }, + // ) + // .await; } Err(e) => { panic!("Persistency failure {e}"); @@ -397,6 +423,7 @@ pub(crate) async fn handle_message( ) { Ok(result) => { // Send topic state change notification + // [impl->dsn~usubscription-change-notification-update~1] notification_manager::notify( notification_sender.clone(), Some(subscriber), @@ -699,7 +726,7 @@ async fn remote_unsubscribe( .await; } code => { - debug!("Got {:?} remote unsubscribe response", code); + debug!("Got {code:?} remote unsubscribe response"); return Err(UStatus::fail_with_code( code, "Error during remote unsubscribe", diff --git a/up-subscription/src/tests/subscription_manager_tests.rs b/up-subscription/src/tests/subscription_manager_tests.rs index 81fbc61..b2cbfa0 100644 --- a/up-subscription/src/tests/subscription_manager_tests.rs +++ b/up-subscription/src/tests/subscription_manager_tests.rs @@ -47,6 +47,7 @@ mod tests { // Simple subscription-manager-actor front-end to use for testing struct CommandSender { command_sender: Sender, + shutdowner: Arc, } impl CommandSender { @@ -69,47 +70,53 @@ mod tests { mpsc::channel::(config.notification_command_buffer); // Spawn notification receiver task + let shutdown_notification_cloned = shutdown_notification.clone(); helpers::spawn_and_log_error(async move { - match notification_receiver.recv().await { - Some(NotificationEvent::StateChange { - subscriber, - topic, - status, - respond_to, - }) => { - println!( - "Change Notification received: {} - {} - {}", - subscriber.unwrap().to_uri(true), - topic.to_uri(true), - status - ); - - let _ = respond_to.send(()); - } - _ => { - panic!("Expected a notification message, got nothing") - } + loop { + tokio::select! { + Some(event) = notification_receiver.recv() => { + if let NotificationEvent::StateChange { subscriber, topic, status, respond_to } = event { + println!( + "Change Notification received: {} - {} - {}", + subscriber.unwrap().to_uri(true), + topic.to_uri(true), + status + ); + + let _ = respond_to.send(()); + } else { + panic!("Expected a NotificationEvent::StateChange message, got something else") + } + }, + _ = shutdown_notification_cloned.notified() => break, + }; } Ok(()) }); + let shutdown_notification_cloned = shutdown_notification.clone(); helpers::spawn_and_log_error(async move { handle_message( config.clone(), Arc::new(transport_mock), command_receiver, notification_sender, - shutdown_notification, + shutdown_notification_cloned, ) .await; Ok(()) }); - CommandSender { command_sender } + CommandSender { + command_sender, + shutdowner: shutdown_notification, + } } // Allows configuration of expected invoke_method() calls from subscription manager (provide expected request and response for utransport mock) - async fn new_with_expected_notification(expected_notification: NotificationEvent) -> Self { + async fn new_with_expected_notifications( + mut expected_notifications: Vec, + ) -> Self { let config = Arc::new( USubscriptionConfiguration::create( test_lib::helpers::LOCAL_AUTHORITY.to_string(), @@ -128,50 +135,58 @@ mod tests { mpsc::channel::(config.notification_command_buffer); // Spawn notification receiver task + let shutdown_notification_cloned = shutdown_notification.clone(); helpers::spawn_and_log_error(async move { - match notification_receiver.recv().await { - Some(NotificationEvent::StateChange { - subscriber, - topic, - status, - respond_to: _, - }) => { - if let NotificationEvent::StateChange { - subscriber: expected_subscriber, - topic: expected_topic, - status: expected_status, - respond_to: _, - } = expected_notification - { - assert_eq!(subscriber, expected_subscriber, "subscriber mismatch"); - assert_eq!(topic, expected_topic, "topic mismatch"); - assert_eq!(status, expected_status, "status mismatch"); - } else { - panic!("expected_notification is not StateChange variant"); - } - } - _ => { - panic!("Expected a notification message, got nothing") - } + loop { + tokio::select! { + Some(event) = notification_receiver.recv() => { + if let Some(pos) = expected_notifications.iter().position(|e| e == &event) { + if let NotificationEvent::StateChange { subscriber, status, topic, respond_to } = event { + println!( + "Change Notification received: {} - {} - {}", + subscriber.unwrap_or_default().to_uri(true), + topic.to_uri(true), + status + ); + let _ = respond_to.send(()); + } + + // Send ack back to test case that was providing the expected_notifications back channel + let matched = expected_notifications.remove(pos); + if let NotificationEvent::StateChange { respond_to, .. } = matched { + let _ = respond_to.send(()); + } + } else { + panic!("Received unexpected notification event: {event:?}"); + } + }, + _ = shutdown_notification_cloned.notified() => { + break; + }, + }; } Ok(()) }); // Spawn off subscription manager task + let shutdown_notification_cloned = shutdown_notification.clone(); helpers::spawn_and_log_error(async move { handle_message( config.clone(), Arc::new(transport_mock), command_receiver, notification_sender, - shutdown_notification, + shutdown_notification_cloned, ) .await; Ok(()) }); - CommandSender { command_sender } + CommandSender { + command_sender, + shutdowner: shutdown_notification, + } } // Allows configuration of expected invoke_method() calls from subscription manager (provide expected request and response for utransport mock) @@ -204,18 +219,27 @@ mod tests { let (notification_sender, _) = mpsc::channel::(config.notification_command_buffer); + let shutdown_notification_cloned = shutdown_notification.clone(); helpers::spawn_and_log_error(async move { handle_message( config, mock_transport, command_receiver, notification_sender, - shutdown_notification, + shutdown_notification_cloned, ) .await; Ok(()) }); - CommandSender { command_sender } + + CommandSender { + command_sender, + shutdowner: shutdown_notification, + } + } + + async fn shutdown(&self) { + self.shutdowner.notify_waiters(); } async fn subscribe( @@ -825,6 +849,7 @@ mod tests { } // [utest->req~usubscription-subscribe-notifications~1] + // [utest->dsn~usubscription-change-notification-update~1] #[tokio::test] async fn test_local_subscribe_notification() { helpers::init_once(); @@ -845,16 +870,17 @@ mod tests { }; let command_sender = - CommandSender::new_with_expected_notification(expected_notification).await; + CommandSender::new_with_expected_notifications(vec![expected_notification]).await; // Operation to test let result = command_sender.subscribe(topic, subscriber, None).await; assert!(result.is_ok()); let _ = state_changed.await; - // notification_checked.notified().await; + command_sender.shutdown().await; } + // [utest->dsn~usubscription-change-notification-update~1] #[tokio::test] async fn test_local_unsubscribe_notification() { helpers::init_once(); @@ -884,7 +910,7 @@ mod tests { }; let command_sender = - CommandSender::new_with_expected_notification(expected_notification).await; + CommandSender::new_with_expected_notifications(vec![expected_notification]).await; command_sender .set_topic_subscribers(desired_state) @@ -896,29 +922,84 @@ mod tests { assert!(result.is_ok()); let _ = state_changed.await; + command_sender.shutdown().await; } - // [utest->req~usubscription-subscribe-notifications~1] + // TODO: Let's see if this is actually covered by a requirement - otherwise it should go + // #[tokio::test] + // async fn test_remote_subscribe_notification() { + // helpers::init_once(); + + // // Prepare things + // let topic = test_lib::helpers::remote_topic1_uri(); + // let (respond_to, state_changed) = oneshot::channel::<()>(); + + // let expected_notification = NotificationEvent::StateChange { + // subscriber: None, + // topic: topic.clone(), + // status: SubscriptionStatus { + // state: State::SUBSCRIBE_PENDING.into(), + // ..Default::default() + // }, + // respond_to, + // }; + + // let command_sender = + // CommandSender::new_with_expected_notifications(vec![expected_notification]).await; + + // let sender = command_sender + // .get_remote_subcription_change_sender() + // .await + // .expect("Error retrieving remote-subscription change event command channel"); + + // // Initiate notification event + // let _ = sender + // .send(InternalSubscriptionEvent::TopicStateUpdate { + // topic: topic.clone(), + // state: State::SUBSCRIBE_PENDING, + // }) + // .await; + + // // ensure that we have run through all the async layers and reached the notification assertion statements + // let _ = state_changed.await; + // command_sender.shutdown().await; + // } + + // [utest->dsn~usubscription-change-notification-update~1] #[tokio::test] - async fn test_remote_subscribe_notification() { + async fn test_state_change_notification() { helpers::init_once(); // Prepare things - let topic = test_lib::helpers::local_topic1_uri(); + let topic = test_lib::helpers::remote_topic1_uri(); + let subscriber = test_lib::helpers::subscriber_uri1(); let (respond_to, state_changed) = oneshot::channel::<()>(); let expected_notification = NotificationEvent::StateChange { - subscriber: None, + subscriber: subscriber.clone().into(), topic: topic.clone(), status: SubscriptionStatus { - state: State::SUBSCRIBE_PENDING.into(), + state: State::UNSUBSCRIBE_PENDING.into(), ..Default::default() }, respond_to, }; let command_sender = - CommandSender::new_with_expected_notification(expected_notification).await; + CommandSender::new_with_expected_notifications(vec![expected_notification]).await; + + // We need a subscriber to topic, which we're subsequently expecting a state change notification to be sent to + // let subscribers = HashMap>>::new(); + // set starting state + #[allow(clippy::mutable_key_type)] + let mut desired_state: persistency::SubscriptionSet = HashMap::new(); + #[allow(clippy::mutable_key_type)] + let entry = desired_state.entry(topic.clone()).or_default(); + entry.insert(subscriber.clone(), None); + assert!(command_sender + .set_topic_subscribers(desired_state) + .await + .is_ok()); let sender = command_sender .get_remote_subcription_change_sender() @@ -929,12 +1010,13 @@ mod tests { let _ = sender .send(InternalSubscriptionEvent::TopicStateUpdate { topic: topic.clone(), - state: State::SUBSCRIBE_PENDING, + state: State::UNSUBSCRIBE_PENDING, }) .await; // ensure that we have run through all the async layers and reached the notification assertion statements let _ = state_changed.await; + command_sender.shutdown().await; } // [utest->req~usubscription-fetch-subscribers~1] diff --git a/up-subscription/src/tests/test_lib.rs b/up-subscription/src/tests/test_lib.rs index 4bc2914..21018c3 100644 --- a/up-subscription/src/tests/test_lib.rs +++ b/up-subscription/src/tests/test_lib.rs @@ -127,7 +127,7 @@ pub(crate) mod mocks { .map_err(|e| { UStatus::fail_with_code( up_rust::UCode::INTERNAL, - format!("Error building response: {}", e), + format!("Error building response: {e}"), ) }) .unwrap();