Skip to content

Commit

Permalink
[actor shutdown] Context method + no-deliver state (#71)
Browse files Browse the repository at this point in the history
### Summary

Adds an initial `shutdown` message to the actor context. This allows an
actor to immediately shutdown (only callable from within the actor) and
sets some state on the actor so that no new messages are received.

### Motivation

Part of #29.

This is only a small subset of actor shutdown logic, see #29 for full
details.

### Test Plan

Still TBD. I'll see how this evolves as shutdown evolves over the course
of implementing #29.
  • Loading branch information
JohnMurray committed Jul 2, 2023
1 parent 9687172 commit ffe59f3
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 1 deletion.
21 changes: 21 additions & 0 deletions src/actor/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::actor::{ActorAddress, Letter, SenderType};
use crate::executor::ExecutorCommands;
use crate::message::{Message, ToMessage};
use crate::system::RuntimeManagerRef;
use crate::util::CommandChannel;
use crossbeam_channel::Receiver;
use log::{trace, warn};

Expand Down Expand Up @@ -54,6 +56,8 @@ pub struct ActorCell {
pub(crate) address: ActorAddress,
pub(crate) children: Vec<ActorAddress>,
pub(crate) parent: Option<ActorAddress>,

state_flags: u8,
}

impl ActorCell {
Expand All @@ -69,8 +73,17 @@ impl ActorCell {
address,
children: Vec::new(),
parent,
state_flags: 0,
}
}

pub(crate) fn set_shutdown(&mut self) {
self.state_flags |= 0b0000_0001;
}

pub(crate) fn is_shutdown(&self) -> bool {
self.state_flags & 0b0000_0001 != 0
}
}

/// Debug macro for serializing and deserializing a message. The goal is to reduce
Expand All @@ -95,6 +108,7 @@ macro_rules! debug_serialize_msg {
pub struct Context<'a> {
pub(crate) address: &'a ActorAddress,
pub(crate) runtime_manager: &'a RuntimeManagerRef,
pub(crate) executor_command_channel: &'a CommandChannel<ExecutorCommands>,
pub(crate) parent: &'a Option<ActorAddress>,
pub(crate) children: &'a mut Vec<ActorAddress>,
pub(crate) sender: &'a SenderType,
Expand Down Expand Up @@ -190,4 +204,11 @@ impl Context<'_> {
pub fn parent(&self) -> Option<&ActorAddress> {
self.parent.as_ref()
}

/// Perform immediate shutdown for the current actor.
pub fn shutdown(&self) {
self.executor_command_channel
.send(ExecutorCommands::ShutdownActor(self.address.clone()))
.unwrap();
}
}
3 changes: 2 additions & 1 deletion src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

pub(crate) mod thread_executor;

use crate::actor::ActorCell;
use crate::actor::{ActorAddress, ActorCell};
use crate::config::ExecutorType;
use crate::system::RuntimeManagerRef;
use crate::util::CommandChannel;

pub enum ExecutorCommands {
AssignActor(ActorCell),
ShutdownActor(ActorAddress),
Shutdown,
}

Expand Down
11 changes: 11 additions & 0 deletions src/executor/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,18 @@ impl Executor for ThreadExecutor {
cell.actor.before_start(Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
executor_command_channel: &self.command_channel,
parent: &cell.parent,
children: &mut cell.children,
sender: &SenderType::System,
});
self.actor_cells.insert(cell.address.uri.clone(), cell);
}
ExecutorCommands::ShutdownActor(address) => {
let cell = self.actor_cells.get_mut(&address.uri).unwrap();
cell.set_shutdown()
// TODO: Call on_shutdown hook for actor
}
ExecutorCommands::Shutdown => {
info!("received shutdown command");
break;
Expand All @@ -93,6 +99,10 @@ impl Executor for ThreadExecutor {
// Iterate over the actor-cells and check if there are any non-empty mailboxes.
// If one is found, process a message from it.
for (_, cell) in self.actor_cells.iter_mut() {
if !cell.is_shutdown() {
// TODO: Forward messages to dead-letter-queue
continue;
}
if !cell.mailbox.is_empty() {
let result = cell.mailbox.try_recv();
if let Ok(letter) = result {
Expand All @@ -101,6 +111,7 @@ impl Executor for ThreadExecutor {
Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
executor_command_channel: &self.command_channel,
parent: &cell.parent,
children: &mut cell.children,
sender: &letter.sender,
Expand Down

0 comments on commit ffe59f3

Please sign in to comment.