diff --git a/src/executor/thread_executor.rs b/src/executor/thread_executor.rs index 665e10c..634e74e 100644 --- a/src/executor/thread_executor.rs +++ b/src/executor/thread_executor.rs @@ -3,10 +3,11 @@ use std::collections::HashMap; use std::thread; use std::time::Duration; -use crate::actor::{ActorCell, Context, SenderType, Uri}; +use crate::actor::{ActorCell, Context, Letter, SenderType, Uri}; use crate::executor::{ CommandChannel, Executor, ExecutorCommands, ExecutorFactory, ExecutorHandle, }; +use crate::message::system::PoisonPill; use crate::system::RuntimeManagerRef; pub struct ThreadExecutorFactory {} @@ -23,6 +24,24 @@ impl ExecutorFactory for ThreadExecutorFactory { } } +// Macro for quickly constructing a context object within the thread executor. The construction +// of the context almost always looks the same, just some slight differences with the sender. +macro_rules! context { + ($self:tt, $cell:tt, $sender:path) => { + context!($self, $cell, ($sender)) + }; + ($self:tt, $cell:tt, $sender:expr) => { + Context { + address: &$cell.address, + runtime_manager: &$self.runtime_manager, + executor_channel: &$self.command_channel, + parent: &$cell.parent, + children: &mut $cell.children, + sender: &$sender, + } + }; +} + /// thread-local executor responsible for processing messages on actors struct ThreadExecutor { // Name of the executor, which is part of the address to actors and used @@ -63,72 +82,112 @@ impl ThreadExecutor { panic!("Actor name {} already exists", address); } } + + /// Process a system message for a given actor. + /// + /// System messages have special handling logic that that involves the executor, so they + /// can't be passed on to the actor in the traditional fashion. Depending on the message, + /// they still may be forwarded to the actor. + fn process_system_message(letter: Letter, cell: &mut ActorCell, context: Context) { + if let Some(_) = letter.payload.as_any().downcast_ref::() { + trace!( + "received poison pill for {}. Calling shutdown hook", + &cell.address.uri + ); + cell.actor.before_shutdown(context); + // TODO: Signal shutdown to the execute + } else { + // System messages should always be known as only messages defined by the crate + // can be system messages *and* system messages cannot be sent remotely. + panic!("Unknown system message type: {:?}", letter.payload); + } + } + + fn shutdown_actor(&mut self, uri: &Uri) { + if let Some(mut cell) = self.actor_cells.remove(&uri) { + trace!("calling before_shutdown for actor {}", &uri); + // TODO: Send the remaining messages in the mailbox to the dead letter queue + // TODO: Figure out how to redirect all Sender handles to the dead letter queue + cell.actor + .before_shutdown(context!(self, cell, SenderType::System)); + } + } + + fn assign_actor(&mut self, mut cell: ActorCell) { + debug!("received actor assignment for {}", &cell.address.uri); + self.assert_unique_address(&cell.address.uri); + trace!("calling before_start for actor {}", &cell.address.uri); + cell.actor + .before_start(context!(self, cell, SenderType::System)); + self.actor_cells.insert(cell.address.uri.clone(), cell); + } } + impl Executor for ThreadExecutor { fn run(mut self) { const SLEEP_DURATION_MS: u64 = 1; loop { + // Handle executor commands before handling any actor messages. Generally it is expected + // to have very few of these per loop. if !self.command_channel.recv_is_empty() { match self.command_channel.recv().unwrap() { - ExecutorCommands::AssignActor(mut cell) => { - debug!("received actor assignment for {}", &cell.address.uri); - self.assert_unique_address(&cell.address.uri); - trace!("calling before_start for actor {}", &cell.address.uri); - cell.actor.before_start(Context { - address: &cell.address, - runtime_manager: &self.runtime_manager, - executor_channel: &self.command_channel, - parent: &cell.parent, - children: &mut cell.children, - sender: &SenderType::System, - }); - self.actor_cells.insert(cell.address.uri.clone(), cell); + ExecutorCommands::AssignActor(cell) => { + self.assign_actor(cell); } ExecutorCommands::ShutdownActor(address) => { - trace!("calling before_shutdown for actor {}", &address.uri); - // TODO: Send the remaining messages in the mailbox to the dead letter queue - // TODO: Figure out how to redirect all Sender handles to the dead letter queue - if let Some(mut cell) = self.actor_cells.remove(&address.uri) { - cell.actor.before_shutdown(Context { - address: &cell.address, - runtime_manager: &self.runtime_manager, - executor_channel: &self.command_channel, - parent: &cell.parent, - children: &mut cell.children, - sender: &SenderType::System, - }); - } + self.shutdown_actor(&address.uri); } ExecutorCommands::Shutdown => { info!("received shutdown command"); + // Break to exit the main 'loop' in the run function break; } } } + let mut messages_processed = 0; // Iterate over the actor-cells and check if there are any non-empty mailboxes. - // If one is found, process a message from it. - for (_, cell) in self.actor_cells.iter_mut() { + // If one is found, process a single message from it. This maintains fairness + // amongst message processing, but may result in large amounts of waste if there + // are a high number of mostly idle actors. + let cells_iter = self.actor_cells.iter_mut(); + for (_, cell) in cells_iter { if !cell.mailbox.is_empty() { let result = cell.mailbox.try_recv(); if let Ok(letter) = result { - trace!("[{}] processing message: {:?}", &cell.address, &letter); - cell.actor.receive( - Context { - address: &cell.address, - runtime_manager: &self.runtime_manager, - executor_channel: &self.command_channel, - parent: &cell.parent, - children: &mut cell.children, - sender: &letter.sender, - }, - letter.payload, - ); + messages_processed += 1; + // If the letter is a system message, perform any necessary pre-processing + // of the message before (potentially) passing it on to the actor. + if letter + .payload + .is_system_message(&crate::message::private::Local::Value) + { + trace!( + "[{}] processing system message: {:?}", + &cell.address, + &letter + ); + // self.process_system_message(letter, cell); + // TODO: Mark the cell as unable to receive/process new messages (???) + // TODO: Send shutdown signals to all children + // TODO: Wait for exit signals from children + } + // For all non-system messages, pass the message directly to the actor + else { + trace!("[{}] processing message: {:?}", &cell.address, &letter); + cell.actor + .receive(context!(self, cell, letter.sender), letter.payload); + } } } } - trace!("nothing to do, sleeping..."); - thread::sleep(Duration::from_millis(SLEEP_DURATION_MS)); + + // Inject a small sleep in the thread executor if a loop resulted in no work being + // performed. Otherwise the executor will spin at 100% CPU usage. + if (messages_processed == 0) { + trace!("nothing to do, sleeping..."); + thread::sleep(Duration::from_millis(SLEEP_DURATION_MS)); + } } self.runtime_manager.notify_shutdown(self.name); diff --git a/src/message/mod.rs b/src/message/mod.rs index be6f280..1cd9549 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -22,8 +22,11 @@ pub trait Message: prost::Message { prost::Message::encoded_len(self) } + /// Returns true if this message is a system message. This method takes a ref + /// to a private Local enum, which makes this callable _only_ from within the + /// busan crate and _not_ implementable outside of it. #[doc(hidden)] - fn is_system_message(&self) -> bool { + fn is_system_message(&self, _local: &private::Local) -> bool { false } } @@ -49,7 +52,9 @@ impl ToMessage for M { */ pub(crate) mod private { #[doc(hidden)] - pub enum Local {} + pub enum Local { + Value, + } #[doc(hidden)] pub trait IsLocal {} impl IsLocal for Local {} diff --git a/src/message/system.rs b/src/message/system.rs index d5824a8..f6fed1d 100644 --- a/src/message/system.rs +++ b/src/message/system.rs @@ -3,7 +3,8 @@ use prost::DecodeError; use std::any::Any; -#[derive(prost::Message)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, prost::Message)] pub struct PoisonPill {} impl super::Message for PoisonPill { @@ -19,7 +20,7 @@ impl super::Message for PoisonPill { prost::Message::merge(self, buf) } - fn is_system_message(&self) -> bool { + fn is_system_message(&self, _local: &super::private::Local) -> bool { true } }