Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnMurray committed Jun 26, 2023
1 parent 12457be commit d6140cd
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 47 deletions.
145 changes: 102 additions & 43 deletions src/executor/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::collections::HashMap;
use std::thread;
use std::time::Duration;

use crate::actor::{ActorCell, Context, SenderType, Uri};
use crate::actor::{ActorCell, Context, Letter, SenderType, Uri};
use crate::executor::{
CommandChannel, Executor, ExecutorCommands, ExecutorFactory, ExecutorHandle,
};
use crate::message::system::PoisonPill;
use crate::system::RuntimeManagerRef;

pub struct ThreadExecutorFactory {}
Expand All @@ -23,6 +24,24 @@ impl ExecutorFactory for ThreadExecutorFactory {
}
}

// Macro for quickly constructing a context object within the thread executor. The construction
// of the context almost always looks the same, just some slight differences with the sender.
macro_rules! context {
($self:tt, $cell:tt, $sender:path) => {
context!($self, $cell, ($sender))
};
($self:tt, $cell:tt, $sender:expr) => {
Context {
address: &$cell.address,
runtime_manager: &$self.runtime_manager,
executor_channel: &$self.command_channel,
parent: &$cell.parent,
children: &mut $cell.children,
sender: &$sender,
}
};
}

/// thread-local executor responsible for processing messages on actors
struct ThreadExecutor {
// Name of the executor, which is part of the address to actors and used
Expand Down Expand Up @@ -63,72 +82,112 @@ impl ThreadExecutor {
panic!("Actor name {} already exists", address);
}
}

/// Process a system message for a given actor.
///
/// System messages have special handling logic that that involves the executor, so they
/// can't be passed on to the actor in the traditional fashion. Depending on the message,
/// they still may be forwarded to the actor.
fn process_system_message(letter: Letter, cell: &mut ActorCell, context: Context) {

Check warning on line 91 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Check

associated function `process_system_message` is never used

Check warning on line 91 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Lints

associated function `process_system_message` is never used

Check warning on line 91 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Test

associated function `process_system_message` is never used
if let Some(_) = letter.payload.as_any().downcast_ref::<PoisonPill>() {

Check warning on line 92 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Lints

redundant pattern matching, consider using `is_some()`
trace!(
"received poison pill for {}. Calling shutdown hook",
&cell.address.uri
);
cell.actor.before_shutdown(context);
// TODO: Signal shutdown to the execute
} else {
// System messages should always be known as only messages defined by the crate
// can be system messages *and* system messages cannot be sent remotely.
panic!("Unknown system message type: {:?}", letter.payload);
}
}

fn shutdown_actor(&mut self, uri: &Uri) {
if let Some(mut cell) = self.actor_cells.remove(&uri) {

Check warning on line 107 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Lints

this expression creates a reference which is immediately dereferenced by the compiler
trace!("calling before_shutdown for actor {}", &uri);
// TODO: Send the remaining messages in the mailbox to the dead letter queue
// TODO: Figure out how to redirect all Sender<Letter> handles to the dead letter queue
cell.actor
.before_shutdown(context!(self, cell, SenderType::System));
}
}

fn assign_actor(&mut self, mut cell: ActorCell) {
debug!("received actor assignment for {}", &cell.address.uri);
self.assert_unique_address(&cell.address.uri);
trace!("calling before_start for actor {}", &cell.address.uri);
cell.actor
.before_start(context!(self, cell, SenderType::System));
self.actor_cells.insert(cell.address.uri.clone(), cell);
}
}

impl Executor for ThreadExecutor {
fn run(mut self) {
const SLEEP_DURATION_MS: u64 = 1;

loop {
// Handle executor commands before handling any actor messages. Generally it is expected
// to have very few of these per loop.
if !self.command_channel.recv_is_empty() {
match self.command_channel.recv().unwrap() {
ExecutorCommands::AssignActor(mut cell) => {
debug!("received actor assignment for {}", &cell.address.uri);
self.assert_unique_address(&cell.address.uri);
trace!("calling before_start for actor {}", &cell.address.uri);
cell.actor.before_start(Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
executor_channel: &self.command_channel,
parent: &cell.parent,
children: &mut cell.children,
sender: &SenderType::System,
});
self.actor_cells.insert(cell.address.uri.clone(), cell);
ExecutorCommands::AssignActor(cell) => {
self.assign_actor(cell);
}
ExecutorCommands::ShutdownActor(address) => {
trace!("calling before_shutdown for actor {}", &address.uri);
// TODO: Send the remaining messages in the mailbox to the dead letter queue
// TODO: Figure out how to redirect all Sender<Letter> handles to the dead letter queue
if let Some(mut cell) = self.actor_cells.remove(&address.uri) {
cell.actor.before_shutdown(Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
executor_channel: &self.command_channel,
parent: &cell.parent,
children: &mut cell.children,
sender: &SenderType::System,
});
}
self.shutdown_actor(&address.uri);
}
ExecutorCommands::Shutdown => {
info!("received shutdown command");
// Break to exit the main 'loop' in the run function
break;
}
}
}
let mut messages_processed = 0;
// Iterate over the actor-cells and check if there are any non-empty mailboxes.
// If one is found, process a message from it.
for (_, cell) in self.actor_cells.iter_mut() {
// If one is found, process a single message from it. This maintains fairness
// amongst message processing, but may result in large amounts of waste if there
// are a high number of mostly idle actors.
let cells_iter = self.actor_cells.iter_mut();
for (_, cell) in cells_iter {
if !cell.mailbox.is_empty() {
let result = cell.mailbox.try_recv();
if let Ok(letter) = result {
trace!("[{}] processing message: {:?}", &cell.address, &letter);
cell.actor.receive(
Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
executor_channel: &self.command_channel,
parent: &cell.parent,
children: &mut cell.children,
sender: &letter.sender,
},
letter.payload,
);
messages_processed += 1;
// If the letter is a system message, perform any necessary pre-processing
// of the message before (potentially) passing it on to the actor.
if letter
.payload
.is_system_message(&crate::message::private::Local::Value)
{
trace!(
"[{}] processing system message: {:?}",
&cell.address,
&letter
);
// self.process_system_message(letter, cell);
// TODO: Mark the cell as unable to receive/process new messages (???)
// TODO: Send shutdown signals to all children
// TODO: Wait for exit signals from children
}
// For all non-system messages, pass the message directly to the actor
else {
trace!("[{}] processing message: {:?}", &cell.address, &letter);
cell.actor
.receive(context!(self, cell, letter.sender), letter.payload);
}
}
}
}
trace!("nothing to do, sleeping...");
thread::sleep(Duration::from_millis(SLEEP_DURATION_MS));

// Inject a small sleep in the thread executor if a loop resulted in no work being
// performed. Otherwise the executor will spin at 100% CPU usage.
if (messages_processed == 0) {

Check warning on line 187 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Check

unnecessary parentheses around `if` condition

Check warning on line 187 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Lints

unnecessary parentheses around `if` condition

Check warning on line 187 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Test

unnecessary parentheses around `if` condition

Check warning on line 187 in src/executor/thread_executor.rs

View workflow job for this annotation

GitHub Actions / Test

unnecessary parentheses around `if` condition
trace!("nothing to do, sleeping...");
thread::sleep(Duration::from_millis(SLEEP_DURATION_MS));
}
}

self.runtime_manager.notify_shutdown(self.name);
Expand Down
9 changes: 7 additions & 2 deletions src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ pub trait Message: prost::Message {
prost::Message::encoded_len(self)
}

/// Returns true if this message is a system message. This method takes a ref
/// to a private Local enum, which makes this callable _only_ from within the
/// busan crate and _not_ implementable outside of it.
#[doc(hidden)]
fn is_system_message(&self) -> bool {
fn is_system_message(&self, _local: &private::Local) -> bool {
false
}
}
Expand All @@ -49,7 +52,9 @@ impl<M: Message> ToMessage<M> for M {
*/
pub(crate) mod private {
#[doc(hidden)]
pub enum Local {}
pub enum Local {
Value,
}
#[doc(hidden)]
pub trait IsLocal {}
impl IsLocal for Local {}
Expand Down
5 changes: 3 additions & 2 deletions src/message/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use prost::DecodeError;
use std::any::Any;

#[derive(prost::Message)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, prost::Message)]
pub struct PoisonPill {}

impl super::Message for PoisonPill {
Expand All @@ -19,7 +20,7 @@ impl super::Message for PoisonPill {
prost::Message::merge(self, buf)
}

fn is_system_message(&self) -> bool {
fn is_system_message(&self, _local: &super::private::Local) -> bool {
true
}
}

0 comments on commit d6140cd

Please sign in to comment.