Skip to content

Commit

Permalink
[actor shutdown] shutdown children (#75)
Browse files Browse the repository at this point in the history
### Summary

Propegates shutdown signals to children via the runtime manager.
Currently there is no coordination between timing/ordering of parents
and children except for the flow of initial messages. That is to say
that the actual moment in which actors are removed from the runtime
queue and the `before`/`after_stop` methods are finished may race. I
think this is a fine trade-off for now. Coordination is a lot trickier,
and possibly not necessary.

### Motivation

Part of #29.

Actors are organized in heirarchical trees. In an actor system, all
actors should be part of the same tree and thus if an actor shuts down
and has children, the shutdown signals have to propegate down the tree.

### Test Plan

TBD. Will address testing as the final step in impelemnting actor
shutdown.
  • Loading branch information
JohnMurray committed Jul 2, 2023
1 parent ad5756d commit 7372322
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
12 changes: 12 additions & 0 deletions src/executor/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ impl Executor for ThreadExecutor {
cell.set_shutdown();
cell.actor
.before_stop(context!(self, cell, SenderType::System));

// Inform the runtime manager of the shutdown and also forward shutdown
// signals to the children.
self.runtime_manager.shutdown_actor(&address, false);
for child in &cell.children {
self.runtime_manager.shutdown_actor(child, true);
}

// TODO: Notify the parent and all of the watchers
// thought: maybe this should be done by the runtime manager when the
// shutdown signal is sent. (add an additional flag of "notify_parent"
// to the message we send in `runtime_manager.shutdown_actor`).
}
ExecutorCommands::Shutdown => {
info!("received shutdown command");
Expand Down
55 changes: 50 additions & 5 deletions src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl ActorSystem {
struct RuntimeManager {
/// Map of executor names to their command-channel (for sending commands)
executor_command_channels: HashMap<String, CommandChannel<ExecutorCommands>>,
actor_registry: HashMap<Uri, actor::Mailbox>,
actor_registry: HashMap<Uri, ActorRegistryEntry>,

manager_command_channel: CommandChannel<ManagerCommands>,

Expand Down Expand Up @@ -200,21 +200,40 @@ impl RuntimeManager {
address.set_mailbox(sender.clone());
let cell = ActorCell::new(actor, receiver, address, parent);

self.actor_registry.insert(address_uri, sender);
self.actor_registry.insert(
address_uri,
ActorRegistryEntry {
mailbox: sender,
executor: executor_name.clone(),
},
);

self.executor_command_channels
.get(&executor_name)
.unwrap()
.send(ExecutorCommands::AssignActor(cell))
.unwrap();
}
Ok(ManagerCommands::ShutdownActor {
address,
forward_to_executor,
}) => {
let lookup = self.actor_registry.remove(&address.uri);
if forward_to_executor && lookup.is_some() {
self.executor_command_channels
.get(&lookup.unwrap().executor)

Check warning on line 224 in src/system.rs

View workflow job for this annotation

GitHub Actions / Lints

called `unwrap` on `lookup` after checking its variant with `is_some`
.unwrap()
.send(ExecutorCommands::ShutdownActor(address))
.unwrap();
}
}
Ok(ManagerCommands::ResolveAddress {
address_uri,
return_channel,
}) => {
let mailbox_lookup = self.actor_registry.get(&address_uri);
let result = match mailbox_lookup {
Some(mailbox) => return_channel.try_send(Some(mailbox.clone())),
let lookup = self.actor_registry.get(&address_uri);
let result = match lookup {
Some(registry) => return_channel.try_send(Some(registry.mailbox.clone())),
None => return_channel.try_send(None),
};
if result.is_err() {
Expand Down Expand Up @@ -290,6 +309,15 @@ impl RuntimeManagerRef {
.unwrap();
}

pub(crate) fn shutdown_actor(&self, address: &ActorAddress, forward_to_executor: bool) {
self.manager_command_channel
.send(ManagerCommands::ShutdownActor {
address: address.clone(),
forward_to_executor,
})
.unwrap();
}

/// Resolve an address to mailbox by looking up the actor in the global registry. Note that this
/// will block until the management thread has performed the lookup.
pub(crate) fn resolve_address(&self, address: &ActorAddress) -> Option<actor::Mailbox> {
Expand Down Expand Up @@ -326,10 +354,27 @@ enum ManagerCommands {
parent: Option<ActorAddress>,
},

/// A request to shutdown an actor. This will update the runtime manager's actor registry and,
/// if requested, forward the shutdown request to the executor. This can be handy when you want
/// to shutdown an actor (e.g. a child of an actor that has initiated shutdown), but do not know
/// where the actor is currently running.
ShutdownActor {
address: ActorAddress,
forward_to_executor: bool,
},

/// A request to resolve an actor address to a mailbox. This is given a direct return
/// channel so the sender can block on the result of the lookup if desired.
ResolveAddress {
address_uri: Uri,
return_channel: Sender<Option<actor::Mailbox>>,
},
}

/// Value of actor-registry in the runtime manager. See [`RuntimeManager`] for details.
struct ActorRegistryEntry {
mailbox: actor::Mailbox,
/// The name of the executor the actor is running on (may be used to lookup the executor's
/// command channel).
executor: String,
}

0 comments on commit 7372322

Please sign in to comment.