Skip to content

Commit

Permalink
fix(spell-bus): resubscribe spell on double subscription (#1797)
Browse files Browse the repository at this point in the history
* fix double subscription and add trace to spell-bus-api commands
  • Loading branch information
kmd-fl committed Sep 28, 2023
1 parent e8a11fe commit 7ee8efa
Showing 1 changed file with 103 additions and 12 deletions.
115 changes: 103 additions & 12 deletions crates/spell-event-bus/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::StreamExt;
use futures::{future, FutureExt};
use peer_metrics::SpellMetrics;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -97,17 +97,19 @@ impl PartialOrd for Scheduled {
struct SubscribersState {
subscribers: PeerEventSubscribers,
scheduled: BinaryHeap<Scheduled>,
active: HashSet<Arc<SpellId>>,
}

impl SubscribersState {
fn new() -> Self {
Self {
subscribers: PeerEventSubscribers::new(),
scheduled: BinaryHeap::new(),
active: HashSet::new(),
}
}

fn subscribe(&mut self, spell_id: SpellId, config: &SpellTriggerConfigs) -> Option<()> {
fn subscribe(&mut self, spell_id: SpellId, config: &SpellTriggerConfigs) {
let spell_id = Arc::new(spell_id);
for config in &config.triggers {
match config {
Expand All @@ -126,11 +128,12 @@ impl SubscribersState {
}
}
}
Some(())
self.active.insert(spell_id);
}

/// Returns true if spell_id was removed from subscribers
fn unsubscribe(&mut self, spell_id: &SpellId) {
self.active.remove(spell_id);
self.scheduled
.retain(|scheduled| *scheduled.data.id != *spell_id);
self.subscribers.remove(spell_id);
Expand Down Expand Up @@ -240,12 +243,22 @@ impl SpellEventBus {
let Command { action, reply } = command;
match &action {
Action::Subscribe(spell_id, config) => {
state.subscribe(spell_id.clone(), config).unwrap_or(());
log::trace!("Subscribe {spell_id} to {:?}", config);
if state.active.contains(spell_id) {
log::warn!(
"spell {spell_id} is already running; re-subscribe to the new configuration"
);
state.unsubscribe(spell_id);
}

state.subscribe(spell_id.clone(), config);
},
Action::Unsubscribe(spell_id) => {
log::trace!("Unsubscribe {spell_id}");
state.unsubscribe(spell_id);
},
Action::Start => {
log::trace!("Start the bus");
is_started = true;
}
};
Expand All @@ -264,13 +277,17 @@ impl SpellEventBus {
if let Some(scheduled_spell) = state.scheduled.pop() {
log::trace!("Execute: {:?}", scheduled_spell);
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
let spell_id = scheduled_spell.data.id.clone();
Self::trigger_spell(&send_events, &scheduled_spell.data.id, TriggerInfo::Timer(TimerEvent{ timestamp }))?;
// Do not reschedule the spell otherwise.
if let Some(rescheduled) = Scheduled::at(scheduled_spell.data, Instant::now()) {
log::trace!("Reschedule: {:?}", rescheduled);
state.scheduled.push(rescheduled);
} else if let Some(m) = &self.spell_metrics {
m.observe_finished_spell();
} else {
state.active.remove(&spell_id);
if let Some(m) = &self.spell_metrics {
m.observe_finished_spell();
}
}
}
},
Expand Down Expand Up @@ -379,6 +396,10 @@ mod tests {
.expect("Could not subscribe timer");
}

async fn subscribe_oneshot(api: &SpellEventBusApi, spell_id: SpellId) {
subscribe_timer(api, spell_id, TimerConfig::oneshot(Instant::now())).await;
}

async fn subscribe_periodic_endless(
api: &SpellEventBusApi,
spell_id: SpellId,
Expand Down Expand Up @@ -458,12 +479,8 @@ mod tests {
let _ = api.start_scheduling().await;
let event_stream = UnboundedReceiverStream::new(event_receiver);
let spell1_id = "spell1".to_string();
subscribe_timer(
&api,
spell1_id.clone(),
TimerConfig::oneshot(Instant::now()),
)
.await;
subscribe_oneshot(&api, spell1_id.clone()).await;

let spell2_id = "spell2".to_string();
subscribe_periodic_endless(&api, spell2_id.clone(), Duration::from_millis(5)).await;

Expand Down Expand Up @@ -582,4 +599,78 @@ mod tests {
},
);
}

#[tokio::test]
async fn test_double_subscribe_before_run() {
//log_utils::enable_logs();
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let mut event_stream = UnboundedReceiverStream::new(event_receiver).fuse();
let spell1_id = "spell1".to_string();
subscribe_oneshot(&api, spell1_id.clone()).await;
subscribe_oneshot(&api, spell1_id.clone()).await;
let _ = api.start_scheduling().await;

let mut events = Vec::new();
// try to receive events twice, we should never receive the second event
for _ in 0..2 {
let timer = tokio::time::sleep(Duration::from_millis(10));
select! {
event = event_stream.select_next_some() => {
events.push(event);
},
_ = timer => { break; }
}
}

try_catch(
|| {
assert_eq!(
events.len(),
1,
"double subscription of the same spell on the same event isn't allowed"
);
assert_eq!(events[0].spell_id, spell1_id.clone(),);
assert_matches!(events[0].info, TriggerInfo::Timer(_));
},
|| {
bus.abort();
},
);
}

#[tokio::test]
async fn test_resubscribing_same_spell() {
let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let _ = api.start_scheduling().await;
let spell1_id = "spell1".to_string();
subscribe_oneshot(&api, spell1_id.clone()).await;
let event1 = event_receiver.recv().await.unwrap();
subscribe_oneshot(&api, spell1_id.clone()).await;
let event2 = tokio::time::timeout(Duration::from_millis(10), event_receiver.recv()).await;

try_catch(
|| {
assert_eq!(
event1.spell_id,
spell1_id.clone(),
"first subscription isn't correct"
);
let event2 = event2.ok().flatten();
assert!(
event2.is_some(),
"the second subscription of the same spell is ignored"
);
assert_eq!(
event2.unwrap().spell_id,
spell1_id,
"the second subscription isn't correct"
);
},
|| {
bus.abort();
},
);
}
}

0 comments on commit 7ee8efa

Please sign in to comment.