Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[actor shutdown] shutdown children #75

Merged
merged 1 commit into from
Jul 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/executor/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ impl Executor for ThreadExecutor {
cell.set_shutdown();
cell.actor
.before_stop(context!(self, cell, SenderType::System));

// Inform the runtime manager of the shutdown and also forward shutdown
// signals to the children.
self.runtime_manager.shutdown_actor(&address, false);
for child in &cell.children {
self.runtime_manager.shutdown_actor(child, true);
}

// TODO: Notify the parent and all of the watchers
// thought: maybe this should be done by the runtime manager when the
// shutdown signal is sent. (add an additional flag of "notify_parent"
// to the message we send in `runtime_manager.shutdown_actor`).
}
ExecutorCommands::Shutdown => {
info!("received shutdown command");
Expand Down
55 changes: 50 additions & 5 deletions src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
struct RuntimeManager {
/// Map of executor names to their command-channel (for sending commands)
executor_command_channels: HashMap<String, CommandChannel<ExecutorCommands>>,
actor_registry: HashMap<Uri, actor::Mailbox>,
actor_registry: HashMap<Uri, ActorRegistryEntry>,

manager_command_channel: CommandChannel<ManagerCommands>,

Expand Down Expand Up @@ -200,21 +200,40 @@
address.set_mailbox(sender.clone());
let cell = ActorCell::new(actor, receiver, address, parent);

self.actor_registry.insert(address_uri, sender);
self.actor_registry.insert(
address_uri,
ActorRegistryEntry {
mailbox: sender,
executor: executor_name.clone(),
},
);

self.executor_command_channels
.get(&executor_name)
.unwrap()
.send(ExecutorCommands::AssignActor(cell))
.unwrap();
}
Ok(ManagerCommands::ShutdownActor {
address,
forward_to_executor,
}) => {
let lookup = self.actor_registry.remove(&address.uri);
if forward_to_executor && lookup.is_some() {
self.executor_command_channels
.get(&lookup.unwrap().executor)

Check warning on line 224 in src/system.rs

View workflow job for this annotation

GitHub Actions / Lints

called `unwrap` on `lookup` after checking its variant with `is_some`
.unwrap()
.send(ExecutorCommands::ShutdownActor(address))
.unwrap();
}
}
Ok(ManagerCommands::ResolveAddress {
address_uri,
return_channel,
}) => {
let mailbox_lookup = self.actor_registry.get(&address_uri);
let result = match mailbox_lookup {
Some(mailbox) => return_channel.try_send(Some(mailbox.clone())),
let lookup = self.actor_registry.get(&address_uri);
let result = match lookup {
Some(registry) => return_channel.try_send(Some(registry.mailbox.clone())),
None => return_channel.try_send(None),
};
if result.is_err() {
Expand Down Expand Up @@ -290,6 +309,15 @@
.unwrap();
}

pub(crate) fn shutdown_actor(&self, address: &ActorAddress, forward_to_executor: bool) {
self.manager_command_channel
.send(ManagerCommands::ShutdownActor {
address: address.clone(),
forward_to_executor,
})
.unwrap();
}

/// Resolve an address to mailbox by looking up the actor in the global registry. Note that this
/// will block until the management thread has performed the lookup.
pub(crate) fn resolve_address(&self, address: &ActorAddress) -> Option<actor::Mailbox> {
Expand Down Expand Up @@ -326,10 +354,27 @@
parent: Option<ActorAddress>,
},

/// A request to shutdown an actor. This will update the runtime manager's actor registry and,
/// if requested, forward the shutdown request to the executor. This can be handy when you want
/// to shutdown an actor (e.g. a child of an actor that has initiated shutdown), but do not know
/// where the actor is currently running.
ShutdownActor {
address: ActorAddress,
forward_to_executor: bool,
},

/// A request to resolve an actor address to a mailbox. This is given a direct return
/// channel so the sender can block on the result of the lookup if desired.
ResolveAddress {
address_uri: Uri,
return_channel: Sender<Option<actor::Mailbox>>,
},
}

/// Value of actor-registry in the runtime manager. See [`RuntimeManager`] for details.
struct ActorRegistryEntry {
mailbox: actor::Mailbox,
/// The name of the executor the actor is running on (may be used to lookup the executor's
/// command channel).
executor: String,
}
Loading