Skip to content

Commit

Permalink
shutdown + no-deliver
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnMurray committed Jul 2, 2023
1 parent 9687172 commit 414ee25
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 1 deletion.
21 changes: 21 additions & 0 deletions src/actor/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::actor::{ActorAddress, Letter, SenderType};
use crate::executor::ExecutorCommands;
use crate::message::{Message, ToMessage};
use crate::system::RuntimeManagerRef;
use crate::util::CommandChannel;
use crossbeam_channel::Receiver;
use log::{trace, warn};

Expand Down Expand Up @@ -54,6 +56,8 @@ pub struct ActorCell {
pub(crate) address: ActorAddress,
pub(crate) children: Vec<ActorAddress>,
pub(crate) parent: Option<ActorAddress>,

state_flags: u8,
}

impl ActorCell {
Expand All @@ -69,8 +73,17 @@ impl ActorCell {
address,
children: Vec::new(),
parent,
state_flags: 0,
}
}

pub(crate) fn set_shutdown(&mut self) {
self.state_flags |= 0b0000_0001;
}

pub(crate) fn is_shutdown(&self) -> bool {
self.state_flags & 0b0000_0001 != 0
}
}

/// Debug macro for serializing and deserializing a message. The goal is to reduce
Expand All @@ -95,6 +108,7 @@ macro_rules! debug_serialize_msg {
pub struct Context<'a> {
pub(crate) address: &'a ActorAddress,
pub(crate) runtime_manager: &'a RuntimeManagerRef,
pub(crate) executor_command_channel: &'a CommandChannel<ExecutorCommands>,
pub(crate) parent: &'a Option<ActorAddress>,
pub(crate) children: &'a mut Vec<ActorAddress>,
pub(crate) sender: &'a SenderType,
Expand Down Expand Up @@ -190,4 +204,11 @@ impl Context<'_> {
pub fn parent(&self) -> Option<&ActorAddress> {
self.parent.as_ref()
}

/// Perform immediate shutdown for the current actor.
pub fn shutdown(&self) {
self.executor_command_channel
.send(ExecutorCommands::ShutdownActor(self.address.clone()))
.unwrap();
}
}
3 changes: 2 additions & 1 deletion src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

pub(crate) mod thread_executor;

use crate::actor::ActorCell;
use crate::actor::{ActorAddress, ActorCell};
use crate::config::ExecutorType;
use crate::system::RuntimeManagerRef;
use crate::util::CommandChannel;

pub enum ExecutorCommands {
AssignActor(ActorCell),
ShutdownActor(ActorAddress),
Shutdown,
}

Expand Down
11 changes: 11 additions & 0 deletions src/executor/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,18 @@ impl Executor for ThreadExecutor {
cell.actor.before_start(Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
executor_command_channel: &self.command_channel,
parent: &cell.parent,
children: &mut cell.children,
sender: &SenderType::System,
});
self.actor_cells.insert(cell.address.uri.clone(), cell);
}
ExecutorCommands::ShutdownActor(address) => {
let cell = self.actor_cells.get_mut(&address.uri).unwrap();
cell.set_shutdown()
// TODO: Call on_shutdown hook for actor
}
ExecutorCommands::Shutdown => {
info!("received shutdown command");
break;
Expand All @@ -93,6 +99,10 @@ impl Executor for ThreadExecutor {
// Iterate over the actor-cells and check if there are any non-empty mailboxes.
// If one is found, process a message from it.
for (_, cell) in self.actor_cells.iter_mut() {
if !cell.is_shutdown() {
// TODO: Forward messages to dead-letter-queue
continue;
}
if !cell.mailbox.is_empty() {
let result = cell.mailbox.try_recv();
if let Ok(letter) = result {
Expand All @@ -101,6 +111,7 @@ impl Executor for ThreadExecutor {
Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
executor_command_channel: &self.command_channel,
parent: &cell.parent,
children: &mut cell.children,
sender: &letter.sender,
Expand Down

0 comments on commit 414ee25

Please sign in to comment.