Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ARG TARGETARCH

RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
&& apt-get -y install \
cmake \
curl \
gcc \
git \
Expand Down
2 changes: 1 addition & 1 deletion .env.oft-current
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ UP_SPEC_FILE_PATTERNS="up-spec/up-l3/usubscription/v3/README.adoc"

# The file patterns that specify this component's resources which contain specification items
# that cover the requirements
COMPONENT_FILE_PATTERNS="*.adoc *.md *.rs .github up-subscription/src"
COMPONENT_FILE_PATTERNS="*.md .github up-subscription/src"

OFT_FILE_PATTERNS="$UP_SPEC_FILE_PATTERNS $COMPONENT_FILE_PATTERNS"
OFT_TAGS=""
2 changes: 1 addition & 1 deletion .env.oft-latest
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ UP_SPEC_FILE_PATTERNS="up-spec/up-l3/usubscription/v3/README.adoc"

# The file patterns that specify this component's resources which contain specification items
# that cover the requirements
COMPONENT_FILE_PATTERNS="*.adoc *.md *.rs .github up-subscription/src"
COMPONENT_FILE_PATTERNS="*.md .github up-subscription/src"

OFT_FILE_PATTERNS="$UP_SPEC_FILE_PATTERNS $COMPONENT_FILE_PATTERNS"
OFT_TAGS="_,uSubscription"
Empty file modified tools/coverage.sh
100644 → 100755
Empty file.
Empty file modified tools/generate_test_coverage_report.sh
100644 → 100755
Empty file.
Empty file modified tools/startup.sh
100644 → 100755
Empty file.
4 changes: 2 additions & 2 deletions up-subscription-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl StartupError {
impl std::fmt::Display for StartupError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ConfigurationError(e) => f.write_fmt(format_args!("Configuration error: {}", e)),
Self::ConfigurationError(e) => f.write_fmt(format_args!("Configuration error: {e}")),
}
}
}
Expand Down Expand Up @@ -193,7 +193,7 @@ async fn main() {
Ok(_) => {
debug!("Success, running daemonized");
}
Err(e) => error!("Error, {}", e),
Err(e) => error!("Error, {e}"),
}
}

Expand Down
5 changes: 2 additions & 3 deletions up-subscription/src/common/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ where
{
task::spawn(async move {
if let Err(e) = fut.await {
error!("{}", e)
error!("{e}")
}
})
}
Expand All @@ -53,8 +53,7 @@ where
{
if resource_id != expected_resource_id {
return Err(ServiceInvocationError::InvalidArgument(format!(
"Wrong resource ID (expected {}, got {})",
expected_resource_id, resource_id
"Wrong resource ID (expected {expected_resource_id}, got {resource_id})"
)));
}
let Some(payload) = payload else {
Expand Down
7 changes: 2 additions & 5 deletions up-subscription/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,10 @@ impl USubscriptionConfiguration {
let persistency_path = if let Some(path_string) = persistency_path {
let p = Path::new(&path_string);
p.try_exists().unwrap_or_else(|_| {
panic!("Persistency storage path does not exist {}", path_string)
panic!("Persistency storage path does not exist {path_string}")
});
if !p.is_dir() {
panic!(
"Persistency storage path is not a directory {}",
path_string
);
panic!("Persistency storage path is not a directory {path_string}");
}
p.to_path_buf()
} else {
Expand Down
39 changes: 38 additions & 1 deletion up-subscription/src/notification_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,43 @@ pub(crate) enum NotificationEvent {
},
}

impl PartialEq for NotificationEvent {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(
NotificationEvent::StateChange {
subscriber: s1,
topic: t1,
status: st1,
..
},
NotificationEvent::StateChange {
subscriber: s2,
topic: t2,
status: st2,
..
},
) => s1 == s2 && t1 == t2 && st1 == st2,
(
NotificationEvent::AddNotifyee {
subscriber: s1,
topic: t1,
},
NotificationEvent::AddNotifyee {
subscriber: s2,
topic: t2,
},
) => s1 == s2 && t1 == t2,
(
NotificationEvent::RemoveNotifyee { subscriber: s1 },
NotificationEvent::RemoveNotifyee { subscriber: s2 },
) => s1 == s2,
// Don't care about the test-only variants
_ => false,
}
}
}

// Keeps track of and sends subscription update notification to all registered update-notification channels.
// Interfacing with this purely works via channels.
pub(crate) async fn notification_engine(
Expand Down Expand Up @@ -186,7 +223,7 @@ pub(crate) async fn notification_engine(
}
}

let _r = respond_to.send(());
let _ = respond_to.send(());
}
#[cfg(test)]
NotificationEvent::GetNotificationTopics { respond_to } => {
Expand Down
4 changes: 2 additions & 2 deletions up-subscription/src/persistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl PersistencyError {
impl std::fmt::Display for PersistencyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {}", e)),
Self::InternalError(e) => f.write_fmt(format_args!("Internal error: {}", e)),
Self::SerializationError(e) => f.write_fmt(format_args!("Serialization error: {e}")),
Self::InternalError(e) => f.write_fmt(format_args!("Internal error: {e}")),
}
}
}
Expand Down
51 changes: 39 additions & 12 deletions up-subscription/src/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub(crate) async fn handle_message(
expiry,
) {
// [impl->req~usubscription-subscribe-notifications~1]
// [impl->dsn~usubscription-change-notification-update~1]
Ok(result) => {
// Send topic state change notification
notification_manager::notify(
Expand Down Expand Up @@ -262,6 +263,7 @@ pub(crate) async fn handle_message(
) {
Ok(result) => {
// Send topic state change notification
// [impl->dsn~usubscription-change-notification-update~1]
notification_manager::notify(
notification_sender.clone(),
Some(subscriber),
Expand Down Expand Up @@ -364,20 +366,44 @@ pub(crate) async fn handle_message(
InternalSubscriptionEvent::TopicStateUpdate { topic, state } => {
match remote_topics.set_topic_state(&topic, state) {
Ok(_) => {
// Set up timed unsubscription here, in case state changes to ::PENDING
// We're supposed to send topic change notifications to any subscribers of topic
// [impl->dsn~usubscription-change-notification-update~1]
if let Ok(subscribers) = subscriptions.get_topic_subscribers(&topic) {
let topic_clone = topic.clone();
let notification_sender_clone = notification_sender.clone();
// Want to do this out off the main control flow
helpers::spawn_and_log_error(async move {
for subscriber in subscribers {
notification_manager::notify(
notification_sender_clone.clone(),
Some(subscriber),
topic_clone.clone(),
SubscriptionStatus {
state: state.into(),
..Default::default()
},
)
.await;
}
Ok(())
});
} else {
warn!("Failed to send topic state change update notification to topic subscribers");
}

// Send topic state change notification - in the case of remote subscriptions,
// the subscriber is usubscription service itself, so leave that field empty.
notification_manager::notify(
notification_sender.clone(),
None,
topic,
SubscriptionStatus {
state: state.into(),
..Default::default()
},
)
.await;
// TODO: Let's see if this is actually covered by a requirement - otherwise it should go
// notification_manager::notify(
// notification_sender.clone(),
// None,
// topic,
// SubscriptionStatus {
// state: state.into(),
// ..Default::default()
// },
// )
// .await;
}
Err(e) => {
panic!("Persistency failure {e}");
Expand All @@ -397,6 +423,7 @@ pub(crate) async fn handle_message(
) {
Ok(result) => {
// Send topic state change notification
// [impl->dsn~usubscription-change-notification-update~1]
notification_manager::notify(
notification_sender.clone(),
Some(subscriber),
Expand Down Expand Up @@ -699,7 +726,7 @@ async fn remote_unsubscribe(
.await;
}
code => {
debug!("Got {:?} remote unsubscribe response", code);
debug!("Got {code:?} remote unsubscribe response");
return Err(UStatus::fail_with_code(
code,
"Error during remote unsubscribe",
Expand Down
Loading