diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index 016845fd7..08b9293ce 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -43,7 +43,6 @@ enum ProcessSender { } /// persist kernel's process_map state for next bootup -/// and (TODO) wait for filesystem to respond in the affirmative async fn persist_state( our_name: &str, send_to_loop: &t::MessageSender, @@ -76,7 +75,7 @@ async fn persist_state( Ok(()) } -/// handle messages sent directly to kernel. source is always our own node. +/// handle commands inside messages sent directly to kernel. source is always our own node. async fn handle_kernel_request( our_name: String, keypair: Arc, @@ -528,7 +527,7 @@ async fn handle_kernel_request( } } -// double check immediate run +/// spawn a process loop and insert the process in the relevant kernel state maps async fn start_process( our_name: String, keypair: Arc, @@ -1097,23 +1096,15 @@ pub async fn kernel( match process_map.get(&on) { None => vec![], Some(p) => { - caps.iter().filter_map(|cap| { + caps.into_iter().filter_map(|cap| { // if issuer is message source, then sign the cap if cap.issuer.process == on { - Some(( - cap.clone(), - keypair - .sign(&rmp_serde::to_vec(&cap).unwrap()) - .as_ref() - .to_vec() - )) + let sig = keypair.sign(&rmp_serde::to_vec(&cap).unwrap()); + Some((cap, sig.as_ref().to_vec())) // otherwise, only attach previously saved caps // NOTE we don't need to verify the sigs! } else { - match p.capabilities.get(cap) { - None => None, - Some(sig) => Some((cap.clone(), sig.clone())) - } + p.capabilities.get(&cap).map(|sig| (cap, sig.clone())) } }).collect() }, diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 02d355947..0b9dbe43f 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -1,8 +1,10 @@ use crate::kernel::{ProcessMessageReceiver, ProcessMessageSender}; use crate::KERNEL_PROCESS_ID; use anyhow::Result; -//pub use kinode::process::standard as wit; -//pub use kinode::process::standard::Host as StandardHost; +use lib::types::core as t; +pub use lib::wit; +pub use lib::wit::Host as StandardHost; +pub use lib::Process; use ring::signature::{self, KeyPair}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -11,28 +13,42 @@ use wasmtime::component::*; use wasmtime::{Engine, Store}; use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, Table, WasiCtx, WasiCtxBuilder, WasiView}; -use lib::types::core as t; -pub use lib::wit; -pub use lib::wit::Host as StandardHost; -pub use lib::Process; +const STACK_TRACE_SIZE: usize = 5000; -// bindgen!({ -// path: "wit", -// world: "process", -// async: true, -// }); +pub struct ProcessContext { + // store predecessor in order to set prompting message when popped + pub prompting_message: Option, + // can be empty if a request doesn't set context, but still needs to inherit + pub context: Option, +} pub struct ProcessState { + /// our node's networking keypair pub keypair: Arc, + /// information about ourself pub metadata: t::ProcessMetadata, + /// pipe from which we get messages from the main event loop pub recv_in_process: ProcessMessageReceiver, + /// pipe to send messages to ourself (received in `recv_in_process`) pub self_sender: ProcessMessageSender, + /// pipe for sending messages to the main event loop pub send_to_loop: t::MessageSender, + /// pipe for sending [`t::Printout`]s to the terminal pub send_to_terminal: t::PrintSender, + /// store the current incoming message that we've gotten from receive(), if it + /// is a request. if it is a response, the context map will be used to set this + /// as the message it was when the outgoing request for that response was made. + /// however, the blob stored here will **always** be the blob of the last message + /// received from the event loop. + /// the prompting_message won't have a blob, rather it is stored in last_blob. pub prompting_message: Option, pub last_blob: Option, - pub contexts: HashMap)>, + /// store the contexts and timeout task of all outstanding requests + pub contexts: HashMap)>, + /// store the messages that we've gotten from event loop but haven't processed yet + /// TODO make this an ordered map for O(1) retrieval by ID pub message_queue: VecDeque>, + /// pipe for getting info about capabilities pub caps_oracle: t::CapMessageSender, } @@ -57,8 +73,6 @@ impl WasiView for ProcessWasi { } } -const STACK_TRACE_SIZE: usize = 5000; - pub async fn send_and_await_response( process: &mut ProcessWasi, source: Option, @@ -91,41 +105,78 @@ pub async fn send_and_await_response( } impl ProcessState { - /// Ingest latest message directed to this process, and mark it as the prompting message. + /// Ingest latest message directed to this process, and save it as the current message. /// If there is no message in the queue, wait async until one is received. - /// The message will only be saved as the prompting-message if it's a Request. pub async fn get_next_message_for_process( &mut self, ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { let res = match self.message_queue.pop_front() { Some(message_from_queue) => message_from_queue, - None => self.recv_in_process.recv().await.unwrap(), + None => self + .recv_in_process + .recv() + .await + .expect("fatal: process couldn't receive next message"), }; self.kernel_message_to_process_receive(res) } + /// instead of ingesting latest, wait for a specific ID and queue all others + async fn get_specific_message_for_process( + &mut self, + awaited_message_id: u64, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + // first, check if the awaited message is already in the queue and handle if so + for (i, message) in self.message_queue.iter().enumerate() { + match message { + Ok(ref km) if km.id == awaited_message_id => { + let km = self.message_queue.remove(i).unwrap(); + return self.kernel_message_to_process_receive(km); + } + _ => continue, + } + } + // next, wait for the awaited message to arrive + loop { + let res = self + .recv_in_process + .recv() + .await + .expect("fatal: process couldn't receive next message"); + let id = match &res { + Ok(km) => km.id, + Err(e) => e.id, + }; + if id == awaited_message_id { + return self.kernel_message_to_process_receive(res); + } else { + self.message_queue.push_back(res); + } + } + } + /// takes Request generated by a process and sends it to the main event loop. /// will only fail if process does not have capability to send to target. /// if the request has a timeout (expects response), start a task to track /// that timeout and return timeout error if it expires. pub async fn send_request( &mut self, - fake_source: Option, // only used when kernel steps in to get/set state + // only used when kernel steps in to get/set state + fake_source: Option, target: wit::Address, request: wit::Request, new_context: Option, blob: Option, ) -> Result { - let source = match &fake_source { - Some(_) => fake_source.unwrap(), - None => self.metadata.our.clone(), - }; - // if request chooses to inherit context, match id to prompting_message + let source = fake_source.unwrap_or(self.metadata.our.clone()); + let mut request = t::de_wit_request(request); + + // if request chooses to inherit, it means to take the ID and lazy_load_blob, + // if any, from the last message it ingested + + // if request chooses to inherit, match id to precedessor // otherwise, id is generated randomly - let request_id: u64 = if request.inherit - && request.expects_response.is_none() - && self.prompting_message.is_some() - { + let request_id: u64 = if request.inherit && self.prompting_message.is_some() { self.prompting_message.as_ref().unwrap().id } else { loop { @@ -136,6 +187,8 @@ impl ProcessState { } }; + // if a blob is provided, it will be used; otherwise, if inherit is true, + // and a predecessor exists, its blob will be used; otherwise, no blob will be used. let blob = match blob { Some(p) => Some(t::LazyLoadBlob { mime: p.mime, @@ -147,70 +200,46 @@ impl ProcessState { }, }; - let mut inner_request = t::de_wit_request(request.clone()); - - inner_request.capabilities = { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.caps_oracle - .send(t::CapMessage::FilterCaps { - on: self.metadata.our.process.clone(), - caps: request - .capabilities - .iter() - .map(|cap| t::de_wit_capability(cap.clone()).0) - .collect(), - responder: tx, - }) - .await?; - rx.await? - }; - - // rsvp is set if there was a Request expecting Response - // followed by inheriting Request(s) not expecting Response; - // this is done such that the ultimate request handler knows that, - // in fact, a Response *is* expected. - // could also be None if entire chain of Requests are - // not expecting Response - let kernel_message = t::KernelMessage { - id: request_id, - source: source.clone(), - target: t::Address::de_wit(target.clone()), - rsvp: match ( - request.inherit, - request.expects_response, - &self.prompting_message, - ) { - // this request expects response, so receives any response - // make sure to use the real source, not a fake injected-by-kernel source - (_, Some(_), _) => Some(self.metadata.our.clone()), - // this request inherits, so response will be routed to prompting message - (true, None, Some(ref prompt)) => prompt.rsvp.clone(), - // this request doesn't inherit, and doesn't itself want a response - (false, None, _) => None, - // no rsvp because neither prompting message nor this request wants a response - (_, None, None) => None, - }, - message: t::Message::Request(inner_request), - lazy_load_blob: blob.clone(), - }; + if !request.capabilities.is_empty() { + request.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: request + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } - // modify the process' context map as needed. - // if there is a prompting message, we need to store the ultimate - // even if there is no new context string. - // TODO optimize this significantly + // if the request expects a response, modify the process' context map as needed + // and set a timer. + // TODO optimize this SIGNIFICANTLY: stop spawning tasks + // and use a global clock + garbage collect step to check for timeouts if let Some(timeout_secs) = request.expects_response { + let this_request = request.clone(); + let this_blob = blob.clone(); let self_sender = self.self_sender.clone(); + let original_target = t::Address::de_wit(target.clone()); let timeout_handle = tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; let _ = self_sender .send(Err(t::WrappedSendError { id: request_id, - source: t::Address::de_wit(target.clone()), // TODO check this + source: original_target.clone(), error: t::SendError { kind: t::SendErrorKind::Timeout, - target: t::Address::de_wit(target), - message: t::Message::Request(t::de_wit_request(request.clone())), - lazy_load_blob: blob, + target: original_target, + message: t::Message::Request(this_request), + lazy_load_blob: this_blob, }, })) .await; @@ -218,12 +247,8 @@ impl ProcessState { self.contexts.insert( request_id, ( - t::ProcessContext { - prompting_message: if self.prompting_message.is_some() { - self.prompting_message.clone() - } else { - None - }, + ProcessContext { + prompting_message: self.prompting_message.clone(), context: new_context, }, timeout_handle, @@ -231,6 +256,34 @@ impl ProcessState { ); } + // rsvp is set based on this priority: + // 1. whether this request expects a response -- if so, rsvp = our address, always + // 2. whether this request inherits -- if so, rsvp = prompting message's rsvp + // 3. if neither, rsvp = None + let kernel_message = t::KernelMessage { + id: request_id, + source, + target: t::Address::de_wit(target), + rsvp: match ( + request.expects_response, + request.inherit, + &self.prompting_message, + ) { + (Some(_), _, _) => { + // this request expects response, so receives any response + // make sure to use the real source, not a fake injected-by-kernel source + Some(self.metadata.our.clone()) + } + (None, true, Some(ref prompt)) => { + // this request inherits, so response will be routed to prompting message + prompt.rsvp.clone() + } + _ => None, + }, + message: t::Message::Request(request), + lazy_load_blob: blob, + }; + self.send_to_loop .send(kernel_message) .await @@ -245,43 +298,53 @@ impl ProcessState { response: wit::Response, blob: Option, ) { - let (id, target) = match self.make_response_id_target().await { - Some(r) => r, - None => { - let _ = self - .send_to_terminal - .send(t::Printout { - verbosity: 2, - content: format!("kernel: dropping Response {:?}", response), - }) - .await; - return; - } + let mut response = t::de_wit_response(response); + + // the process requires a prompting_message in order to issue a response + let Some(ref prompting_message) = self.prompting_message else { + print( + &self.send_to_terminal, + 0, + format!("kernel: need non-None prompting_message to handle Response {response:?}"), + ) + .await; + return; }; + // given the current process state, produce the id and target that + // a response it emits should have. + let (id, target) = ( + prompting_message.id, + match &prompting_message.rsvp { + None => prompting_message.source.clone(), + Some(rsvp) => rsvp.clone(), + }, + ); + let blob = match response.inherit { true => self.last_blob.clone(), false => t::de_wit_blob(blob), }; - let mut inner_response = t::de_wit_response(response.clone()); - - inner_response.capabilities = { - let (tx, rx) = tokio::sync::oneshot::channel(); - let _ = self - .caps_oracle - .send(t::CapMessage::FilterCaps { - on: self.metadata.our.process.clone(), - caps: response - .capabilities - .iter() - .map(|cap| t::de_wit_capability(cap.clone()).0) - .collect(), - responder: tx, - }) - .await; - rx.await.expect("fatal: process couldn't get caps") - }; + if !response.capabilities.is_empty() { + response.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: response + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } self.send_to_loop .send(t::KernelMessage { @@ -290,7 +353,7 @@ impl ProcessState { target, rsvp: None, message: t::Message::Response(( - inner_response, + response, // the context will be set by the process receiving this Response. None, )), @@ -300,63 +363,32 @@ impl ProcessState { .expect("fatal: kernel couldn't send response"); } - /// instead of ingesting latest, wait for a specific ID and queue all others - async fn get_specific_message_for_process( - &mut self, - awaited_message_id: u64, - ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - // first, check if the awaited message is already in the queue and handle if so - for (i, message) in self.message_queue.iter().enumerate() { - match message { - Ok(ref km) if km.id == awaited_message_id => { - let km = self.message_queue.remove(i).unwrap(); - return self.kernel_message_to_process_receive(km.clone()); - } - _ => continue, - } - } - // next, wait for the awaited message to arrive - loop { - let res = self.recv_in_process.recv().await.unwrap(); - match res { - Ok(ref km) if km.id == awaited_message_id => { - return self.kernel_message_to_process_receive(Ok(km.clone())) - } - Ok(km) => self.message_queue.push_back(Ok(km)), - Err(e) if e.id == awaited_message_id => { - return self.kernel_message_to_process_receive(Err(e)) - } - Err(e) => self.message_queue.push_back(Err(e)), - } - } - } - - /// convert a message from the main event loop into a result for the process to receive - /// if the message is a response or error, get context if we have one + /// Convert a message from the main event loop into a result for the process to receive. + /// If the message is a response or error, get context if we have one. fn kernel_message_to_process_receive( &mut self, - res: Result, + incoming: Result, ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - let (context, km) = match res { - Ok(km) => match &km.message { + let (mut km, context) = match incoming { + Ok(mut km) => match km.message { t::Message::Request(_) => { - self.last_blob = km.lazy_load_blob.clone(); + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; self.prompting_message = Some(km.clone()); - (None, km) + (km, None) } t::Message::Response(_) => { if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) { timeout_handle.abort(); - self.last_blob = km.lazy_load_blob.clone(); - self.prompting_message = match context.prompting_message { - None => Some(km.clone()), - Some(prompting_message) => Some(prompting_message), - }; - (context.context, km) + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = context.prompting_message; + (km, context.context) } else { - self.last_blob = km.lazy_load_blob.clone(); + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; self.prompting_message = Some(km.clone()); - (None, km) + (km, None) } } }, @@ -375,74 +407,54 @@ impl ProcessState { self.keypair.as_ref().public_key(), ); + // prune any invalid capabilities before handing to process + // where invalid = supposedly issued by us, but not signed properly by us + match &mut km.message { + t::Message::Request(request) => { + request.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + t::Message::Response((response, _)) => { + response.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + }; + Ok(( km.source.en_wit(), match km.message { - t::Message::Request(mut request) => { - // prune any invalid caps before sending - request.capabilities = request - .capabilities - .iter() - .filter_map(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => Some((cap.clone(), sig.clone())), - Err(_) => None, - } - } else { - return Some((cap.clone(), sig.clone())); - } - }) - .collect::)>>(); - wit::Message::Request(t::en_wit_request(request)) - } + t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)), // NOTE: we throw away whatever context came from the sender, that's not ours - t::Message::Response((mut response, _context)) => { - // prune any invalid caps before sending - response.capabilities = response - .capabilities - .iter() - .filter_map(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => Some((cap.clone(), sig.clone())), - Err(_) => None, - } - } else { - return Some((cap.clone(), sig.clone())); - } - }) - .collect::)>>(); + t::Message::Response((response, _sent_context)) => { wit::Message::Response((t::en_wit_response(response), context)) } }, )) } - - /// Given the current process state, return the id and target that - /// a response it emits should have. This takes into - /// account the `rsvp` of the prompting message, if any. - async fn make_response_id_target(&self) -> Option<(u64, t::Address)> { - let Some(ref prompting_message) = self.prompting_message else { - println!("need non-None prompting_message to handle Response"); - return None; - }; - Some(( - prompting_message.id, - match &prompting_message.rsvp { - None => prompting_message.source.clone(), - Some(address) => address.clone(), - }, - )) - } } /// create a specific process, and generate a task that will run it. @@ -740,3 +752,10 @@ pub async fn make_process_loop( } Ok(()) } + +async fn print(sender: &t::PrintSender, verbosity: u8, content: String) { + let _ = sender + .send(t::Printout { verbosity, content }) + .await + .expect("fatal: kernel terminal print pipe died!"); +} diff --git a/kinode/src/kernel/standard_host.rs b/kinode/src/kernel/standard_host.rs index 3e9c30f41..52ae80896 100644 --- a/kinode/src/kernel/standard_host.rs +++ b/kinode/src/kernel/standard_host.rs @@ -1,6 +1,4 @@ use crate::kernel::process; -//use crate::kernel::process::kinode::process::standard as wit; -//use crate::kernel::process::StandardHost; use crate::KERNEL_PROCESS_ID; use crate::VFS_PROCESS_ID; use anyhow::Result; @@ -27,6 +25,7 @@ impl StandardHost for process::ProcessWasi { // // system utils: // + async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> { self.process .send_to_terminal @@ -376,6 +375,7 @@ impl StandardHost for process::ProcessWasi { print_debug(&self.process, "spawned a new process").await; Ok(Ok(new_process_id.en_wit().to_owned())) } + // // capabilities management // diff --git a/lib/src/core.rs b/lib/src/core.rs index d9885be79..398f6ab8d 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1030,14 +1030,6 @@ impl std::fmt::Display for PersistedProcess { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ProcessContext { - // store ultimate in order to set prompting message if needed - pub prompting_message: Option, - // can be empty if a request doesn't set context, but still needs to inherit - pub context: Option, -} - pub type PackageVersion = (u32, u32, u32); /// the type that gets deserialized from `metadata.json` in a package