From 7ee8efa4d321a027081c1e9736bc9a5415ae7f86 Mon Sep 17 00:00:00 2001 From: Maria Kuklina <101095419+kmd-fl@users.noreply.github.com> Date: Thu, 28 Sep 2023 15:20:21 +0200 Subject: [PATCH] fix(spell-bus): resubscribe spell on double subscription (#1797) * fix double subscription and add trace to spell-bus-api commands --- crates/spell-event-bus/src/bus.rs | 115 ++++++++++++++++++++++++++---- 1 file changed, 103 insertions(+), 12 deletions(-) diff --git a/crates/spell-event-bus/src/bus.rs b/crates/spell-event-bus/src/bus.rs index 36f328531e..51e556ff7f 100644 --- a/crates/spell-event-bus/src/bus.rs +++ b/crates/spell-event-bus/src/bus.rs @@ -5,7 +5,7 @@ use futures::StreamExt; use futures::{future, FutureExt}; use peer_metrics::SpellMetrics; use std::cmp::Ordering; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -97,6 +97,7 @@ impl PartialOrd for Scheduled { struct SubscribersState { subscribers: PeerEventSubscribers, scheduled: BinaryHeap, + active: HashSet>, } impl SubscribersState { @@ -104,10 +105,11 @@ impl SubscribersState { Self { subscribers: PeerEventSubscribers::new(), scheduled: BinaryHeap::new(), + active: HashSet::new(), } } - fn subscribe(&mut self, spell_id: SpellId, config: &SpellTriggerConfigs) -> Option<()> { + fn subscribe(&mut self, spell_id: SpellId, config: &SpellTriggerConfigs) { let spell_id = Arc::new(spell_id); for config in &config.triggers { match config { @@ -126,11 +128,12 @@ impl SubscribersState { } } } - Some(()) + self.active.insert(spell_id); } /// Returns true if spell_id was removed from subscribers fn unsubscribe(&mut self, spell_id: &SpellId) { + self.active.remove(spell_id); self.scheduled .retain(|scheduled| *scheduled.data.id != *spell_id); self.subscribers.remove(spell_id); @@ -240,12 +243,22 @@ impl SpellEventBus { let Command { action, reply } = command; match &action { Action::Subscribe(spell_id, config) => { - state.subscribe(spell_id.clone(), config).unwrap_or(()); + log::trace!("Subscribe {spell_id} to {:?}", config); + if state.active.contains(spell_id) { + log::warn!( + "spell {spell_id} is already running; re-subscribe to the new configuration" + ); + state.unsubscribe(spell_id); + } + + state.subscribe(spell_id.clone(), config); }, Action::Unsubscribe(spell_id) => { + log::trace!("Unsubscribe {spell_id}"); state.unsubscribe(spell_id); }, Action::Start => { + log::trace!("Start the bus"); is_started = true; } }; @@ -264,13 +277,17 @@ impl SpellEventBus { if let Some(scheduled_spell) = state.scheduled.pop() { log::trace!("Execute: {:?}", scheduled_spell); let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs(); + let spell_id = scheduled_spell.data.id.clone(); Self::trigger_spell(&send_events, &scheduled_spell.data.id, TriggerInfo::Timer(TimerEvent{ timestamp }))?; // Do not reschedule the spell otherwise. if let Some(rescheduled) = Scheduled::at(scheduled_spell.data, Instant::now()) { log::trace!("Reschedule: {:?}", rescheduled); state.scheduled.push(rescheduled); - } else if let Some(m) = &self.spell_metrics { - m.observe_finished_spell(); + } else { + state.active.remove(&spell_id); + if let Some(m) = &self.spell_metrics { + m.observe_finished_spell(); + } } } }, @@ -379,6 +396,10 @@ mod tests { .expect("Could not subscribe timer"); } + async fn subscribe_oneshot(api: &SpellEventBusApi, spell_id: SpellId) { + subscribe_timer(api, spell_id, TimerConfig::oneshot(Instant::now())).await; + } + async fn subscribe_periodic_endless( api: &SpellEventBusApi, spell_id: SpellId, @@ -458,12 +479,8 @@ mod tests { let _ = api.start_scheduling().await; let event_stream = UnboundedReceiverStream::new(event_receiver); let spell1_id = "spell1".to_string(); - subscribe_timer( - &api, - spell1_id.clone(), - TimerConfig::oneshot(Instant::now()), - ) - .await; + subscribe_oneshot(&api, spell1_id.clone()).await; + let spell2_id = "spell2".to_string(); subscribe_periodic_endless(&api, spell2_id.clone(), Duration::from_millis(5)).await; @@ -582,4 +599,78 @@ mod tests { }, ); } + + #[tokio::test] + async fn test_double_subscribe_before_run() { + //log_utils::enable_logs(); + let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]); + let bus = bus.start(); + let mut event_stream = UnboundedReceiverStream::new(event_receiver).fuse(); + let spell1_id = "spell1".to_string(); + subscribe_oneshot(&api, spell1_id.clone()).await; + subscribe_oneshot(&api, spell1_id.clone()).await; + let _ = api.start_scheduling().await; + + let mut events = Vec::new(); + // try to receive events twice, we should never receive the second event + for _ in 0..2 { + let timer = tokio::time::sleep(Duration::from_millis(10)); + select! { + event = event_stream.select_next_some() => { + events.push(event); + }, + _ = timer => { break; } + } + } + + try_catch( + || { + assert_eq!( + events.len(), + 1, + "double subscription of the same spell on the same event isn't allowed" + ); + assert_eq!(events[0].spell_id, spell1_id.clone(),); + assert_matches!(events[0].info, TriggerInfo::Timer(_)); + }, + || { + bus.abort(); + }, + ); + } + + #[tokio::test] + async fn test_resubscribing_same_spell() { + let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![]); + let bus = bus.start(); + let _ = api.start_scheduling().await; + let spell1_id = "spell1".to_string(); + subscribe_oneshot(&api, spell1_id.clone()).await; + let event1 = event_receiver.recv().await.unwrap(); + subscribe_oneshot(&api, spell1_id.clone()).await; + let event2 = tokio::time::timeout(Duration::from_millis(10), event_receiver.recv()).await; + + try_catch( + || { + assert_eq!( + event1.spell_id, + spell1_id.clone(), + "first subscription isn't correct" + ); + let event2 = event2.ok().flatten(); + assert!( + event2.is_some(), + "the second subscription of the same spell is ignored" + ); + assert_eq!( + event2.unwrap().spell_id, + spell1_id, + "the second subscription isn't correct" + ); + }, + || { + bus.abort(); + }, + ); + } }