From 15c86f55beb1a646a62ba4c041e24fac58605f58 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 14 May 2024 18:00:48 -0600 Subject: [PATCH 1/4] first pass: handle multiple wit versions, normalize all version values to u32 --- .gitignore | 1 + .../packages/app_store/app_store/src/lib.rs | 2 +- kinode/packages/terminal/terminal/src/lib.rs | 5 ++--- kinode/src/kernel/mod.rs | 7 ++----- kinode/src/kernel/process.rs | 2 +- kinode/src/kernel/standard_host.rs | 2 +- kinode/src/state.rs | 6 +++--- lib/build.rs | 19 ++++++++++++++++--- lib/src/core.rs | 9 +++++---- lib/src/lib.rs | 12 +++++++++++- 10 files changed, 43 insertions(+), 22 deletions(-) diff --git a/.gitignore b/.gitignore index 803a7f277..6e2087c50 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ target/ wit/ **/target/ **/wit/ +**/wit-*/ **/*.wasm .vscode .app-signing diff --git a/kinode/packages/app_store/app_store/src/lib.rs b/kinode/packages/app_store/app_store/src/lib.rs index 717c73328..35d8dd4f5 100644 --- a/kinode/packages/app_store/app_store/src/lib.rs +++ b/kinode/packages/app_store/app_store/src/lib.rs @@ -831,7 +831,7 @@ pub fn handle_install( .body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess { id: parsed_new_process_id.clone(), wasm_bytes_handle: wasm_path, - wit_version: None, + wit_version: None, // TODO get from manifest!!! on_exit: entry.on_exit.clone(), initial_capabilities: HashSet::new(), public: entry.public, diff --git a/kinode/packages/terminal/terminal/src/lib.rs b/kinode/packages/terminal/terminal/src/lib.rs index c793f7499..00b939f27 100644 --- a/kinode/packages/terminal/terminal/src/lib.rs +++ b/kinode/packages/terminal/terminal/src/lib.rs @@ -231,7 +231,7 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul .body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess { id: parsed_new_process_id.clone(), wasm_bytes_handle: wasm_path.clone(), - wit_version: None, + wit_version: None, // update this with new versions if desired on_exit: kt::OnExit::None, initial_capabilities: HashSet::new(), public: entry.public, @@ -297,10 +297,9 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul print_to_terminal( 3, &format!( - "{}: Process {{\n wasm_bytes_handle: {},\n wit_version: {},\n on_exit: {:?},\n public: {}\n capabilities: {}\n}}", + "{}: Process {{\n wasm_bytes_handle: {},\n on_exit: {:?},\n public: {}\n capabilities: {}\n}}", parsed_new_process_id.clone(), wasm_path.clone(), - "None", kt::OnExit::None, entry.public, { diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index 2635cda87..6e10fdab3 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -18,7 +18,7 @@ mod standard_host; const PROCESS_CHANNEL_CAPACITY: usize = 100; -const DEFAULT_WIT_VERSION: u32 = 0; +pub const LATEST_WIT_VERSION: u32 = 0; #[derive(Serialize, Deserialize)] struct StartProcessMetadata { @@ -606,10 +606,7 @@ async fn start_process( process: id.clone(), }, wasm_bytes_handle: process_metadata.persisted.wasm_bytes_handle.clone(), - wit_version: process_metadata - .persisted - .wit_version - .unwrap_or(DEFAULT_WIT_VERSION), + wit_version: process_metadata.persisted.wit_version, on_exit: process_metadata.persisted.on_exit.clone(), public: process_metadata.persisted.public, }; diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 463ed8154..df594d05d 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -734,7 +734,7 @@ pub async fn make_process_loop( body: serde_json::to_vec(&t::KernelCommand::InitializeProcess { id: metadata.our.process.clone(), wasm_bytes_handle: metadata.wasm_bytes_handle, - wit_version: Some(metadata.wit_version), + wit_version: metadata.wit_version, on_exit: metadata.on_exit, initial_capabilities, public: metadata.public, diff --git a/kinode/src/kernel/standard_host.rs b/kinode/src/kernel/standard_host.rs index 96b756ddd..d9de47107 100644 --- a/kinode/src/kernel/standard_host.rs +++ b/kinode/src/kernel/standard_host.rs @@ -267,7 +267,7 @@ impl StandardHost for process::ProcessWasi { body: serde_json::to_vec(&t::KernelCommand::InitializeProcess { id: new_process_id.clone(), wasm_bytes_handle: wasm_path, - wit_version: Some(self.process.metadata.wit_version), + wit_version: self.process.metadata.wit_version, on_exit: t::OnExit::de_wit(on_exit), initial_capabilities: request_capabilities .iter() diff --git a/kinode/src/state.rs b/kinode/src/state.rs index 9c4cbf2cc..af13c77b2 100644 --- a/kinode/src/state.rs +++ b/kinode/src/state.rs @@ -358,7 +358,7 @@ async fn bootstrap( .entry(ProcessId::new(Some("kernel"), "distro", "sys")) .or_insert(PersistedProcess { wasm_bytes_handle: "".into(), - wit_version: None, + wit_version: Some(crate::kernel::LATEST_WIT_VERSION), on_exit: OnExit::Restart, capabilities: runtime_caps.clone(), public: false, @@ -368,7 +368,7 @@ async fn bootstrap( .entry(ProcessId::new(Some("net"), "distro", "sys")) .or_insert(PersistedProcess { wasm_bytes_handle: "".into(), - wit_version: None, + wit_version: Some(crate::kernel::LATEST_WIT_VERSION), on_exit: OnExit::Restart, capabilities: runtime_caps.clone(), public: false, @@ -379,7 +379,7 @@ async fn bootstrap( .entry(runtime_module.0) .or_insert(PersistedProcess { wasm_bytes_handle: "".into(), - wit_version: None, + wit_version: Some(crate::kernel::LATEST_WIT_VERSION), on_exit: OnExit::Restart, capabilities: runtime_caps.clone(), public: runtime_module.3, diff --git a/lib/build.rs b/lib/build.rs index c06a8f774..3662a3463 100644 --- a/lib/build.rs +++ b/lib/build.rs @@ -1,6 +1,9 @@ -const KINODE_WIT_URL: &str = +const KINODE_WIT_0_7_0_URL: &str = "https://raw.githubusercontent.com/kinode-dao/kinode-wit/aa2c8b11c9171b949d1991c32f58591c0e881f85/kinode.wit"; +const KINODE_WIT_0_8_0_URL: &str = + "https://raw.githubusercontent.com/kinode-dao/kinode-wit/v0.8/kinode.wit"; + fn main() -> anyhow::Result<()> { if std::env::var("SKIP_BUILD_SCRIPT").is_ok() { println!("Skipping build script"); @@ -9,11 +12,21 @@ fn main() -> anyhow::Result<()> { let pwd = std::env::current_dir()?; - let wit_file = pwd.join("wit").join("kinode.wit"); + let wit_file = pwd.join("wit-v0.7.0").join("kinode.wit"); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + kit::build::download_file(KINODE_WIT_0_7_0_URL, &wit_file) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e))?; + Ok::<(), anyhow::Error>(()) + })?; + + let wit_file = pwd.join("wit-v0.8.0").join("kinode.wit"); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - kit::build::download_file(KINODE_WIT_URL, &wit_file) + kit::build::download_file(KINODE_WIT_0_8_0_URL, &wit_file) .await .map_err(|e| anyhow::anyhow!("{:?}", e))?; Ok(()) diff --git a/lib/src/core.rs b/lib/src/core.rs index dd5e4b5be..524a08ee2 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -838,7 +838,8 @@ pub struct UnencryptedIdentity { pub struct ProcessMetadata { pub our: Address, pub wasm_bytes_handle: String, - pub wit_version: u32, + /// if None, use the oldest version: 0.7.0 + pub wit_version: Option, pub on_exit: OnExit, pub public: bool, } @@ -1019,7 +1020,7 @@ impl std::fmt::Display for PersistedProcess { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, - "Process {{\n wasm_bytes_handle: {},\n wit_version: {},\n on_exit: {:?},\n public: {}\n capabilities: {}\n}}", + "Process {{\n wasm_bytes_handle: {},\n wit_version: {:?},\n on_exit: {:?},\n public: {}\n capabilities: {}\n}}", { if &self.wasm_bytes_handle == "" { "(none, this is a runtime process)" @@ -1027,7 +1028,7 @@ impl std::fmt::Display for PersistedProcess { &self.wasm_bytes_handle } }, - self.wit_version.unwrap_or_default(), + self.wit_version, self.on_exit, self.public, { @@ -1081,7 +1082,7 @@ pub struct Erc721Properties { pub code_hashes: HashMap, pub license: Option, pub screenshots: Option>, - pub wit_version: Option<(u32, u32, u32)>, + pub wit_version: Option, } /// the type that gets deserialized from each entry in the array in `manifest.json` diff --git a/lib/src/lib.rs b/lib/src/lib.rs index b26622891..61214b232 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -13,7 +13,17 @@ pub use kinode::process; pub use kinode::process::standard as wit; wasmtime::component::bindgen!({ - path: "wit", + path: "wit-v0.7.0", world: "process", async: true, }); + +pub mod v0 { + pub use kinode::process; + pub use kinode::process::standard as wit; + wasmtime::component::bindgen!({ + path: "wit-v0.8.0", + world: "process-v0", + async: true, + }); +} From 44ca456684e5b0aa244a204501cba0429826a4f1 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 14 May 2024 20:58:59 -0600 Subject: [PATCH 2/4] add en_wits and de_wits for v0 --- lib/src/core.rs | 143 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/lib/src/core.rs b/lib/src/core.rs index 524a08ee2..93d1768fe 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -147,6 +147,13 @@ impl ProcessId { publisher_node: self.publisher_node.clone(), } } + pub fn en_wit_v0(&self) -> crate::v0::wit::ProcessId { + crate::v0::wit::ProcessId { + process_name: self.process_name.clone(), + package_name: self.package_name.clone(), + publisher_node: self.publisher_node.clone(), + } + } pub fn de_wit(wit: wit::ProcessId) -> ProcessId { ProcessId { process_name: wit.process_name, @@ -154,6 +161,13 @@ impl ProcessId { publisher_node: wit.publisher_node, } } + pub fn de_wit_v0(wit: crate::v0::wit::ProcessId) -> ProcessId { + ProcessId { + process_name: wit.process_name, + package_name: wit.package_name, + publisher_node: wit.publisher_node, + } + } } impl std::str::FromStr for ProcessId { @@ -276,6 +290,12 @@ impl Address { process: self.process.en_wit(), } } + pub fn en_wit_v0(&self) -> crate::v0::wit::Address { + crate::v0::wit::Address { + node: self.node.clone(), + process: self.process.en_wit_v0(), + } + } pub fn de_wit(wit: wit::Address) -> Address { Address { node: wit.node, @@ -625,6 +645,20 @@ pub fn de_wit_request(wit: wit::Request) -> Request { } } +pub fn de_wit_request_v0(wit: crate::v0::wit::Request) -> Request { + Request { + inherit: wit.inherit, + expects_response: wit.expects_response, + body: wit.body, + metadata: wit.metadata, + capabilities: wit + .capabilities + .iter() + .map(|cap| de_wit_capability_v0(cap.clone())) + .collect(), + } +} + pub fn en_wit_request(request: Request) -> wit::Request { wit::Request { inherit: request.inherit, @@ -639,6 +673,20 @@ pub fn en_wit_request(request: Request) -> wit::Request { } } +pub fn en_wit_request_v0(request: Request) -> crate::v0::wit::Request { + crate::v0::wit::Request { + inherit: request.inherit, + expects_response: request.expects_response, + body: request.body, + metadata: request.metadata, + capabilities: request + .capabilities + .iter() + .map(|cap| en_wit_capability_v0(cap.clone())) + .collect(), + } +} + pub fn de_wit_response(wit: wit::Response) -> Response { Response { inherit: wit.inherit, @@ -652,6 +700,19 @@ pub fn de_wit_response(wit: wit::Response) -> Response { } } +pub fn de_wit_response_v0(wit: crate::v0::wit::Response) -> Response { + Response { + inherit: wit.inherit, + body: wit.body, + metadata: wit.metadata, + capabilities: wit + .capabilities + .iter() + .map(|cap| de_wit_capability_v0(cap.clone())) + .collect(), + } +} + pub fn en_wit_response(response: Response) -> wit::Response { wit::Response { inherit: response.inherit, @@ -665,6 +726,19 @@ pub fn en_wit_response(response: Response) -> wit::Response { } } +pub fn en_wit_response_v0(response: Response) -> crate::v0::wit::Response { + crate::v0::wit::Response { + inherit: response.inherit, + body: response.body, + metadata: response.metadata, + capabilities: response + .capabilities + .iter() + .map(|cap| en_wit_capability_v0(cap.clone())) + .collect(), + } +} + pub fn de_wit_blob(wit: Option) -> Option { match wit { None => None, @@ -675,6 +749,16 @@ pub fn de_wit_blob(wit: Option) -> Option { } } +pub fn de_wit_blob_v0(wit: Option) -> Option { + match wit { + None => None, + Some(wit) => Some(LazyLoadBlob { + mime: wit.mime, + bytes: wit.bytes, + }), + } +} + pub fn en_wit_blob(load: Option) -> Option { match load { None => None, @@ -685,6 +769,16 @@ pub fn en_wit_blob(load: Option) -> Option { } } +pub fn en_wit_blob_v0(load: Option) -> Option { + match load { + None => None, + Some(load) => Some(crate::v0::wit::LazyLoadBlob { + mime: load.mime, + bytes: load.bytes, + }), + } +} + pub fn de_wit_capability(wit: wit::Capability) -> (Capability, Vec) { ( Capability { @@ -702,6 +796,23 @@ pub fn de_wit_capability(wit: wit::Capability) -> (Capability, Vec) { ) } +pub fn de_wit_capability_v0(wit: crate::v0::wit::Capability) -> (Capability, Vec) { + ( + Capability { + issuer: Address { + node: wit.issuer.node, + process: ProcessId { + process_name: wit.issuer.process.process_name, + package_name: wit.issuer.process.package_name, + publisher_node: wit.issuer.process.publisher_node, + }, + }, + params: wit.params, + }, + vec![], + ) +} + pub fn en_wit_capability(cap: (Capability, Vec)) -> wit::Capability { wit::Capability { issuer: cap.0.issuer.en_wit(), @@ -709,6 +820,13 @@ pub fn en_wit_capability(cap: (Capability, Vec)) -> wit::Capability { } } +pub fn en_wit_capability_v0(cap: (Capability, Vec)) -> crate::v0::wit::Capability { + crate::v0::wit::Capability { + issuer: cap.0.issuer.en_wit_v0(), + params: cap.0.params, + } +} + pub fn en_wit_message(message: Message) -> wit::Message { match message { Message::Request(request) => wit::Message::Request(en_wit_request(request)), @@ -718,6 +836,15 @@ pub fn en_wit_message(message: Message) -> wit::Message { } } +pub fn en_wit_message_v0(message: Message) -> crate::v0::wit::Message { + match message { + Message::Request(request) => crate::v0::wit::Message::Request(en_wit_request_v0(request)), + Message::Response((response, context)) => { + crate::v0::wit::Message::Response((en_wit_response_v0(response), context)) + } + } +} + pub fn en_wit_send_error(error: SendError) -> wit::SendError { wit::SendError { kind: en_wit_send_error_kind(error.kind), @@ -726,6 +853,15 @@ pub fn en_wit_send_error(error: SendError) -> wit::SendError { } } +pub fn en_wit_send_error_v0(error: SendError) -> crate::v0::wit::SendError { + crate::v0::wit::SendError { + kind: en_wit_send_error_kind_v0(error.kind), + target: error.target.en_wit_v0(), + message: en_wit_message_v0(error.message), + lazy_load_blob: en_wit_blob_v0(error.lazy_load_blob), + } +} + pub fn en_wit_send_error_kind(kind: SendErrorKind) -> wit::SendErrorKind { match kind { SendErrorKind::Offline => wit::SendErrorKind::Offline, @@ -733,6 +869,13 @@ pub fn en_wit_send_error_kind(kind: SendErrorKind) -> wit::SendErrorKind { } } +pub fn en_wit_send_error_kind_v0(kind: SendErrorKind) -> crate::v0::wit::SendErrorKind { + match kind { + SendErrorKind::Offline => crate::v0::wit::SendErrorKind::Offline, + SendErrorKind::Timeout => crate::v0::wit::SendErrorKind::Timeout, + } +} + // // END SYNC WITH process_lib // From 00d0d6cfe36a667c58f14a34084acae339d04429 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Thu, 16 May 2024 13:49:38 -0600 Subject: [PATCH 3/4] WIP: support standard_host and standard_host_v0 in kernel --- kinode/src/kernel/mod.rs | 4 +- kinode/src/kernel/process.rs | 723 +++++++------------- kinode/src/kernel/standard_host.rs | 420 +++++++++++- kinode/src/kernel/standard_host_v0.rs | 934 ++++++++++++++++++++++++++ lib/src/core.rs | 46 ++ 5 files changed, 1634 insertions(+), 493 deletions(-) create mode 100644 kinode/src/kernel/standard_host_v0.rs diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs index 6e10fdab3..ad76c2f16 100644 --- a/kinode/src/kernel/mod.rs +++ b/kinode/src/kernel/mod.rs @@ -13,8 +13,10 @@ use lib::types::core::{self as t, STATE_PROCESS_ID, VFS_PROCESS_ID}; /// Manipulate a single process. pub mod process; -/// Implement the functions served to processes by `kinode.wit`. +/// Implement the functions served to processes by `wit-v0.7.0/kinode.wit`. mod standard_host; +/// Implement the functions served to processes by `wit-v0.8.0/kinode.wit`. +mod standard_host_v0; const PROCESS_CHANNEL_CAPACITY: usize = 100; diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index df594d05d..21886ef21 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -1,10 +1,9 @@ use crate::KERNEL_PROCESS_ID; use anyhow::Result; use lib::types::core as t; -pub use lib::wit; -pub use lib::wit::Host as StandardHost; +pub use lib::v0::ProcessV0; pub use lib::Process; -use ring::signature::{self, KeyPair}; +use ring::signature; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use tokio::fs; @@ -71,412 +70,179 @@ impl WasiView for ProcessWasi { } } -pub async fn send_and_await_response( - process: &mut ProcessWasi, - source: Option, - target: wit::Address, - request: wit::Request, - blob: Option, -) -> Result> { - if request.expects_response.is_none() { - return Err(anyhow::anyhow!( - "kernel: got invalid send_and_await_response() Request from {:?}: must expect response", - process.process.metadata.our.process - )); +pub struct ProcessWasiV0 { + pub process: ProcessState, + table: Table, + wasi: WasiCtx, +} + +impl WasiView for ProcessWasiV0 { + fn table(&mut self) -> &mut Table { + &mut self.table } - let id = process - .process - .send_request(source, target, request, None, blob) - .await; - match id { - Ok(id) => match process.process.get_specific_message_for_process(id).await { - Ok((address, wit::Message::Response(response))) => { - Ok(Ok((address, wit::Message::Response(response)))) - } - Ok((_address, wit::Message::Request(_))) => Err(anyhow::anyhow!( - "fatal: received Request instead of Response" - )), - Err((net_err, _context)) => Ok(Err(net_err)), - }, - Err(e) => Err(e), + fn ctx(&mut self) -> &mut WasiCtx { + &mut self.wasi } } -impl ProcessState { - /// Ingest latest message directed to this process, and save it as the current message. - /// If there is no message in the queue, wait async until one is received. - pub async fn get_next_message_for_process( - &mut self, - ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - let res = match self.message_queue.pop_front() { - Some(message_from_queue) => message_from_queue, - None => self.ingest_message().await, - }; - self.kernel_message_to_process_receive(res) - } +async fn make_component( + engine: Engine, + wasm_bytes: &[u8], + home_directory_path: String, + process_state: ProcessState, +) -> Result<(Process, Store, MemoryOutputPipe)> { + let component = Component::new(&engine, wasm_bytes.to_vec()) + .expect("make_process_loop: couldn't read file"); - /// instead of ingesting latest, wait for a specific ID and queue all others - async fn get_specific_message_for_process( - &mut self, - awaited_message_id: u64, - ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - // first, check if the awaited message is already in the queue and handle if so - for (i, message) in self.message_queue.iter().enumerate() { - match message { - Ok(ref km) if km.id == awaited_message_id => { - let km = self.message_queue.remove(i).unwrap(); - return self.kernel_message_to_process_receive(km); - } - _ => continue, - } - } - // next, wait for the awaited message to arrive - loop { - let res = self.ingest_message().await; - let id = match &res { - Ok(km) => km.id, - Err(e) => e.id, - }; - if id == awaited_message_id { - return self.kernel_message_to_process_receive(res); - } else { - self.message_queue.push_back(res); - } - } - } + let mut linker = Linker::new(&engine); + Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap(); - /// ingest next valid message from kernel. - /// cancel any timeout task associated with this message. - /// if the message is a response, only enqueue if we have an outstanding request for it. - async fn ingest_message(&mut self) -> Result { - loop { - let message = self - .recv_in_process - .recv() - .await - .expect("fatal: process couldn't receive next message"); - - match &message { - Ok(km) => match &km.message { - t::Message::Response(_) => { - if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) { - timeout_handle.abort(); - return message; - } - } - _ => { - return message; - } - }, - Err(e) => { - if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) { - timeout_handle.abort(); - return message; - } - } - } + let table = Table::new(); + let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE); + + let our_process_id = process_state.metadata.our.process.clone(); + let send_to_terminal = process_state.send_to_terminal.clone(); + + let tmp_path = format!( + "{}/vfs/{}:{}/tmp", + home_directory_path, + our_process_id.package(), + our_process_id.publisher() + ); + + let mut wasi = WasiCtxBuilder::new(); + + // TODO make guarantees about this + if let Ok(Ok(())) = tokio::time::timeout( + std::time::Duration::from_secs(5), + fs::create_dir_all(&tmp_path), + ) + .await + { + if let Ok(wasi_tempdir) = + Dir::open_ambient_dir(tmp_path.clone(), wasi_common::sync::ambient_authority()) + { + wasi.preopened_dir( + wasi_tempdir, + DirPerms::all(), + FilePerms::all(), + tmp_path.clone(), + ) + .env("TEMP_DIR", tmp_path); } } - /// Convert a message from the main event loop into a result for the process to receive. - /// If the message is a response or error, get context if we have one. - fn kernel_message_to_process_receive( - &mut self, - incoming: Result, - ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - let (mut km, context) = match incoming { - Ok(mut km) => match km.message { - t::Message::Request(_) => { - self.last_blob = km.lazy_load_blob; - km.lazy_load_blob = None; - self.prompting_message = Some(km.clone()); - (km, None) - } - t::Message::Response(_) => match self.contexts.remove(&km.id) { - Some((context, _timeout_handle)) => { - self.last_blob = km.lazy_load_blob; - km.lazy_load_blob = None; - self.prompting_message = context.prompting_message; - (km, context.context) - } - None => { - self.last_blob = km.lazy_load_blob; - km.lazy_load_blob = None; - self.prompting_message = Some(km.clone()); - (km, None) - } - }, - }, - Err(e) => match self.contexts.remove(&e.id) { - None => return Err((t::en_wit_send_error(e.error), None)), - Some((context, _timeout_handle)) => { - self.prompting_message = context.prompting_message; - return Err((t::en_wit_send_error(e.error), context.context)); - } - }, - }; + let wasi = wasi.stderr(wasi_stderr.clone()).build(); - let pk = signature::UnparsedPublicKey::new( - &signature::ED25519, - self.keypair.as_ref().public_key(), - ); - - // prune any invalid capabilities before handing to process - // where invalid = supposedly issued by us, but not signed properly by us - match &mut km.message { - t::Message::Request(request) => { - request.capabilities.retain(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => true, - Err(_) => false, - } - } else { - return true; - } - }); - } - t::Message::Response((response, _)) => { - response.capabilities.retain(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => true, - Err(_) => false, - } - } else { - return true; - } - }); - } - }; + wasmtime_wasi::command::add_to_linker(&mut linker).unwrap(); - Ok(( - km.source.en_wit(), - match km.message { - t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)), - // NOTE: we throw away whatever context came from the sender, that's not ours - t::Message::Response((response, _sent_context)) => { - wit::Message::Response((t::en_wit_response(response), context)) - } - }, - )) - } + let mut store = Store::new( + &engine, + ProcessWasi { + process: process_state, + table, + wasi, + }, + ); - /// takes Request generated by a process and sends it to the main event loop. - /// will only fail if process does not have capability to send to target. - /// if the request has a timeout (expects response), start a task to track - /// that timeout and return timeout error if it expires. - pub async fn send_request( - &mut self, - // only used when kernel steps in to get/set state - fake_source: Option, - target: wit::Address, - request: wit::Request, - new_context: Option, - blob: Option, - ) -> Result { - let source = fake_source.unwrap_or(self.metadata.our.clone()); - let mut request = t::de_wit_request(request); - - // if request chooses to inherit, it means to take the ID and lazy_load_blob, - // if any, from the last message it ingested - - // if request chooses to inherit, match id to precedessor - // otherwise, id is generated randomly - let request_id: u64 = if request.inherit && self.prompting_message.is_some() { - self.prompting_message.as_ref().unwrap().id - } else { - loop { - let id = rand::random(); - if !self.contexts.contains_key(&id) { - break id; - } + let (bindings, _bindings) = + match Process::instantiate_async(&mut store, &component, &linker).await { + Ok(b) => b, + Err(e) => { + let _ = send_to_terminal + .send(t::Printout { + verbosity: 0, + content: format!( + "mk: process {:?} failed to instantiate: {:?}", + our_process_id, e, + ), + }) + .await; + return Err(e); } }; - // if a blob is provided, it will be used; otherwise, if inherit is true, - // and a predecessor exists, its blob will be used; otherwise, no blob will be used. - let blob = match blob { - Some(p) => Some(t::LazyLoadBlob { - mime: p.mime, - bytes: p.bytes, - }), - None => match request.inherit { - true => self.last_blob.clone(), - false => None, - }, - }; + Ok((bindings, store, wasi_stderr)) +} - if !request.capabilities.is_empty() { - request.capabilities = { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.caps_oracle - .send(t::CapMessage::FilterCaps { - on: self.metadata.our.process.clone(), - caps: request - .capabilities - .into_iter() - .map(|(cap, _)| cap) - .collect(), - responder: tx, - }) - .await - .expect("fatal: process couldn't access capabilities oracle"); - rx.await - .expect("fatal: process couldn't receive capabilities") - }; - } +async fn make_component_v0( + engine: Engine, + wasm_bytes: &[u8], + home_directory_path: String, + process_state: ProcessState, +) -> Result<(ProcessV0, Store, MemoryOutputPipe)> { + let component = Component::new(&engine, wasm_bytes.to_vec()) + .expect("make_process_loop: couldn't read file"); - // if the request expects a response, modify the process' context map as needed - // and set a timer. - // TODO optimize this SIGNIFICANTLY: stop spawning tasks - // and use a global clock + garbage collect step to check for timeouts - if let Some(timeout_secs) = request.expects_response { - let this_request = request.clone(); - let this_blob = blob.clone(); - let self_sender = self.self_sender.clone(); - let original_target = t::Address::de_wit(target.clone()); - let timeout_handle = tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; - let _ = self_sender - .send(Err(t::WrappedSendError { - id: request_id, - source: original_target.clone(), - error: t::SendError { - kind: t::SendErrorKind::Timeout, - target: original_target, - message: t::Message::Request(this_request), - lazy_load_blob: this_blob, - }, - })) - .await; - }); - self.contexts.insert( - request_id, - ( - ProcessContext { - prompting_message: self.prompting_message.clone(), - context: new_context, - }, - timeout_handle, - ), - ); - } + let mut linker = Linker::new(&engine); + ProcessV0::add_to_linker(&mut linker, |state: &mut ProcessWasiV0| state).unwrap(); - // rsvp is set based on this priority: - // 1. whether this request expects a response -- if so, rsvp = our address, always - // 2. whether this request inherits -- if so, rsvp = prompting message's rsvp - // 3. if neither, rsvp = None - let kernel_message = t::KernelMessage { - id: request_id, - source, - target: t::Address::de_wit(target), - rsvp: match ( - request.expects_response, - request.inherit, - &self.prompting_message, - ) { - (Some(_), _, _) => { - // this request expects response, so receives any response - // make sure to use the real source, not a fake injected-by-kernel source - Some(self.metadata.our.clone()) - } - (None, true, Some(ref prompt)) => { - // this request inherits, so response will be routed to prompting message - prompt.rsvp.clone() - } - _ => None, - }, - message: t::Message::Request(request), - lazy_load_blob: blob, - }; + let table = Table::new(); + let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE); - self.send_to_loop - .send(kernel_message) - .await - .expect("fatal: kernel couldn't send request"); + let our_process_id = process_state.metadata.our.process.clone(); + let send_to_terminal = process_state.send_to_terminal.clone(); - Ok(request_id) - } + let tmp_path = format!( + "{}/vfs/{}:{}/tmp", + home_directory_path, + our_process_id.package(), + our_process_id.publisher() + ); + + let mut wasi = WasiCtxBuilder::new(); - /// takes Response generated by a process and sends it to the main event loop. - pub async fn send_response( - &mut self, - response: wit::Response, - blob: Option, - ) { - let mut response = t::de_wit_response(response); - - // the process requires a prompting_message in order to issue a response - let Some(ref prompting_message) = self.prompting_message else { - print( - &self.send_to_terminal, - 0, - format!("kernel: need non-None prompting_message to handle Response {response:?}"), + // TODO make guarantees about this + if let Ok(Ok(())) = tokio::time::timeout( + std::time::Duration::from_secs(5), + fs::create_dir_all(&tmp_path), + ) + .await + { + if let Ok(wasi_tempdir) = + Dir::open_ambient_dir(tmp_path.clone(), wasi_common::sync::ambient_authority()) + { + wasi.preopened_dir( + wasi_tempdir, + DirPerms::all(), + FilePerms::all(), + tmp_path.clone(), ) - .await; - return; - }; + .env("TEMP_DIR", tmp_path); + } + } - // given the current process state, produce the id and target that - // a response it emits should have. - let (id, target) = ( - prompting_message.id, - match &prompting_message.rsvp { - None => prompting_message.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - ); - - let blob = match response.inherit { - true => self.last_blob.clone(), - false => t::de_wit_blob(blob), - }; + let wasi = wasi.stderr(wasi_stderr.clone()).build(); + + wasmtime_wasi::command::add_to_linker(&mut linker).unwrap(); + + let mut store = Store::new( + &engine, + ProcessWasiV0 { + process: process_state, + table, + wasi, + }, + ); - if !response.capabilities.is_empty() { - response.capabilities = { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.caps_oracle - .send(t::CapMessage::FilterCaps { - on: self.metadata.our.process.clone(), - caps: response - .capabilities - .into_iter() - .map(|(cap, _)| cap) - .collect(), - responder: tx, + let (bindings, _bindings) = + match ProcessV0::instantiate_async(&mut store, &component, &linker).await { + Ok(b) => b, + Err(e) => { + let _ = send_to_terminal + .send(t::Printout { + verbosity: 0, + content: format!( + "mk: process {:?} failed to instantiate: {:?}", + our_process_id, e, + ), }) - .await - .expect("fatal: process couldn't access capabilities oracle"); - rx.await - .expect("fatal: process couldn't receive capabilities") - }; - } + .await; + return Err(e); + } + }; - self.send_to_loop - .send(t::KernelMessage { - id, - source: self.metadata.our.clone(), - target, - rsvp: None, - message: t::Message::Response(( - response, - // the context will be set by the process receiving this Response. - None, - )), - lazy_load_blob: blob, - }) - .await - .expect("fatal: kernel couldn't send response"); - } + Ok((bindings, store, wasi_stderr)) } /// create a specific process, and generate a task that will run it. @@ -526,111 +292,103 @@ pub async fn make_process_loop( send_to_process.send(message).await?; } - let component = - Component::new(&engine, wasm_bytes.clone()).expect("make_process_loop: couldn't read file"); + let process_state = ProcessState { + keypair: keypair.clone(), + metadata: metadata.clone(), + recv_in_process, + self_sender: send_to_process, + send_to_loop: send_to_loop.clone(), + send_to_terminal: send_to_terminal.clone(), + prompting_message: None, + last_blob: None, + contexts: HashMap::new(), + message_queue: VecDeque::new(), + caps_oracle: caps_oracle.clone(), + }; - let mut linker = Linker::new(&engine); - Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap(); + let metadata = match metadata.wit_version { + // assume missing version is oldest wit version + None => { + println!("WIT 0.7.0 OR NONE GIVEN\r"); - let table = Table::new(); - let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE); + let (bindings, mut store, wasi_stderr) = + make_component(engine, &wasm_bytes, home_directory_path, process_state).await?; - let tmp_path = format!( - "{}/vfs/{}:{}/tmp", - home_directory_path, - metadata.our.process.package(), - metadata.our.process.publisher() - ); - - let mut wasi = WasiCtxBuilder::new(); + // the process will run until it returns from init() or crashes + match bindings + .call_init(&mut store, &metadata.our.to_string()) + .await + { + Ok(()) => { + let _ = send_to_terminal + .send(t::Printout { + verbosity: 1, + content: format!( + "process {} returned without error", + metadata.our.process + ), + }) + .await; + } + Err(_) => { + let stderr = wasi_stderr.contents().into(); + let stderr = String::from_utf8(stderr)?; + let _ = send_to_terminal + .send(t::Printout { + verbosity: 0, + content: format!( + "\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}", + metadata.our.process, stderr, + ), + }) + .await; + } + }; - // TODO make guarantees about this - if let Ok(Ok(())) = tokio::time::timeout( - std::time::Duration::from_secs(5), - fs::create_dir_all(&tmp_path), - ) - .await - { - if let Ok(wasi_tempdir) = - Dir::open_ambient_dir(tmp_path.clone(), wasi_common::sync::ambient_authority()) - { - wasi.preopened_dir( - wasi_tempdir, - DirPerms::all(), - FilePerms::all(), - tmp_path.clone(), - ) - .env("TEMP_DIR", tmp_path); + // update metadata to what was mutated by process in store + store.data().process.metadata.to_owned() } - } - - let wasi = wasi.stderr(wasi_stderr.clone()).build(); - - wasmtime_wasi::command::add_to_linker(&mut linker).unwrap(); + // match version numbers + // assume higher uncovered version number is latest version + Some(0) | _ => { + println!("WIT 0.8.0 OR HIGHER\r"); - let mut store = Store::new( - &engine, - ProcessWasi { - process: ProcessState { - keypair: keypair.clone(), - metadata: metadata.clone(), - recv_in_process, - self_sender: send_to_process, - send_to_loop: send_to_loop.clone(), - send_to_terminal: send_to_terminal.clone(), - prompting_message: None, - last_blob: None, - contexts: HashMap::new(), - message_queue: VecDeque::new(), - caps_oracle: caps_oracle.clone(), - }, - table, - wasi, - }, - ); + let (bindings, mut store, wasi_stderr) = + make_component_v0(engine, &wasm_bytes, home_directory_path, process_state).await?; - let (bindings, _bindings) = - match Process::instantiate_async(&mut store, &component, &linker).await { - Ok(b) => b, - Err(e) => { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "mk: process {:?} failed to instantiate: {:?}", - metadata.our.process, e, - ), - }) - .await; - return Err(e); - } - }; + // the process will run until it returns from init() or crashes + match bindings + .call_init(&mut store, &metadata.our.to_string()) + .await + { + Ok(()) => { + let _ = send_to_terminal + .send(t::Printout { + verbosity: 1, + content: format!( + "process {} returned without error", + metadata.our.process + ), + }) + .await; + } + Err(_) => { + let stderr = wasi_stderr.contents().into(); + let stderr = String::from_utf8(stderr)?; + let _ = send_to_terminal + .send(t::Printout { + verbosity: 0, + content: format!( + "\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}", + metadata.our.process, stderr, + ), + }) + .await; + } + }; - // the process will run until it returns from init() or crashes - match bindings - .call_init(&mut store, &metadata.our.to_string()) - .await - { - Ok(()) => { - let _ = send_to_terminal - .send(t::Printout { - verbosity: 1, - content: format!("process {} returned without error", metadata.our.process), - }) - .await; - } - Err(_) => { - let stderr = wasi_stderr.contents().into(); - let stderr = String::from_utf8(stderr)?; - let _ = send_to_terminal - .send(t::Printout { - verbosity: 0, - content: format!( - "\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}", - metadata.our.process, stderr, - ), - }) - .await; + // update metadata to what was mutated by process in store + store.data().process.metadata.to_owned() } }; @@ -638,9 +396,6 @@ pub async fn make_process_loop( // the process has completed, time to perform cleanup // - // update metadata to what was mutated by process in store - let metadata = store.data().process.metadata.to_owned(); - let our_kernel = t::Address { node: metadata.our.node.clone(), process: KERNEL_PROCESS_ID.clone(), @@ -818,7 +573,7 @@ pub async fn make_process_loop( Ok(()) } -async fn print(sender: &t::PrintSender, verbosity: u8, content: String) { +pub async fn print(sender: &t::PrintSender, verbosity: u8, content: String) { let _ = sender .send(t::Printout { verbosity, content }) .await diff --git a/kinode/src/kernel/standard_host.rs b/kinode/src/kernel/standard_host.rs index d9de47107..72c684e6b 100644 --- a/kinode/src/kernel/standard_host.rs +++ b/kinode/src/kernel/standard_host.rs @@ -2,10 +2,10 @@ use crate::kernel::process; use crate::KERNEL_PROCESS_ID; use crate::VFS_PROCESS_ID; use anyhow::Result; - use lib::types::core::{self as t, STATE_PROCESS_ID}; pub use lib::wit; pub use lib::wit::Host as StandardHost; +use ring::signature::{self, KeyPair}; async fn print_debug(proc: &process::ProcessState, content: &str) { let _ = proc @@ -17,6 +17,410 @@ async fn print_debug(proc: &process::ProcessState, content: &str) { .await; } +impl process::ProcessState { + /// Ingest latest message directed to this process, and save it as the current message. + /// If there is no message in the queue, wait async until one is received. + async fn get_next_message_for_process( + &mut self, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + let res = match self.message_queue.pop_front() { + Some(message_from_queue) => message_from_queue, + None => self.ingest_message().await, + }; + self.kernel_message_to_process_receive(res) + } + + /// instead of ingesting latest, wait for a specific ID and queue all others + async fn get_specific_message_for_process( + &mut self, + awaited_message_id: u64, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + // first, check if the awaited message is already in the queue and handle if so + for (i, message) in self.message_queue.iter().enumerate() { + match message { + Ok(ref km) if km.id == awaited_message_id => { + let km = self.message_queue.remove(i).unwrap(); + return self.kernel_message_to_process_receive(km); + } + _ => continue, + } + } + // next, wait for the awaited message to arrive + loop { + let res = self.ingest_message().await; + let id = match &res { + Ok(km) => km.id, + Err(e) => e.id, + }; + if id == awaited_message_id { + return self.kernel_message_to_process_receive(res); + } else { + self.message_queue.push_back(res); + } + } + } + + /// ingest next valid message from kernel. + /// cancel any timeout task associated with this message. + /// if the message is a response, only enqueue if we have an outstanding request for it. + async fn ingest_message(&mut self) -> Result { + loop { + let message = self + .recv_in_process + .recv() + .await + .expect("fatal: process couldn't receive next message"); + + match &message { + Ok(km) => match &km.message { + t::Message::Response(_) => { + if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) { + timeout_handle.abort(); + return message; + } + } + _ => { + return message; + } + }, + Err(e) => { + if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) { + timeout_handle.abort(); + return message; + } + } + } + } + } + + /// Convert a message from the main event loop into a result for the process to receive. + /// If the message is a response or error, get context if we have one. + fn kernel_message_to_process_receive( + &mut self, + incoming: Result, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + let (mut km, context) = match incoming { + Ok(mut km) => match km.message { + t::Message::Request(_) => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) + } + t::Message::Response(_) => match self.contexts.remove(&km.id) { + Some((context, _timeout_handle)) => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = context.prompting_message; + (km, context.context) + } + None => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) + } + }, + }, + Err(e) => match self.contexts.remove(&e.id) { + None => return Err((t::en_wit_send_error(e.error), None)), + Some((context, _timeout_handle)) => { + self.prompting_message = context.prompting_message; + return Err((t::en_wit_send_error(e.error), context.context)); + } + }, + }; + + let pk = signature::UnparsedPublicKey::new( + &signature::ED25519, + self.keypair.as_ref().public_key(), + ); + + // prune any invalid capabilities before handing to process + // where invalid = supposedly issued by us, but not signed properly by us + match &mut km.message { + t::Message::Request(request) => { + request.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + t::Message::Response((response, _)) => { + response.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + }; + + Ok(( + km.source.en_wit(), + match km.message { + t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)), + // NOTE: we throw away whatever context came from the sender, that's not ours + t::Message::Response((response, _sent_context)) => { + wit::Message::Response((t::en_wit_response(response), context)) + } + }, + )) + } + + /// takes Request generated by a process and sends it to the main event loop. + /// will only fail if process does not have capability to send to target. + /// if the request has a timeout (expects response), start a task to track + /// that timeout and return timeout error if it expires. + async fn send_request( + &mut self, + // only used when kernel steps in to get/set state + fake_source: Option, + target: wit::Address, + request: wit::Request, + new_context: Option, + blob: Option, + ) -> Result { + let source = fake_source.unwrap_or(self.metadata.our.clone()); + let mut request = t::de_wit_request(request); + + // if request chooses to inherit, it means to take the ID and lazy_load_blob, + // if any, from the last message it ingested + + // if request chooses to inherit, match id to precedessor + // otherwise, id is generated randomly + let request_id: u64 = if request.inherit && self.prompting_message.is_some() { + self.prompting_message.as_ref().unwrap().id + } else { + loop { + let id = rand::random(); + if !self.contexts.contains_key(&id) { + break id; + } + } + }; + + // if a blob is provided, it will be used; otherwise, if inherit is true, + // and a predecessor exists, its blob will be used; otherwise, no blob will be used. + let blob = match blob { + Some(p) => Some(t::LazyLoadBlob { + mime: p.mime, + bytes: p.bytes, + }), + None => match request.inherit { + true => self.last_blob.clone(), + false => None, + }, + }; + + if !request.capabilities.is_empty() { + request.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: request + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } + + // if the request expects a response, modify the process' context map as needed + // and set a timer. + // TODO optimize this SIGNIFICANTLY: stop spawning tasks + // and use a global clock + garbage collect step to check for timeouts + if let Some(timeout_secs) = request.expects_response { + let this_request = request.clone(); + let this_blob = blob.clone(); + let self_sender = self.self_sender.clone(); + let original_target = t::Address::de_wit(target.clone()); + let timeout_handle = tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; + let _ = self_sender + .send(Err(t::WrappedSendError { + id: request_id, + source: original_target.clone(), + error: t::SendError { + kind: t::SendErrorKind::Timeout, + target: original_target, + message: t::Message::Request(this_request), + lazy_load_blob: this_blob, + }, + })) + .await; + }); + self.contexts.insert( + request_id, + ( + process::ProcessContext { + prompting_message: self.prompting_message.clone(), + context: new_context, + }, + timeout_handle, + ), + ); + } + + // rsvp is set based on this priority: + // 1. whether this request expects a response -- if so, rsvp = our address, always + // 2. whether this request inherits -- if so, rsvp = prompting message's rsvp + // 3. if neither, rsvp = None + let kernel_message = t::KernelMessage { + id: request_id, + source, + target: t::Address::de_wit(target), + rsvp: match ( + request.expects_response, + request.inherit, + &self.prompting_message, + ) { + (Some(_), _, _) => { + // this request expects response, so receives any response + // make sure to use the real source, not a fake injected-by-kernel source + Some(self.metadata.our.clone()) + } + (None, true, Some(ref prompt)) => { + // this request inherits, so response will be routed to prompting message + prompt.rsvp.clone() + } + _ => None, + }, + message: t::Message::Request(request), + lazy_load_blob: blob, + }; + + self.send_to_loop + .send(kernel_message) + .await + .expect("fatal: kernel couldn't send request"); + + Ok(request_id) + } + + /// takes Response generated by a process and sends it to the main event loop. + async fn send_response(&mut self, response: wit::Response, blob: Option) { + let mut response = t::de_wit_response(response); + + // the process requires a prompting_message in order to issue a response + let Some(ref prompting_message) = self.prompting_message else { + process::print( + &self.send_to_terminal, + 0, + format!("kernel: need non-None prompting_message to handle Response {response:?}"), + ) + .await; + return; + }; + + // given the current process state, produce the id and target that + // a response it emits should have. + let (id, target) = ( + prompting_message.id, + match &prompting_message.rsvp { + None => prompting_message.source.clone(), + Some(rsvp) => rsvp.clone(), + }, + ); + + let blob = match response.inherit { + true => self.last_blob.clone(), + false => t::de_wit_blob(blob), + }; + + if !response.capabilities.is_empty() { + response.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: response + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } + + self.send_to_loop + .send(t::KernelMessage { + id, + source: self.metadata.our.clone(), + target, + rsvp: None, + message: t::Message::Response(( + response, + // the context will be set by the process receiving this Response. + None, + )), + lazy_load_blob: blob, + }) + .await + .expect("fatal: kernel couldn't send response"); + } +} + +async fn send_and_await_response( + process: &mut process::ProcessWasi, + source: Option, + target: wit::Address, + request: wit::Request, + blob: Option, +) -> Result> { + if request.expects_response.is_none() { + return Err(anyhow::anyhow!( + "kernel: got invalid send_and_await_response() Request from {:?}: must expect response", + process.process.metadata.our.process + )); + } + let id = process + .process + .send_request(source, target, request, None, blob) + .await; + match id { + Ok(id) => match process.process.get_specific_message_for_process(id).await { + Ok((address, wit::Message::Response(response))) => { + Ok(Ok((address, wit::Message::Response(response)))) + } + Ok((_address, wit::Message::Request(_))) => Err(anyhow::anyhow!( + "fatal: received Request instead of Response" + )), + Err((net_err, _context)) => Ok(Err(net_err)), + }, + Err(e) => Err(e), + } +} + /// /// create the process API. this is where the functions that a process can use live. /// @@ -63,7 +467,7 @@ impl StandardHost for process::ProcessWasi { /// asking it to fetch the current state saved under this process async fn get_state(&mut self) -> Result>> { let old_last_blob = self.process.last_blob.clone(); - let res = match process::send_and_await_response( + let res = match send_and_await_response( self, Some(t::Address { node: self.process.metadata.our.node.clone(), @@ -105,7 +509,7 @@ impl StandardHost for process::ProcessWasi { /// this process with these bytes async fn set_state(&mut self, bytes: Vec) -> Result<()> { let old_last_blob = self.process.last_blob.clone(); - let res = match process::send_and_await_response( + let res = match send_and_await_response( self, Some(t::Address { node: self.process.metadata.our.node.clone(), @@ -146,7 +550,7 @@ impl StandardHost for process::ProcessWasi { /// asking it to delete the current state saved under this process async fn clear_state(&mut self) -> Result<()> { let old_last_blob = self.process.last_blob.clone(); - let res = match process::send_and_await_response( + let res = match send_and_await_response( self, Some(t::Address { node: self.process.metadata.our.node.clone(), @@ -201,7 +605,7 @@ impl StandardHost for process::ProcessWasi { node: self.process.metadata.our.node.clone(), process: VFS_PROCESS_ID.en_wit(), }; - let Ok(Ok((_, hash_response))) = process::send_and_await_response( + let Ok(Ok((_, hash_response))) = send_and_await_response( self, None, vfs_address.clone(), @@ -251,7 +655,7 @@ impl StandardHost for process::ProcessWasi { self.process.metadata.our.process.publisher(), ); // TODO I think we need to kill this process first in case it already exists - let Ok(Ok((_, _response))) = process::send_and_await_response( + let Ok(Ok((_, _response))) = send_and_await_response( self, Some(t::Address { node: self.process.metadata.our.node.clone(), @@ -314,7 +718,7 @@ impl StandardHost for process::ProcessWasi { let _ = rx.await.unwrap(); } // finally, send the command to run the new process - let Ok(Ok((_, response))) = process::send_and_await_response( + let Ok(Ok((_, response))) = send_and_await_response( self, Some(t::Address { node: self.process.metadata.our.node.clone(), @@ -519,6 +923,6 @@ impl StandardHost for process::ProcessWasi { request: wit::Request, blob: Option, ) -> Result> { - process::send_and_await_response(self, None, target, request, blob).await + send_and_await_response(self, None, target, request, blob).await } } diff --git a/kinode/src/kernel/standard_host_v0.rs b/kinode/src/kernel/standard_host_v0.rs new file mode 100644 index 000000000..9ab407249 --- /dev/null +++ b/kinode/src/kernel/standard_host_v0.rs @@ -0,0 +1,934 @@ +use crate::kernel::process; +use crate::KERNEL_PROCESS_ID; +use crate::VFS_PROCESS_ID; +use anyhow::Result; +use lib::types::core::{self as t, STATE_PROCESS_ID}; +pub use lib::v0::wit; +pub use lib::v0::wit::Host as StandardHost; +use ring::signature::{self, KeyPair}; + +async fn print_debug(proc: &process::ProcessState, content: &str) { + let _ = proc + .send_to_terminal + .send(t::Printout { + verbosity: 2, + content: format!("{}: {}", proc.metadata.our.process, content), + }) + .await; +} + +impl process::ProcessState { + /// Ingest latest message directed to this process, and save it as the current message. + /// If there is no message in the queue, wait async until one is received. + async fn get_next_message_for_process_v0( + &mut self, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + let res = match self.message_queue.pop_front() { + Some(message_from_queue) => message_from_queue, + None => self.ingest_message_v0().await, + }; + self.kernel_message_to_process_receive_v0(res) + } + + /// instead of ingesting latest, wait for a specific ID and queue all others + async fn get_specific_message_for_process_v0( + &mut self, + awaited_message_id: u64, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + // first, check if the awaited message is already in the queue and handle if so + for (i, message) in self.message_queue.iter().enumerate() { + match message { + Ok(ref km) if km.id == awaited_message_id => { + let km = self.message_queue.remove(i).unwrap(); + return self.kernel_message_to_process_receive_v0(km); + } + _ => continue, + } + } + // next, wait for the awaited message to arrive + loop { + let res = self.ingest_message_v0().await; + let id = match &res { + Ok(km) => km.id, + Err(e) => e.id, + }; + if id == awaited_message_id { + return self.kernel_message_to_process_receive_v0(res); + } else { + self.message_queue.push_back(res); + } + } + } + + /// ingest next valid message from kernel. + /// cancel any timeout task associated with this message. + /// if the message is a response, only enqueue if we have an outstanding request for it. + async fn ingest_message_v0(&mut self) -> Result { + loop { + let message = self + .recv_in_process + .recv() + .await + .expect("fatal: process couldn't receive next message"); + + match &message { + Ok(km) => match &km.message { + t::Message::Response(_) => { + if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) { + timeout_handle.abort(); + return message; + } + } + _ => { + return message; + } + }, + Err(e) => { + if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) { + timeout_handle.abort(); + return message; + } + } + } + } + } + + /// Convert a message from the main event loop into a result for the process to receive. + /// If the message is a response or error, get context if we have one. + fn kernel_message_to_process_receive_v0( + &mut self, + incoming: Result, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + let (mut km, context) = match incoming { + Ok(mut km) => match km.message { + t::Message::Request(_) => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) + } + t::Message::Response(_) => match self.contexts.remove(&km.id) { + Some((context, _timeout_handle)) => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = context.prompting_message; + (km, context.context) + } + None => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) + } + }, + }, + Err(e) => match self.contexts.remove(&e.id) { + None => return Err((t::en_wit_send_error_v0(e.error), None)), + Some((context, _timeout_handle)) => { + self.prompting_message = context.prompting_message; + return Err((t::en_wit_send_error_v0(e.error), context.context)); + } + }, + }; + + let pk = signature::UnparsedPublicKey::new( + &signature::ED25519, + self.keypair.as_ref().public_key(), + ); + + // prune any invalid capabilities before handing to process + // where invalid = supposedly issued by us, but not signed properly by us + match &mut km.message { + t::Message::Request(request) => { + request.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + t::Message::Response((response, _)) => { + response.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + }; + + Ok(( + km.source.en_wit_v0(), + match km.message { + t::Message::Request(request) => { + wit::Message::Request(t::en_wit_request_v0(request)) + } + // NOTE: we throw away whatever context came from the sender, that's not ours + t::Message::Response((response, _sent_context)) => { + wit::Message::Response((t::en_wit_response_v0(response), context)) + } + }, + )) + } + + /// takes Request generated by a process and sends it to the main event loop. + /// will only fail if process does not have capability to send to target. + /// if the request has a timeout (expects response), start a task to track + /// that timeout and return timeout error if it expires. + async fn send_request_v0( + &mut self, + // only used when kernel steps in to get/set state + fake_source: Option, + target: wit::Address, + request: wit::Request, + new_context: Option, + blob: Option, + ) -> Result { + let source = fake_source.unwrap_or(self.metadata.our.clone()); + let mut request = t::de_wit_request_v0(request); + + // if request chooses to inherit, it means to take the ID and lazy_load_blob, + // if any, from the last message it ingested + + // if request chooses to inherit, match id to precedessor + // otherwise, id is generated randomly + let request_id: u64 = if request.inherit && self.prompting_message.is_some() { + self.prompting_message.as_ref().unwrap().id + } else { + loop { + let id = rand::random(); + if !self.contexts.contains_key(&id) { + break id; + } + } + }; + + // if a blob is provided, it will be used; otherwise, if inherit is true, + // and a predecessor exists, its blob will be used; otherwise, no blob will be used. + let blob = match blob { + Some(p) => Some(t::LazyLoadBlob { + mime: p.mime, + bytes: p.bytes, + }), + None => match request.inherit { + true => self.last_blob.clone(), + false => None, + }, + }; + + if !request.capabilities.is_empty() { + request.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: request + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } + + // if the request expects a response, modify the process' context map as needed + // and set a timer. + // TODO optimize this SIGNIFICANTLY: stop spawning tasks + // and use a global clock + garbage collect step to check for timeouts + if let Some(timeout_secs) = request.expects_response { + let this_request = request.clone(); + let this_blob = blob.clone(); + let self_sender = self.self_sender.clone(); + let original_target = t::Address::de_wit_v0(target.clone()); + let timeout_handle = tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; + let _ = self_sender + .send(Err(t::WrappedSendError { + id: request_id, + source: original_target.clone(), + error: t::SendError { + kind: t::SendErrorKind::Timeout, + target: original_target, + message: t::Message::Request(this_request), + lazy_load_blob: this_blob, + }, + })) + .await; + }); + self.contexts.insert( + request_id, + ( + process::ProcessContext { + prompting_message: self.prompting_message.clone(), + context: new_context, + }, + timeout_handle, + ), + ); + } + + // rsvp is set based on this priority: + // 1. whether this request expects a response -- if so, rsvp = our address, always + // 2. whether this request inherits -- if so, rsvp = prompting message's rsvp + // 3. if neither, rsvp = None + let kernel_message = t::KernelMessage { + id: request_id, + source, + target: t::Address::de_wit_v0(target), + rsvp: match ( + request.expects_response, + request.inherit, + &self.prompting_message, + ) { + (Some(_), _, _) => { + // this request expects response, so receives any response + // make sure to use the real source, not a fake injected-by-kernel source + Some(self.metadata.our.clone()) + } + (None, true, Some(ref prompt)) => { + // this request inherits, so response will be routed to prompting message + prompt.rsvp.clone() + } + _ => None, + }, + message: t::Message::Request(request), + lazy_load_blob: blob, + }; + + self.send_to_loop + .send(kernel_message) + .await + .expect("fatal: kernel couldn't send request"); + + Ok(request_id) + } + + /// takes Response generated by a process and sends it to the main event loop. + async fn send_response_v0(&mut self, response: wit::Response, blob: Option) { + let mut response = t::de_wit_response_v0(response); + + // the process requires a prompting_message in order to issue a response + let Some(ref prompting_message) = self.prompting_message else { + process::print( + &self.send_to_terminal, + 0, + format!("kernel: need non-None prompting_message to handle Response {response:?}"), + ) + .await; + return; + }; + + // given the current process state, produce the id and target that + // a response it emits should have. + let (id, target) = ( + prompting_message.id, + match &prompting_message.rsvp { + None => prompting_message.source.clone(), + Some(rsvp) => rsvp.clone(), + }, + ); + + let blob = match response.inherit { + true => self.last_blob.clone(), + false => t::de_wit_blob_v0(blob), + }; + + if !response.capabilities.is_empty() { + response.capabilities = { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.caps_oracle + .send(t::CapMessage::FilterCaps { + on: self.metadata.our.process.clone(), + caps: response + .capabilities + .into_iter() + .map(|(cap, _)| cap) + .collect(), + responder: tx, + }) + .await + .expect("fatal: process couldn't access capabilities oracle"); + rx.await + .expect("fatal: process couldn't receive capabilities") + }; + } + + self.send_to_loop + .send(t::KernelMessage { + id, + source: self.metadata.our.clone(), + target, + rsvp: None, + message: t::Message::Response(( + response, + // the context will be set by the process receiving this Response. + None, + )), + lazy_load_blob: blob, + }) + .await + .expect("fatal: kernel couldn't send response"); + } +} + +async fn send_and_await_response( + process: &mut process::ProcessWasiV0, + source: Option, + target: wit::Address, + request: wit::Request, + blob: Option, +) -> Result> { + if request.expects_response.is_none() { + return Err(anyhow::anyhow!( + "kernel: got invalid send_and_await_response() Request from {:?}: must expect response", + process.process.metadata.our.process + )); + } + let id = process + .process + .send_request_v0(source, target, request, None, blob) + .await; + match id { + Ok(id) => match process + .process + .get_specific_message_for_process_v0(id) + .await + { + Ok((address, wit::Message::Response(response))) => { + Ok(Ok((address, wit::Message::Response(response)))) + } + Ok((_address, wit::Message::Request(_))) => Err(anyhow::anyhow!( + "fatal: received Request instead of Response" + )), + Err((net_err, _context)) => Ok(Err(net_err)), + }, + Err(e) => Err(e), + } +} + +/// +/// create the process API. this is where the functions that a process can use live. +/// +#[async_trait::async_trait] +impl StandardHost for process::ProcessWasiV0 { + // + // system utils: + // + + /// Print a message to the runtime terminal. Add the name of the process to the + /// beginning of the string, so user can verify source. + async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> { + self.process + .send_to_terminal + .send(t::Printout { + verbosity, + content: format!( + "{}:{}: {}", + self.process.metadata.our.process.package(), + self.process.metadata.our.process.publisher(), + content + ), + }) + .await + .map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}")) + } + + // + // process management: + // + + /// TODO critical: move to kernel logic to enable persistence of choice made here + async fn set_on_exit(&mut self, on_exit: wit::OnExit) -> Result<()> { + self.process.metadata.on_exit = t::OnExit::de_wit_v0(on_exit); + print_debug(&self.process, "set new on-exit behavior").await; + Ok(()) + } + + async fn get_on_exit(&mut self) -> Result { + Ok(self.process.metadata.on_exit.en_wit_v0()) + } + + /// create a message from the *kernel* to the filesystem, + /// asking it to fetch the current state saved under this process + async fn get_state(&mut self) -> Result>> { + let old_last_blob = self.process.last_blob.clone(); + let res = match send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: STATE_PROCESS_ID.en_wit_v0(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), + body: serde_json::to_vec(&t::StateAction::GetState( + self.process.metadata.our.process.clone(), + )) + .unwrap(), + metadata: Some(self.process.metadata.our.process.to_string()), + capabilities: vec![], + }, + None, + ) + .await + { + Ok(Ok(_resp)) => { + // basically assuming filesystem responding properly here + match &self.process.last_blob { + None => Ok(None), + Some(blob) => Ok(Some(blob.bytes.clone())), + } + } + _ => Ok(None), + }; + self.process.last_blob = old_last_blob; + return res; + } + + /// create a message from the *kernel* to the filesystem, + /// asking it to replace the current state saved under + /// this process with these bytes + async fn set_state(&mut self, bytes: Vec) -> Result<()> { + let old_last_blob = self.process.last_blob.clone(); + let res = match send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: STATE_PROCESS_ID.en_wit_v0(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), + body: serde_json::to_vec(&t::StateAction::SetState( + self.process.metadata.our.process.clone(), + )) + .unwrap(), + metadata: Some(self.process.metadata.our.process.to_string()), + capabilities: vec![], + }, + Some(wit::LazyLoadBlob { mime: None, bytes }), + ) + .await + { + Ok(Ok(_resp)) => { + // basically assuming filesystem responding properly here + Ok(()) + } + _ => Err(anyhow::anyhow!( + "filesystem did not respond properly to SetState!!" + )), + }; + self.process.last_blob = old_last_blob; + print_debug(&self.process, "persisted state").await; + return res; + } + + /// create a message from the *kernel* to the filesystem, + /// asking it to delete the current state saved under this process + async fn clear_state(&mut self) -> Result<()> { + let old_last_blob = self.process.last_blob.clone(); + let res = match send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: STATE_PROCESS_ID.en_wit_v0(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), + body: serde_json::to_vec(&t::StateAction::DeleteState( + self.process.metadata.our.process.clone(), + )) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ) + .await + { + Ok(Ok(_resp)) => { + // basically assuming filesystem responding properly here + Ok(()) + } + _ => Err(anyhow::anyhow!( + "filesystem did not respond properly to ClearState!!" + )), + }; + self.process.last_blob = old_last_blob; + print_debug(&self.process, "cleared persisted state").await; + return res; + } + + /// shortcut to spawn a new process. the child process will automatically + /// be able to send messages to the parent process, and vice versa. + /// the .wasm file for the process must already be in VFS. + async fn spawn( + &mut self, + name: Option, + wasm_path: String, // must be located within package's drive + on_exit: wit::OnExit, + request_capabilities: Vec, + grant_capabilities: Vec, + public: bool, + ) -> Result> { + // save existing blob to restore later + let old_last_blob = self.process.last_blob.clone(); + let vfs_address = wit::Address { + node: self.process.metadata.our.node.clone(), + process: VFS_PROCESS_ID.en_wit_v0(), + }; + let Ok(Ok((_, hash_response))) = send_and_await_response( + self, + None, + vfs_address.clone(), + wit::Request { + inherit: false, + expects_response: Some(5), + body: serde_json::to_vec(&t::VfsRequest { + path: wasm_path.clone(), + action: t::VfsAction::Read, + }) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ) + .await + else { + println!("spawn: GetHash fail"); + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let wit::Message::Response((wit::Response { body, .. }, _)) = hash_response else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let t::VfsResponse::Read = serde_json::from_slice(&body).unwrap() else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let Some(t::LazyLoadBlob { mime: _, ref bytes }) = self.process.last_blob else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + + let name = match name { + Some(name) => name, + None => rand::random::().to_string(), + }; + let new_process_id = t::ProcessId::new( + Some(&name), + self.process.metadata.our.process.package(), + self.process.metadata.our.process.publisher(), + ); + // TODO I think we need to kill this process first in case it already exists + let Ok(Ok((_, _response))) = send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.en_wit_v0(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), // TODO evaluate + body: serde_json::to_vec(&t::KernelCommand::InitializeProcess { + id: new_process_id.clone(), + wasm_bytes_handle: wasm_path, + wit_version: self.process.metadata.wit_version, + on_exit: t::OnExit::de_wit_v0(on_exit), + initial_capabilities: request_capabilities + .iter() + .map(|cap| t::Capability { + issuer: t::Address::de_wit_v0(cap.clone().issuer), + params: cap.clone().params, + }) + .collect(), + public, + }) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + Some(wit::LazyLoadBlob { + mime: None, + bytes: bytes.to_vec(), + }), + ) + .await + else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NameTaken)); + }; + // insert messaging capabilities into requested processes + for process in grant_capabilities { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.process + .caps_oracle + .send(t::CapMessage::Add { + on: t::ProcessId::de_wit_v0(process), + caps: vec![t::Capability { + issuer: t::Address { + node: self.process.metadata.our.node.clone(), + process: new_process_id.clone(), + }, + params: "\"messaging\"".into(), + }], + responder: tx, + }) + .await + .unwrap(); + let _ = rx.await.unwrap(); + } + // finally, send the command to run the new process + let Ok(Ok((_, response))) = send_and_await_response( + self, + Some(t::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.clone(), + }), + wit::Address { + node: self.process.metadata.our.node.clone(), + process: KERNEL_PROCESS_ID.en_wit_v0(), + }, + wit::Request { + inherit: false, + expects_response: Some(5), // TODO evaluate + body: serde_json::to_vec(&t::KernelCommand::RunProcess(new_process_id.clone())) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ) + .await + else { + // reset blob to what it was + self.process.last_blob = old_last_blob; + return Ok(Err(wit::SpawnError::NameTaken)); + }; + // reset blob to what it was + self.process.last_blob = old_last_blob; + let wit::Message::Response((wit::Response { body, .. }, _)) = response else { + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + let t::KernelResponse::StartedProcess = serde_json::from_slice(&body).unwrap() else { + return Ok(Err(wit::SpawnError::NoFileAtPath)); + }; + // child processes are always able to Message parent + let (tx, rx) = tokio::sync::oneshot::channel(); + self.process + .caps_oracle + .send(t::CapMessage::Add { + on: new_process_id.clone(), + caps: vec![t::Capability { + issuer: self.process.metadata.our.clone(), + params: "\"messaging\"".into(), + }], + responder: tx, + }) + .await + .unwrap(); + let _ = rx.await.unwrap(); + + // parent process is always able to Message child + let (tx, rx) = tokio::sync::oneshot::channel(); + self.process + .caps_oracle + .send(t::CapMessage::Add { + on: self.process.metadata.our.process.clone(), + caps: vec![t::Capability { + issuer: t::Address { + node: self.process.metadata.our.node.clone(), + process: new_process_id.clone(), + }, + params: "\"messaging\"".into(), + }], + responder: tx, + }) + .await + .unwrap(); + let _ = rx.await.unwrap(); + print_debug(&self.process, "spawned a new process").await; + Ok(Ok(new_process_id.en_wit_v0().to_owned())) + } + + // + // capabilities management + // + + async fn save_capabilities(&mut self, caps: Vec) -> Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self + .process + .caps_oracle + .send(t::CapMessage::Add { + on: self.process.metadata.our.process.clone(), + caps: caps + .iter() + .map(|cap| t::de_wit_capability_v0(cap.clone()).0) + .collect(), + responder: tx, + }) + .await?; + let _ = rx.await?; + Ok(()) + } + + async fn drop_capabilities(&mut self, caps: Vec) -> Result<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self + .process + .caps_oracle + .send(t::CapMessage::Drop { + on: self.process.metadata.our.process.clone(), + caps: caps + .iter() + .map(|cap| t::de_wit_capability_v0(cap.clone()).0) + .collect(), + responder: tx, + }) + .await?; + let _ = rx.await?; + Ok(()) + } + + async fn our_capabilities(&mut self) -> Result> { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self + .process + .caps_oracle + .send(t::CapMessage::GetAll { + on: self.process.metadata.our.process.clone(), + responder: tx, + }) + .await?; + let caps = rx.await?; + Ok(caps + .into_iter() + .map(|cap| wit::Capability { + issuer: t::Address::en_wit_v0(&cap.0.issuer), + params: cap.0.params, + }) + .collect()) + } + + // + // message I/O: + // + + /// from a process: receive the next incoming message. will wait async until a message is received. + /// the incoming message can be a Request or a Response, or an Error of the Network variety. + async fn receive( + &mut self, + ) -> Result)>> { + Ok(self.process.get_next_message_for_process_v0().await) + } + + /// from a process: grab the blob part of the current prompting message. + /// if the prompting message did not have a blob, will return None. + /// will also return None if there is no prompting message. + async fn get_blob(&mut self) -> Result> { + Ok(t::en_wit_blob_v0(self.process.last_blob.clone())) + } + + async fn send_request( + &mut self, + target: wit::Address, + request: wit::Request, + context: Option, + blob: Option, + ) -> Result<()> { + let id = self + .process + .send_request_v0(None, target, request, context, blob) + .await; + match id { + Ok(_id) => Ok(()), + Err(e) => Err(e), + } + } + + async fn send_requests( + &mut self, + requests: Vec<( + wit::Address, + wit::Request, + Option, + Option, + )>, + ) -> Result<()> { + for request in requests { + let id = self + .process + .send_request_v0(None, request.0, request.1, request.2, request.3) + .await; + match id { + Ok(_id) => continue, + Err(e) => return Err(e), + } + } + Ok(()) + } + + async fn send_response( + &mut self, + response: wit::Response, + blob: Option, + ) -> Result<()> { + self.process.send_response_v0(response, blob).await; + Ok(()) + } + + async fn send_and_await_response( + &mut self, + target: wit::Address, + request: wit::Request, + blob: Option, + ) -> Result> { + send_and_await_response(self, None, target, request, blob).await + } +} diff --git a/lib/src/core.rs b/lib/src/core.rs index 31da0a1b6..81e2e089e 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -306,6 +306,16 @@ impl Address { }, } } + pub fn de_wit_v0(wit: crate::v0::wit::Address) -> Address { + Address { + node: wit.node, + process: ProcessId { + process_name: wit.process.process_name, + package_name: wit.process.package_name, + publisher_node: wit.process.publisher_node, + }, + } + } } impl std::str::FromStr for Address { @@ -529,6 +539,24 @@ impl OnExit { } } + pub fn en_wit_v0(&self) -> crate::v0::wit::OnExit { + match self { + OnExit::None => crate::v0::wit::OnExit::None, + OnExit::Restart => crate::v0::wit::OnExit::Restart, + OnExit::Requests(reqs) => crate::v0::wit::OnExit::Requests( + reqs.iter() + .map(|(address, request, blob)| { + ( + address.en_wit_v0(), + en_wit_request_v0(request.clone()), + en_wit_blob_v0(blob.clone()), + ) + }) + .collect(), + ), + } + } + pub fn de_wit(wit: wit::OnExit) -> Self { match wit { wit::OnExit::None => OnExit::None, @@ -546,6 +574,24 @@ impl OnExit { ), } } + + pub fn de_wit_v0(wit: crate::v0::wit::OnExit) -> Self { + match wit { + crate::v0::wit::OnExit::None => OnExit::None, + crate::v0::wit::OnExit::Restart => OnExit::Restart, + crate::v0::wit::OnExit::Requests(reqs) => OnExit::Requests( + reqs.into_iter() + .map(|(address, request, blob)| { + ( + Address::de_wit_v0(address), + de_wit_request_v0(request), + de_wit_blob_v0(blob), + ) + }) + .collect(), + ), + } + } } impl std::fmt::Display for Message { From e54c0cf04144951f94b342f87329897f1dbbac9b Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Thu, 16 May 2024 17:06:55 -0600 Subject: [PATCH 4/4] feat: kernel switching on process wit version --- Cargo.lock | 73 +++---------------- kinode/Cargo.toml | 2 +- .../packages/app_store/app_store/Cargo.toml | 2 +- .../packages/app_store/app_store/src/lib.rs | 8 +- kinode/packages/app_store/metadata.json | 5 +- kinode/src/kernel/process.rs | 4 - kinode/src/state.rs | 2 +- lib/Cargo.toml | 2 +- 8 files changed, 23 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20ce7611c..1cad1af36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -649,7 +649,7 @@ dependencies = [ "alloy-sol-types 0.7.0", "anyhow", "bincode", - "kinode_process_lib 0.7.1", + "kinode_process_lib 0.8.0", "rand 0.8.5", "serde", "serde_json", @@ -3084,7 +3084,7 @@ dependencies = [ "hmac", "http 1.1.0", "jwt", - "kit 0.4.1", + "kit", "lazy_static", "lib", "log", @@ -3130,29 +3130,8 @@ dependencies = [ [[package]] name = "kinode_process_lib" -version = "0.6.0" -source = "git+https://github.com/kinode-dao/process_lib.git?rev=84b3d84#84b3d84c7c31185f15691a288f1b45dbffb18fe2" -dependencies = [ - "alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)", - "alloy-primitives 0.6.4", - "alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)", - "alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)", - "anyhow", - "bincode", - "http 1.1.0", - "mime_guess", - "rand 0.8.5", - "serde", - "serde_json", - "thiserror", - "url", - "wit-bindgen", -] - -[[package]] -name = "kinode_process_lib" -version = "0.7.0" -source = "git+https://github.com/kinode-dao/process_lib.git?rev=2aa3a1a#2aa3a1a22e8a88e46864d474d777422eb1f1b60b" +version = "0.7.1" +source = "git+https://github.com/kinode-dao/process_lib?tag=v0.7.2#61a8de975fd0a812a0a033ee0975fb83dd166224" dependencies = [ "alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cad7935)", "alloy-primitives 0.7.0", @@ -3173,8 +3152,8 @@ dependencies = [ [[package]] name = "kinode_process_lib" -version = "0.7.1" -source = "git+https://github.com/kinode-dao/process_lib?tag=v0.7.2#61a8de975fd0a812a0a033ee0975fb83dd166224" +version = "0.8.0" +source = "git+https://github.com/kinode-dao/process_lib?rev=7eb3a04#7eb3a04f9be79d1cc3a52fa460faeea7ba3008ed" dependencies = [ "alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cad7935)", "alloy-primitives 0.7.0", @@ -3193,44 +3172,10 @@ dependencies = [ "wit-bindgen", ] -[[package]] -name = "kit" -version = "0.3.1" -source = "git+https://github.com/kinode-dao/kit?rev=659f59e#659f59ec9b8a503e4998e1bbe4b991429fa7ff33" -dependencies = [ - "anyhow", - "base64 0.21.7", - "clap", - "color-eyre", - "dirs 5.0.1", - "fs-err", - "futures-util", - "git2", - "hex", - "kinode_process_lib 0.6.0", - "nix", - "regex", - "reqwest 0.11.27", - "rmp-serde", - "semver 1.0.22", - "serde", - "serde_json", - "thiserror", - "tokio", - "tokio-tungstenite 0.21.0", - "toml", - "tracing", - "tracing-appender", - "tracing-error", - "tracing-subscriber", - "walkdir", - "zip 0.6.6", -] - [[package]] name = "kit" version = "0.4.1" -source = "git+https://github.com/kinode-dao/kit?rev=9ad80d2#9ad80d25db1c0fd137ef236f26e506e89470b0ad" +source = "git+https://github.com/kinode-dao/kit?rev=49093a0#49093a07bd9af62f2a62f96fb82dbefa51e79a71" dependencies = [ "anyhow", "base64 0.21.7", @@ -3241,7 +3186,7 @@ dependencies = [ "futures-util", "git2", "hex", - "kinode_process_lib 0.7.0", + "kinode_process_lib 0.8.0", "nix", "regex", "reqwest 0.11.27", @@ -3302,7 +3247,7 @@ version = "0.7.4" dependencies = [ "alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)", "anyhow", - "kit 0.3.1", + "kit", "lazy_static", "rand 0.8.5", "reqwest 0.12.4", diff --git a/kinode/Cargo.toml b/kinode/Cargo.toml index a20a8b776..6d2b96f7d 100644 --- a/kinode/Cargo.toml +++ b/kinode/Cargo.toml @@ -14,7 +14,7 @@ path = "src/main.rs" [build-dependencies] anyhow = "1.0.71" -kit = { git = "https://github.com/kinode-dao/kit", rev = "9ad80d2" } +kit = { git = "https://github.com/kinode-dao/kit", rev = "49093a0" } rayon = "1.8.1" sha2 = "0.10" tokio = "1.28" diff --git a/kinode/packages/app_store/app_store/Cargo.toml b/kinode/packages/app_store/app_store/Cargo.toml index fad7ba228..1b804bc9c 100644 --- a/kinode/packages/app_store/app_store/Cargo.toml +++ b/kinode/packages/app_store/app_store/Cargo.toml @@ -11,7 +11,7 @@ alloy-primitives = "0.7.0" alloy-sol-types = "0.7.0" anyhow = "1.0" bincode = "1.3.3" -kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.7.2" } +kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "7eb3a04" } rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/kinode/packages/app_store/app_store/src/lib.rs b/kinode/packages/app_store/app_store/src/lib.rs index e7f283dd1..6ab9cdf5e 100644 --- a/kinode/packages/app_store/app_store/src/lib.rs +++ b/kinode/packages/app_store/app_store/src/lib.rs @@ -1021,6 +1021,12 @@ pub fn handle_install( ) -> anyhow::Result<()> { let drive_path = format!("/{package_id}/pkg"); let manifest = fetch_package_manifest(package_id)?; + let metadata = state + .get_downloaded_package(package_id) + .ok_or_else(|| anyhow::anyhow!("package not found in manager"))? + .metadata + .ok_or_else(|| anyhow::anyhow!("package has no metadata"))?; + // always grant read/write to their drive, which we created for them let Some(read_cap) = get_capability( &Address::new(&our.node, ("vfs", "distro", "sys")), @@ -1085,7 +1091,7 @@ pub fn handle_install( .body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess { id: parsed_new_process_id.clone(), wasm_bytes_handle: wasm_path, - wit_version: None, // TODO get from manifest!!! + wit_version: metadata.properties.wit_version, on_exit: entry.on_exit.clone(), initial_capabilities: HashSet::new(), public: entry.public, diff --git a/kinode/packages/app_store/metadata.json b/kinode/packages/app_store/metadata.json index a3346d8d8..235d105a2 100644 --- a/kinode/packages/app_store/metadata.json +++ b/kinode/packages/app_store/metadata.json @@ -4,14 +4,15 @@ "image": "", "properties": { "package_name": "app_store", - "current_version": "0.3.1", "publisher": "sys", + "current_version": "0.3.1", "mirrors": [], "code_hashes": { "0.3.1": "" }, + "wit_version": 0, "dependencies": [] }, "external_url": "https://kinode.org", "animation_url": "" -} +} \ No newline at end of file diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 21886ef21..bd5ef1cf2 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -309,8 +309,6 @@ pub async fn make_process_loop( let metadata = match metadata.wit_version { // assume missing version is oldest wit version None => { - println!("WIT 0.7.0 OR NONE GIVEN\r"); - let (bindings, mut store, wasi_stderr) = make_component(engine, &wasm_bytes, home_directory_path, process_state).await?; @@ -351,8 +349,6 @@ pub async fn make_process_loop( // match version numbers // assume higher uncovered version number is latest version Some(0) | _ => { - println!("WIT 0.8.0 OR HIGHER\r"); - let (bindings, mut store, wasi_stderr) = make_component_v0(engine, &wasm_bytes, home_directory_path, process_state).await?; diff --git a/kinode/src/state.rs b/kinode/src/state.rs index f1e2da681..04580eec0 100644 --- a/kinode/src/state.rs +++ b/kinode/src/state.rs @@ -596,7 +596,7 @@ async fn bootstrap( std::collections::hash_map::Entry::Vacant(v) => { v.insert(PersistedProcess { wasm_bytes_handle: wasm_bytes_handle.clone(), - wit_version: None, + wit_version: package_metadata.properties.wit_version, on_exit: entry.on_exit, capabilities: requested_caps, public: public_process, diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 63e5612c8..ac1781275 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -12,7 +12,7 @@ license = "Apache-2.0" [build-dependencies] anyhow = "1.0.71" -kit = { git = "https://github.com/kinode-dao/kit", rev = "659f59e" } +kit = { git = "https://github.com/kinode-dao/kit", rev = "49093a0" } reqwest = { version = "0.12.4", features = ["blocking"] } tokio = "1.28"