Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions kinode/src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ async fn handle_kernel_request(
.expect("event loop: fatal: sender died");
None
}
//
// send 'run' message to a process that's already been initialized
//
t::KernelCommand::RunProcess(process_id) => {
let response =
if let Some(ProcessSender::Userspace(process_sender)) = senders.get(&process_id) {
Expand Down Expand Up @@ -355,10 +357,14 @@ async fn handle_kernel_request(
.await;
None
}
//
// brutal and savage killing: aborting the task.
// do not do this to a process if you don't want to risk
// dropped messages / un-replied-to-requests
// if you want to immediately restart a process or otherwise
// skip the capabilities-cleanup RevokeAll, pass "no-revoke" in the metadata
//
t::KernelCommand::KillProcess(process_id) => {
// brutal and savage killing: aborting the task.
// do not do this to a process if you don't want to risk
// dropped messages / un-replied-to-requests / revoked caps
senders.remove(&process_id);
let process_handle = match process_handles.remove(&process_id) {
Some(ph) => ph,
Expand All @@ -371,13 +377,15 @@ async fn handle_kernel_request(
};
process_handle.abort();
process_map.remove(&process_id);
caps_oracle
.send(t::CapMessage::RevokeAll {
on: process_id.clone(),
responder: None,
})
.await
.expect("event loop: fatal: sender died");
if request.metadata != Some("no-revoke".to_string()) {
caps_oracle
.send(t::CapMessage::RevokeAll {
on: process_id.clone(),
responder: None,
})
.await
.expect("event loop: fatal: sender died");
}
if request.expects_response.is_none() {
t::Printout::new(2, format!("kernel: killing process {process_id}"))
.send(send_to_terminal)
Expand Down Expand Up @@ -919,7 +927,7 @@ pub async fn kernel(
// capabilities oracle: handles all requests to add, drop, and check capabilities
Some(cap_message) = caps_oracle_receiver.recv() => {
if print_full_event_loop {
t::Printout::new(3, format!("{cap_message:?}")).send(&send_to_terminal).await;
t::Printout::new(3, format!("{cap_message}")).send(&send_to_terminal).await;
}
match cap_message {
t::CapMessage::Add { on, caps, responder } => {
Expand Down
40 changes: 19 additions & 21 deletions kinode/src/kernel/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,23 +334,6 @@ pub async fn make_process_loop(
// the process has completed, time to perform cleanup
//

// get caps before killing
let (tx, rx) = tokio::sync::oneshot::channel();
caps_oracle
.send(t::CapMessage::GetAll {
on: metadata.our.process.clone(),
responder: tx,
})
.await?;
let initial_capabilities = rx
.await?
.iter()
.map(|c| t::Capability {
issuer: c.0.issuer.clone(),
params: c.0.params.clone(),
})
.collect();

t::Printout::new(
1,
format!(
Expand Down Expand Up @@ -385,7 +368,23 @@ pub async fn make_process_loop(
}
// if restart, tell ourselves to init the app again, with same capabilities
t::OnExit::Restart => {
// kill
// get caps before killing
let (tx, rx) = tokio::sync::oneshot::channel();
caps_oracle
.send(t::CapMessage::GetAll {
on: metadata.our.process.clone(),
responder: tx,
})
.await?;
let initial_capabilities = rx
.await?
.iter()
.map(|c| t::Capability {
issuer: c.0.issuer.clone(),
params: c.0.params.clone(),
})
.collect();
// kill, **without** revoking capabilities from others!
t::KernelMessage::builder()
.id(rand::random())
.source((&our.node, KERNEL_PROCESS_ID.clone()))
Expand All @@ -397,14 +396,14 @@ pub async fn make_process_loop(
metadata.our.process.clone(),
))
.unwrap(),
metadata: None,
metadata: Some("no-revoke".to_string()),
capabilities: vec![],
}))
.build()
.unwrap()
.send(&send_to_loop)
.await;
// then re-initialize
// then re-initialize with same capabilities
t::KernelMessage::builder()
.id(rand::random())
.source((&our.node, KERNEL_PROCESS_ID.clone()))
Expand Down Expand Up @@ -453,7 +452,6 @@ pub async fn make_process_loop(
.await;
}
// if requests, fire them
// even in death, a process can only message processes it has capabilities for
t::OnExit::Requests(requests) => {
for (address, mut request, blob) in requests {
request.expects_response = None;
Expand Down
36 changes: 36 additions & 0 deletions lib/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,42 @@ pub enum CapMessage {
},
}

impl std::fmt::Display for CapMessage {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
CapMessage::Add { on, caps, .. } => write!(
f,
"caps: add {} on {on}",
caps.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
.join(", ")
),
CapMessage::Drop { on, caps, .. } => write!(
f,
"caps: drop {} on {on}",
caps.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
.join(", ")
),
CapMessage::Has { on, cap, .. } => write!(f, "caps: has {} on {on}", cap),
CapMessage::GetAll { on, .. } => write!(f, "caps: get all on {on}"),
CapMessage::RevokeAll { on, .. } => write!(f, "caps: revoke all on {on}"),
CapMessage::FilterCaps { on, caps, .. } => {
write!(
f,
"caps: filter for {} on {on}",
caps.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>()
.join(", ")
)
}
}
}
}

pub type ReverseCapIndex = HashMap<ProcessId, HashMap<ProcessId, Vec<Capability>>>;

pub type ProcessMap = HashMap<ProcessId, PersistedProcess>;
Expand Down