Skip to content

Commit

Permalink
[actor shutdown] cell state bitmask fix (#76)
Browse files Browse the repository at this point in the history
### Summary

When an actor shuts down, the state bitmask in the `ActorCell` is
flipped from the `Context` object when a shutdown is requested.
Previously this was only set within the executor.

### Motivation

Part of #29 

There is an asynchronous delay between an actor signaling it is shutting
down to when the executor sets the state. This is because the request
travels through the runtime manager and then back to the executor via
async channel communications.

What is described above introduces a window where an actor has signaled
shutdown and continues to receive messages. By setting the shutdown flag
directly within the actor (as well as within the executor), messages
processing by the actor will stop immediately.

### Test Plan

Shutdown testing will be covered in the broader actor shutdown effort.
  • Loading branch information
JohnMurray committed Jul 22, 2023
1 parent 7372322 commit b979f87
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"roadmap"
],
"userWords": [
"bitmask",
"busan",
"cpus",
"geoip", // "geo" "IP"
Expand Down
37 changes: 25 additions & 12 deletions src/actor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ pub trait ActorInit {
Self: Sized + Actor;
}

/// Simple type-alias for a bitmask representing the state of an [`ActorCell`].
pub type CellState = u8;

pub mod cell_state {
use super::CellState;

const SHUTDOWN: CellState = 0b0000_0001;

/// Check if the cell is in a shutdown state
pub fn is_shutdown(state: CellState) -> bool {
state & SHUTDOWN == SHUTDOWN
}

/// Set the shell into a shutdown state. __Note__ that this is a one-way action. A cell
/// in a shutdown state cannot be removed from the shutdown state.
pub fn set_shutdown(state: &mut CellState) {
*state |= SHUTDOWN;
}
}

/// [`ActorCell`] is the wrapper to the user-defined actor, wrapping the mailbox parent references,
/// and other actor-related information that is useful internally. This is primarily an internal
/// interface, but is exposed for user-provided executors or extensions.
Expand All @@ -74,8 +94,7 @@ pub struct ActorCell {
pub(crate) address: ActorAddress,
pub(crate) children: Vec<ActorAddress>,
pub(crate) parent: Option<ActorAddress>,

state_flags: u8,
pub(crate) state: CellState,
}

impl ActorCell {
Expand All @@ -91,17 +110,9 @@ impl ActorCell {
address,
children: Vec::new(),
parent,
state_flags: 0,
state: 0,
}
}

pub(crate) fn set_shutdown(&mut self) {
self.state_flags |= 0b0000_0001;
}

pub(crate) fn is_shutdown(&self) -> bool {
self.state_flags & 0b0000_0001 != 0
}
}

/// Debug macro for serializing and deserializing a message. The goal is to reduce
Expand Down Expand Up @@ -130,6 +141,7 @@ pub struct Context<'a> {
pub(crate) parent: &'a Option<ActorAddress>,
pub(crate) children: &'a mut Vec<ActorAddress>,
pub(crate) sender: &'a SenderType,
pub(crate) cell_state: &'a mut CellState,
}

impl Context<'_> {
Expand Down Expand Up @@ -224,7 +236,8 @@ impl Context<'_> {
}

/// Perform immediate shutdown for the current actor.
pub fn shutdown(&self) {
pub fn shutdown(&mut self) {
cell_state::set_shutdown(self.cell_state);
self.executor_command_channel
.send(ExecutorCommands::ShutdownActor(self.address.clone()))
.unwrap();
Expand Down
7 changes: 4 additions & 3 deletions src/executor/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashMap;
use std::thread;
use std::time::Duration;

use crate::actor::{ActorCell, Context, SenderType, Uri};
use crate::actor::{cell_state, ActorCell, Context, SenderType, Uri};
use crate::executor::{
CommandChannel, Executor, ExecutorCommands, ExecutorFactory, ExecutorHandle,
};
Expand Down Expand Up @@ -79,6 +79,7 @@ macro_rules! context {
parent: &$cell.parent,
children: &mut $cell.children,
sender: &$sender,
cell_state: &mut $cell.state,
}
};
}
Expand All @@ -100,7 +101,7 @@ impl Executor for ThreadExecutor {
}
ExecutorCommands::ShutdownActor(address) => {
let cell = self.actor_cells.get_mut(&address.uri).unwrap();
cell.set_shutdown();
cell_state::set_shutdown(&mut cell.state);
cell.actor
.before_stop(context!(self, cell, SenderType::System));

Expand All @@ -126,7 +127,7 @@ impl Executor for ThreadExecutor {
// If one is found, process a message from it.
let mut messages_processed = 0;
for (_, cell) in self.actor_cells.iter_mut() {
if !cell.is_shutdown() {
if !cell_state::is_shutdown(cell.state) {
// TODO: Forward messages to dead-letter-queue
continue;
}
Expand Down

0 comments on commit b979f87

Please sign in to comment.