Skip to content

Commit

Permalink
fix(spells): start resubscribed spells scheduling after full node ini…
Browse files Browse the repository at this point in the history
…tialization [fixes NET-459] (#1592)

* start resubscribed spells scheduling after full node initialization

* refactor

* fix tests

* fix clippy
  • Loading branch information
kmd-fl committed Apr 28, 2023
1 parent 2f9ed98 commit 1016f36
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 30 deletions.
33 changes: 16 additions & 17 deletions crates/spell-event-bus/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,31 +104,29 @@ impl From<TriggerInfoAqua> for TriggerInfo {

#[derive(Debug)]
pub(crate) struct Command {
pub(crate) spell_id: SpellId,
pub(crate) action: Action,
pub(crate) reply: oneshot::Sender<()>,
}

#[derive(Debug, Clone)]
pub enum Action {
/// Subscribe a spell to a list of triggers
Subscribe(SpellTriggerConfigs),
Subscribe(SpellId, SpellTriggerConfigs),
/// Remove all subscriptions of a spell
Unsubscribe,
Unsubscribe(SpellId),
/// Actually start the scheduling
Start,
}

#[derive(Error, Debug)]
pub enum EventBusError {
#[error(
"can't send a command `{action:?}` for spell `{spell_id}` to spell-event-bus: {reason}"
)]
#[error("can't send a command `{action:?}` to spell-event-bus: {reason}")]
SendError {
spell_id: SpellId,
action: Action,
reason: Pin<Box<dyn std::error::Error + Send>>,
},
#[error("can't receive a message from the bus on behalf of spell {0}: sending end is probably dropped")]
ReplyError(SpellId),
#[error("can't receive a message from the bus on behalf of a command {0:?}: sending end is probably dropped")]
ReplyError(Action),
}

#[derive(Clone)]
Expand All @@ -143,23 +141,20 @@ impl std::fmt::Debug for SpellEventBusApi {
}

impl SpellEventBusApi {
async fn send(&self, spell_id: SpellId, action: Action) -> Result<(), EventBusError> {
async fn send(&self, action: Action) -> Result<(), EventBusError> {
let (send, recv) = oneshot::channel();
let command = Command {
spell_id: spell_id.clone(),
action: action.clone(),
reply: send,
};
self.send_cmd_channel
.send(command)
.map_err(|e| EventBusError::SendError {
spell_id: spell_id.clone(),
action,
action: action.clone(),
reason: Box::pin(e),
})?;

recv.await
.map_err(|_| EventBusError::ReplyError(spell_id))?;
recv.await.map_err(|_| EventBusError::ReplyError(action))?;
Ok(())
}

Expand All @@ -171,11 +166,15 @@ impl SpellEventBusApi {
spell_id: SpellId,
config: SpellTriggerConfigs,
) -> Result<(), EventBusError> {
self.send(spell_id, Action::Subscribe(config)).await
self.send(Action::Subscribe(spell_id, config)).await
}

/// Unsubscribe a spell from all events.
pub async fn unsubscribe(&self, spell_id: SpellId) -> Result<(), EventBusError> {
self.send(spell_id, Action::Unsubscribe).await
self.send(Action::Unsubscribe(spell_id)).await
}

pub async fn start_scheduling(&self) -> Result<(), EventBusError> {
self.send(Action::Start).await
}
}
34 changes: 23 additions & 11 deletions crates/spell-event-bus/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ impl SubscribersState {
#[derive(Debug, Error)]
enum BusInternalError {
// oneshot::Sender doesn't provide the reasons why it failed to send a message
#[error("failed to send a result of a command execution ({1:?}) for a spell {0}: receiving end probably dropped")]
Reply(SpellId, Action),
#[error(
"failed to send a result of a command execution ({0:?}): receiving end probably dropped"
)]
Reply(Action),
#[error("failed to send notification about a peer event {1:?} to spell {0}: {2}")]
SendEvent(SpellId, TriggerInfo, Pin<Box<dyn std::error::Error>>),
}
Expand Down Expand Up @@ -192,7 +194,7 @@ impl SpellEventBus {

pub fn start(self) -> task::JoinHandle<()> {
task::Builder::new()
.name("Bus")
.name("spell-bus")
.spawn(self.run())
.expect("Could not spawn task")
}
Expand All @@ -208,6 +210,7 @@ impl SpellEventBus {
let mut sources_channel = futures::stream::select_all(sources);

let mut state = SubscribersState::new();
let mut is_started = false;
loop {
let now = Instant::now();

Expand All @@ -234,26 +237,29 @@ impl SpellEventBus {
let result: Result<(), BusInternalError> = try {
select! {
Some(command) = self.recv_cmd_channel.recv() => {
let Command { spell_id, action, reply } = command;
let Command { action, reply } = command;
match &action {
Action::Subscribe(config) => {
Action::Subscribe(spell_id, config) => {
state.subscribe(spell_id.clone(), config).unwrap_or(());
},
Action::Unsubscribe => {
state.unsubscribe(&spell_id);
Action::Unsubscribe(spell_id) => {
state.unsubscribe(spell_id);
},
Action::Start => {
is_started = true;
}
};
reply.send(()).map_err(|_| {
BusInternalError::Reply(spell_id, action)
BusInternalError::Reply(action)
})?;
},
Some(event) = sources_channel.next() => {
Some(event) = sources_channel.next(), if is_started => {
for spell_id in state.subscribers(&event.get_type()) {
let event = TriggerInfo::Peer(event.clone());
Self::trigger_spell(&send_events, spell_id, event)?;
}
},
_ = timer_task => {
_ = timer_task, if is_started => {
// The timer is triggered only if there are some spells to be awaken.
if let Some(scheduled_spell) = state.scheduled.pop() {
log::trace!("Execute: {:?}", scheduled_spell);
Expand All @@ -264,7 +270,7 @@ impl SpellEventBus {
log::trace!("Reschedule: {:?}", rescheduled);
state.scheduled.push(rescheduled);
} else if let Some(m) = &self.spell_metrics {
m.observe_finished_spell();
m.observe_finished_spell();
}
}
},
Expand Down Expand Up @@ -390,6 +396,7 @@ mod tests {
async fn test_subscribe_one() {
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let _ = api.start_scheduling().await;
let event_stream = UnboundedReceiverStream::new(event_receiver);

let spell1_id = "spell1".to_string();
Expand All @@ -414,6 +421,7 @@ mod tests {
async fn test_subscribe_many() {
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let _ = api.start_scheduling().await;
let event_stream = UnboundedReceiverStream::new(event_receiver);

let mut spell_ids = hashmap![
Expand Down Expand Up @@ -447,6 +455,7 @@ mod tests {
async fn test_subscribe_oneshot() {
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![]);
let bus = bus.start();
let _ = api.start_scheduling().await;
let event_stream = UnboundedReceiverStream::new(event_receiver);
let spell1_id = "spell1".to_string();
subscribe_timer(
Expand Down Expand Up @@ -483,6 +492,7 @@ mod tests {
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]);
let mut event_stream = UnboundedReceiverStream::new(event_receiver);
let bus = bus.start();
let _ = api.start_scheduling().await;

let spell1_id = "spell1".to_string();
subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await;
Expand Down Expand Up @@ -512,6 +522,7 @@ mod tests {
let recv = UnboundedReceiverStream::new(recv).boxed();
let (bus, api, mut event_receiver) = SpellEventBus::new(None, vec![recv]);
let bus = bus.start();
let _ = api.start_scheduling().await;

let spell1_id = "spell1".to_string();
subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await;
Expand Down Expand Up @@ -542,6 +553,7 @@ mod tests {
let (bus, api, event_receiver) = SpellEventBus::new(None, vec![recv]);
let event_stream = UnboundedReceiverStream::new(event_receiver);
let bus = bus.start();
let _ = api.start_scheduling().await;

let spell1_id = "spell1".to_string();
subscribe_peer_event(&api, spell1_id.clone(), vec![PeerEventType::Connected]).await;
Expand Down
13 changes: 11 additions & 2 deletions particle-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use prometheus_client::registry::Registry;
use script_storage::{ScriptStorageApi, ScriptStorageBackend, ScriptStorageConfig};
use server_config::{NetworkConfig, ResolvedConfig, ServicesConfig};
use sorcerer::Sorcerer;
use spell_event_bus::api::{PeerEvent, TriggerEvent};
use spell_event_bus::api::{PeerEvent, SpellEventBusApi, TriggerEvent};
use spell_event_bus::bus::SpellEventBus;
use tokio::sync::{mpsc, oneshot};
use tokio::task;
Expand All @@ -75,6 +75,7 @@ pub struct Node<RT: AquaRuntime> {
aquavm_pool: AquamarineBackend<RT, Arc<Builtins<Connectivity>>>,
script_storage: ScriptStorageBackend,
builtins_deployer: BuiltinsDeployer,
spell_event_bus_api: SpellEventBusApi,
spell_event_bus: SpellEventBus,
spell_events_receiver: mpsc::UnboundedReceiver<TriggerEvent>,
sorcerer: Sorcerer,
Expand Down Expand Up @@ -259,7 +260,7 @@ impl<RT: AquaRuntime> Node<RT> {
builtins.modules.clone(),
aquamarine_api.clone(),
config.clone(),
spell_event_bus_api,
spell_event_bus_api.clone(),
key_manager.clone(),
spell_metrics,
);
Expand Down Expand Up @@ -308,6 +309,7 @@ impl<RT: AquaRuntime> Node<RT> {
aquavm_pool,
script_storage_backend,
builtins_deployer,
spell_event_bus_api,
spell_event_bus,
spell_events_receiver,
sorcerer,
Expand Down Expand Up @@ -374,6 +376,7 @@ impl<RT: AquaRuntime> Node<RT> {
aquavm_pool: AquamarineBackend<RT, Arc<Builtins<Connectivity>>>,
script_storage: ScriptStorageBackend,
builtins_deployer: BuiltinsDeployer,
spell_event_bus_api: SpellEventBusApi,
spell_event_bus: SpellEventBus,
spell_events_receiver: mpsc::UnboundedReceiver<TriggerEvent>,
sorcerer: Sorcerer,
Expand All @@ -395,6 +398,7 @@ impl<RT: AquaRuntime> Node<RT> {
aquavm_pool,
script_storage,
builtins_deployer,
spell_event_bus_api,
spell_event_bus,
spell_events_receiver,
sorcerer,
Expand Down Expand Up @@ -489,6 +493,11 @@ impl<RT: AquaRuntime> Node<RT> {
.await
.wrap_err("builtins deploy failed")?;

let result = self.spell_event_bus_api.start_scheduling().await;
if let Err(e) = result {
log::error!("running spell event bus failed: {}", e);
}

Ok(exit_outlet)
}

Expand Down

0 comments on commit 1016f36

Please sign in to comment.