From 5baebb42433da472999789d0e3de7876dd872aa5 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 17 Jun 2024 16:19:32 -0400 Subject: [PATCH 1/7] vfs: make all imports explicit --- kinode/src/vfs.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 4312a21cd..67002cf21 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -1,14 +1,19 @@ use dashmap::DashMap; +use lib::types::core::{ + Address, CapMessage, CapMessageSender, Capability, DirEntry, FileMetadata, FileType, + KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, + Printout, ProcessId, Request, Response, VfsAction, VfsError, VfsRequest, VfsResponse, + KERNEL_PROCESS_ID, VFS_PROCESS_ID, +}; use std::collections::{HashMap, VecDeque}; -use std::io::prelude::*; +use std::io::Read; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; -use tokio::fs; -use tokio::fs::OpenOptions; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; -use tokio::sync::Mutex; - -use lib::types::core::*; +use tokio::{ + fs, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}, + sync::Mutex, +}; pub async fn vfs( our_node: String, @@ -590,7 +595,7 @@ async fn open_file>( Some(file) => Arc::clone(file.value()), None => { let file = Arc::new(Mutex::new( - OpenOptions::new() + tokio::fs::OpenOptions::new() .read(true) .write(true) .create(create) From cfd9959c82bc189ba3954f44ccb557554de28297 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 17 Jun 2024 22:35:37 -0400 Subject: [PATCH 2/7] vfs: refactor for less copying, cleaner --- kinode/src/main.rs | 3 +- kinode/src/vfs.rs | 490 ++++++++++++++++++--------------------------- 2 files changed, 201 insertions(+), 292 deletions(-) diff --git a/kinode/src/main.rs b/kinode/src/main.rs index ac4a4b671..930aa60c9 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -280,6 +280,7 @@ async fn main() { * if any of these modules fail, the program exits with an error. */ let networking_keypair_arc = Arc::new(decoded_keyfile.networking_keypair); + let our_name_arc = Arc::new(our.name.clone()); let (kernel_process_map, db, reverse_cap_index) = state::load_state( our.name.clone(), @@ -387,7 +388,7 @@ async fn main() { print_sender.clone(), )); tasks.spawn(vfs::vfs( - our.name.clone(), + our_name_arc, kernel_message_sender.clone(), print_sender.clone(), vfs_message_receiver, diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 67002cf21..c1e5d8d94 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -16,53 +16,53 @@ use tokio::{ }; pub async fn vfs( - our_node: String, + our_node: Arc, send_to_loop: MessageSender, send_to_terminal: PrintSender, mut recv_from_loop: MessageReceiver, send_to_caps_oracle: CapMessageSender, home_directory_path: String, ) -> anyhow::Result<()> { - let our_node = Arc::new(our_node); let vfs_path = format!("{home_directory_path}/vfs"); if let Err(e) = fs::create_dir_all(&vfs_path).await { - panic!("failed creating vfs dir! {:?}", e); + panic!("failed creating vfs dir! {e:?}"); } - let vfs_path = fs::canonicalize(&vfs_path).await?; + let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?); let open_files: Arc>>> = Arc::new(DashMap::new()); - let mut process_queues: HashMap>>> = - HashMap::new(); + let process_queues: HashMap>>> = HashMap::new(); while let Some(km) = recv_from_loop.recv().await { if *our_node != km.source.node { - let _ = send_to_terminal.send(Printout { - verbosity: 1, - content: format!( + let _ = send_to_terminal + .send(Printout { + verbosity: 1, + content: format!( "vfs: got request from {}, but requests must come from our node {our_node}\r", km.source.node, ), - }); + }) + .await; continue; } let queue = process_queues - .entry(km.source.process.clone()) - .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - .clone(); + .get(&km.source.process) + .cloned() + .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); { let mut queue_lock = queue.lock().await; - queue_lock.push_back(km.clone()); + queue_lock.push_back(km); } // clone Arcs let our_node = our_node.clone(); - let send_to_caps_oracle = send_to_caps_oracle.clone(); - let send_to_terminal = send_to_terminal.clone(); let send_to_loop = send_to_loop.clone(); + let send_to_terminal = send_to_terminal.clone(); + let send_to_caps_oracle = send_to_caps_oracle.clone(); let open_files = open_files.clone(); let vfs_path = vfs_path.clone(); @@ -76,12 +76,17 @@ pub async fn vfs( km, open_files, &send_to_loop, - &send_to_terminal, &send_to_caps_oracle, &vfs_path, ) .await { + let _ = send_to_terminal + .send(Printout { + verbosity: 1, + content: format!("vfs: {e}\r"), + }) + .await; let _ = send_to_loop .send(make_error_message( our_node.to_string(), @@ -102,7 +107,6 @@ async fn handle_request( km: KernelMessage, open_files: Arc>>>, send_to_loop: &MessageSender, - send_to_terminal: &PrintSender, send_to_caps_oracle: &CapMessageSender, vfs_path: &PathBuf, ) -> Result<(), VfsError> { @@ -130,26 +134,10 @@ async fn handle_request( // special case for root reading list of all drives. if request.action == VfsAction::ReadDir && request.path == "/" { // check if src has root - let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); - send_to_caps_oracle - .send(CapMessage::Has { - on: km.source.process.clone(), - cap: Capability { - issuer: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - params: serde_json::to_string(&serde_json::json!({ - "root": true, - })) - .unwrap(), - }, - responder: send_cap_bool, - }) - .await?; - let has_root_cap = recv_cap_bool.await?; + let has_root_cap = + read_capability("", "", true, our_node, &km.source, send_to_caps_oracle).await; if has_root_cap { - let mut dir = fs::read_dir(vfs_path.clone()).await?; + let mut dir = fs::read_dir(&vfs_path).await?; let mut entries = Vec::new(); while let Some(entry) = dir.next_entry().await? { let entry_path = entry.path(); @@ -187,29 +175,29 @@ async fn handle_request( let _ = send_to_loop.send(response).await; return Ok(()); } else { - let no_cap_error = VfsError::NoCap { + return Err(VfsError::NoCap { action: request.action.to_string(), - path: request.path.clone(), - }; - return Err(no_cap_error); + path: request.path, + }); } } // current prepend to filepaths needs to be: /package_id/drive/path let (package_id, drive, rest) = parse_package_and_drive(&request.path, &vfs_path).await?; - let drive = format!("/{}/{}", package_id, drive); - let path = PathBuf::from(request.path.clone()); + let drive = format!("/{package_id}/{drive}"); + let action = request.action; + let path = PathBuf::from(request.path); if &km.source.process != &*KERNEL_PROCESS_ID { check_caps( our_node, - km.source.clone(), - send_to_caps_oracle.clone(), - &request, - path.clone(), - drive.clone(), - package_id, - vfs_path.clone(), + &km.source, + &send_to_caps_oracle, + &action, + &path, + &drive, + &package_id, + &vfs_path, ) .await?; } @@ -217,27 +205,25 @@ async fn handle_request( let base_drive = join_paths_safely(&vfs_path, &drive); let path = join_paths_safely(&base_drive, &rest); - let (body, bytes) = match request.action { + let (response_body, bytes) = match action { VfsAction::CreateDrive => { - // handled in check_caps. - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + let drive_path = join_paths_safely(&vfs_path, &drive); + fs::create_dir_all(drive_path).await?; + (VfsResponse::Ok, None) } VfsAction::CreateDir => { - // check error mapping - // fs::create_dir_all(path).await.map_err(|e| VfsError::IOError { source: e, path: path.clone() })?; fs::create_dir(path).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::CreateDirAll => { fs::create_dir_all(path).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::CreateFile => { // create truncates any file that might've existed before open_files.remove(&path); let _file = open_file(open_files, path, true, true).await?; - - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::OpenFile { create } => { // open file opens an existing file, or creates a new one if create is true @@ -245,13 +231,12 @@ async fn handle_request( let mut file = file.lock().await; // extra in the case file was just created, todo refactor out. file.seek(SeekFrom::Start(0)).await?; - - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::CloseFile => { // removes file from scope, resets file_handle and cursor. open_files.remove(&path); - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::WriteAll => { // doesn't create a file, writes at exact cursor. @@ -263,7 +248,7 @@ async fn handle_request( let file = open_file(open_files, path, false, false).await?; let mut file = file.lock().await; file.write_all(&blob.bytes).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::Write => { let Some(blob) = km.lazy_load_blob else { @@ -272,7 +257,7 @@ async fn handle_request( }); }; fs::write(path, &blob.bytes).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::Append => { let Some(blob) = km.lazy_load_blob else { @@ -284,44 +269,31 @@ async fn handle_request( let mut file = file.lock().await; file.seek(SeekFrom::End(0)).await?; file.write_all(&blob.bytes).await?; - - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::SyncAll => { let file = open_file(open_files, path, false, false).await?; let file = file.lock().await; file.sync_all().await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::Read => { let contents = fs::read(&path).await?; - - ( - serde_json::to_vec(&VfsResponse::Read).unwrap(), - Some(contents), - ) + (VfsResponse::Read, Some(contents)) } VfsAction::ReadToEnd => { let file = open_file(open_files, path.clone(), false, false).await?; let mut file = file.lock().await; let mut contents = Vec::new(); - file.read_to_end(&mut contents).await?; - - ( - serde_json::to_vec(&VfsResponse::Read).unwrap(), - Some(contents), - ) + (VfsResponse::Read, Some(contents)) } VfsAction::ReadExact(length) => { let file = open_file(open_files, path, false, false).await?; let mut file = file.lock().await; let mut contents = vec![0; length as usize]; file.read_exact(&mut contents).await?; - ( - serde_json::to_vec(&VfsResponse::Read).unwrap(), - Some(contents), - ) + (VfsResponse::Read, Some(contents)) } VfsAction::ReadDir => { let mut dir = fs::read_dir(path).await?; @@ -338,20 +310,14 @@ async fn handle_request( }; entries.push(dir_entry); } - ( - serde_json::to_vec(&VfsResponse::ReadDir(entries)).unwrap(), - None, - ) + (VfsResponse::ReadDir(entries), None) } VfsAction::ReadToString => { let file = open_file(open_files, path, false, false).await?; let mut file = file.lock().await; let mut contents = String::new(); file.read_to_string(&mut contents).await?; - ( - serde_json::to_vec(&VfsResponse::ReadToString(contents)).unwrap(), - None, - ) + (VfsResponse::ReadToString(contents), None) } VfsAction::Seek { seek_from } => { let file = open_file(open_files, path, false, false).await?; @@ -363,59 +329,51 @@ async fn handle_request( lib::types::core::SeekFrom::Current(offset) => std::io::SeekFrom::Current(offset), }; let response = file.seek(seek_from).await?; - ( - serde_json::to_vec(&VfsResponse::SeekFrom(response)).unwrap(), - None, - ) + (VfsResponse::SeekFrom(response), None) } VfsAction::RemoveFile => { fs::remove_file(&path).await?; open_files.remove(&path); - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::RemoveDir => { fs::remove_dir(path).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::RemoveDirAll => { fs::remove_dir_all(path).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::Rename { new_path } => { let new_path = join_paths_safely(&vfs_path, &new_path); fs::rename(path, new_path).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::CopyFile { new_path } => { let new_path = join_paths_safely(&vfs_path, &new_path); fs::copy(path, new_path).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::Metadata => { let metadata = fs::metadata(&path).await?; - let file_type = get_file_type(&metadata); let meta = FileMetadata { len: metadata.len(), file_type, }; - - ( - serde_json::to_vec(&VfsResponse::Metadata(meta)).unwrap(), - None, - ) + (VfsResponse::Metadata(meta), None) } VfsAction::Len => { let file = open_file(open_files, path, false, false).await?; let file = file.lock().await; let len = file.metadata().await?.len(); - (serde_json::to_vec(&VfsResponse::Len(len)).unwrap(), None) + (VfsResponse::Len(len), None) } VfsAction::SetLen(len) => { let file = open_file(open_files, path, false, false).await?; let file = file.lock().await; file.set_len(len).await?; - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } VfsAction::Hash => { use sha2::{Digest, Sha256}; @@ -432,7 +390,7 @@ async fn handle_request( hasher.update(&buffer[..bytes_read]); } let hash: [u8; 32] = hasher.finalize().into(); - (serde_json::to_vec(&VfsResponse::Hash(hash)).unwrap(), None) + (VfsResponse::Hash(hash), None) } VfsAction::AddZip => { let Some(blob) = km.lazy_load_blob else { @@ -483,51 +441,40 @@ async fn handle_request( }); }; } - (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) + (VfsResponse::Ok, None) } }; if let Some(target) = km.rsvp.or_else(|| { expects_response.map(|_| Address { node: our_node.to_string(), - process: km.source.process.clone(), + process: km.source.process, }) }) { - let response = KernelMessage { - id: km.id, - source: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - target, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body, - metadata, - capabilities: vec![], + let _ = send_to_loop + .send(KernelMessage { + id: km.id, + source: Address { + node: our_node.to_string(), + process: VFS_PROCESS_ID.clone(), }, - None, - )), - lazy_load_blob: bytes.map(|bytes| LazyLoadBlob { - mime: Some("application/octet-stream".into()), - bytes, - }), - }; - - let _ = send_to_loop.send(response).await; - } else { - send_to_terminal - .send(Printout { - verbosity: 2, - content: format!( - "vfs: not sending response: {:?}", - serde_json::from_slice::(&body) - ), + target, + rsvp: None, + message: Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&response_body).unwrap(), + metadata, + capabilities: vec![], + }, + None, + )), + lazy_load_blob: bytes.map(|bytes| LazyLoadBlob { + mime: Some("application/octet-stream".into()), + bytes, + }), }) - .await - .unwrap(); + .await; } Ok(()) @@ -615,37 +562,20 @@ async fn open_file>( async fn check_caps( our_node: &str, - source: Address, - mut send_to_caps_oracle: CapMessageSender, - request: &VfsRequest, - path: PathBuf, - drive: String, - package_id: PackageId, - vfs_dir: PathBuf, + source: &Address, + send_to_caps_oracle: &CapMessageSender, + action: &VfsAction, + path: &PathBuf, + drive: &str, + package_id: &PackageId, + vfs_path: &PathBuf, ) -> Result<(), VfsError> { let src_package_id = PackageId::new(source.process.package(), source.process.publisher()); - let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); - // check for root cap (todo make temp buffer so this is more efficient?) - send_to_caps_oracle - .send(CapMessage::Has { - on: source.process.clone(), - cap: Capability { - issuer: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - params: serde_json::to_string(&serde_json::json!({ - "root": true, - })) - .unwrap(), - }, - responder: send_cap_bool, - }) - .await?; - let has_root_cap = recv_cap_bool.await?; - - match &request.action { + // every action is valid if package has vfs root cap, but this should only be + // checked for *after* non-root caps are checked, because 99% of the time, + // package will have regular read/write cap regardless of root status. + match &action { VfsAction::CreateDir | VfsAction::CreateDirAll | VfsAction::CreateFile @@ -660,35 +590,18 @@ async fn check_caps( | VfsAction::RemoveDirAll | VfsAction::AddZip | VfsAction::SetLen(_) => { - if src_package_id == package_id { - return Ok(()); - } - - if has_root_cap { + if &src_package_id == package_id { return Ok(()); } - let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); - send_to_caps_oracle - .send(CapMessage::Has { - on: source.process.clone(), - cap: Capability { - issuer: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - params: serde_json::to_string(&serde_json::json!({ - "kind": "write", - "drive": drive, - })) - .unwrap(), - }, - responder: send_cap_bool, - }) - .await?; - let has_cap = recv_cap_bool.await?; + let has_cap = + read_capability("write", drive, false, our_node, source, send_to_caps_oracle).await; if !has_cap { + // check for root cap + if read_capability("", "", true, our_node, source, send_to_caps_oracle).await { + return Ok(()); + } return Err(VfsError::NoCap { - action: request.action.to_string(), + action: action.to_string(), path: path.display().to_string(), }); } @@ -703,34 +616,18 @@ async fn check_caps( | VfsAction::Hash | VfsAction::Metadata | VfsAction::Len => { - if src_package_id == package_id { - return Ok(()); - } - if has_root_cap { + if &src_package_id == package_id { return Ok(()); } - let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); - send_to_caps_oracle - .send(CapMessage::Has { - on: source.process.clone(), - cap: Capability { - issuer: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - params: serde_json::to_string(&serde_json::json!({ - "kind": "read", - "drive": drive, - })) - .unwrap(), - }, - responder: send_cap_bool, - }) - .await?; - let has_cap = recv_cap_bool.await?; + let has_cap = + read_capability("read", drive, false, our_node, source, send_to_caps_oracle).await; if !has_cap { + // check for root cap + if read_capability("", "", true, our_node, source, send_to_caps_oracle).await { + return Ok(()); + } return Err(VfsError::NoCap { - action: request.action.to_string(), + action: action.to_string(), path: path.display().to_string(), }); } @@ -738,42 +635,32 @@ async fn check_caps( } VfsAction::CopyFile { new_path } | VfsAction::Rename { new_path } => { // these have 2 paths to validate - if has_root_cap { - return Ok(()); - } - let (new_package_id, new_drive, _rest) = - parse_package_and_drive(new_path, &vfs_dir).await?; + parse_package_and_drive(new_path, &vfs_path).await?; - let new_drive = format!("/{}/{}", new_package_id, new_drive); + let new_drive = format!("/{new_package_id}/{new_drive}"); // if both new and old path are within the package_id path, ok - if (src_package_id == package_id) && (src_package_id == new_package_id) { + if (&src_package_id == package_id) && (src_package_id == new_package_id) { return Ok(()); } // otherwise check write caps. - let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); - send_to_caps_oracle - .send(CapMessage::Has { - on: source.process.clone(), - cap: Capability { - issuer: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - params: serde_json::to_string(&serde_json::json!({ - "kind": "write", - "drive": drive, - })) - .unwrap(), - }, - responder: send_cap_bool, - }) - .await?; - let has_cap = recv_cap_bool.await?; + let has_cap = read_capability( + "write", + &drive, + false, + our_node, + source, + send_to_caps_oracle, + ) + .await; if !has_cap { + // check for root cap + if read_capability("", "", true, our_node, source, send_to_caps_oracle).await { + return Ok(()); + } return Err(VfsError::NoCap { - action: request.action.to_string(), + action: action.to_string(), path: path.display().to_string(), }); } @@ -783,73 +670,89 @@ async fn check_caps( return Ok(()); } - let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); - send_to_caps_oracle - .send(CapMessage::Has { - on: source.process.clone(), - cap: Capability { - issuer: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - params: serde_json::to_string(&serde_json::json!({ - "kind": "write", - "drive": new_drive, - })) - .unwrap(), - }, - responder: send_cap_bool, - }) - .await?; - let has_cap = recv_cap_bool.await?; + let has_cap = read_capability( + "write", + &new_drive, + false, + our_node, + source, + send_to_caps_oracle, + ) + .await; if !has_cap { + // check for root cap + if read_capability("", "", true, our_node, source, send_to_caps_oracle).await { + return Ok(()); + } return Err(VfsError::NoCap { - action: request.action.to_string(), + action: action.to_string(), path: path.display().to_string(), }); } - Ok(()) } VfsAction::CreateDrive => { - if src_package_id != package_id && !has_root_cap { - return Err(VfsError::NoCap { - action: request.action.to_string(), - path: path.display().to_string(), - }); + if &src_package_id != package_id { + // check for root cap + if !read_capability("", "", true, our_node, source, send_to_caps_oracle).await { + return Err(VfsError::NoCap { + action: action.to_string(), + path: path.display().to_string(), + }); + } } - - add_capability("read", &drive, &our_node, &source, &mut send_to_caps_oracle).await?; - add_capability( - "write", - &drive, - &our_node, - &source, - &mut send_to_caps_oracle, - ) - .await?; - - let drive_path = join_paths_safely(&vfs_dir, &drive); - fs::create_dir_all(drive_path).await?; + add_capability("read", &drive, &our_node, &source, send_to_caps_oracle).await?; + add_capability("write", &drive, &our_node, &source, send_to_caps_oracle).await?; Ok(()) } } } +async fn read_capability( + kind: &str, + drive: &str, + root: bool, + our_node: &str, + source: &Address, + send_to_caps_oracle: &CapMessageSender, +) -> bool { + let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); + if let Err(_) = send_to_caps_oracle + .send(CapMessage::Has { + on: source.process.clone(), + cap: Capability { + issuer: Address { + node: our_node.to_string(), + process: VFS_PROCESS_ID.clone(), + }, + params: if root { + "{{\"root\": true}}".to_string() + } else { + format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}") + }, + }, + responder: send_cap_bool, + }) + .await + { + return false; + } + recv_cap_bool.await.unwrap_or(false) +} + async fn add_capability( kind: &str, drive: &str, our_node: &str, source: &Address, - send_to_caps_oracle: &mut CapMessageSender, + send_to_caps_oracle: &CapMessageSender, ) -> Result<(), VfsError> { let cap = Capability { issuer: Address { node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, - params: serde_json::to_string(&serde_json::json!({ "kind": kind, "drive": drive })) - .unwrap(), + params: format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}"), }; let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); send_to_caps_oracle @@ -859,8 +762,13 @@ async fn add_capability( responder: send_cap_bool, }) .await?; - let _ = recv_cap_bool.await?; - Ok(()) + match recv_cap_bool.await? { + true => Ok(()), + false => Err(VfsError::NoCap { + action: "add_capability".to_string(), + path: drive.to_string(), + }), + } } fn get_file_type(metadata: &std::fs::Metadata) -> FileType { From c84245531449362519750caf6caed1c61653e9b7 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Mon, 17 Jun 2024 23:28:25 -0400 Subject: [PATCH 3/7] add builders for `KernelMessage` and `Printout`, WIP use them everywhere --- kinode/src/net/utils.rs | 14 +---- kinode/src/terminal/mod.rs | 47 ++++++-------- kinode/src/timer.rs | 54 ++++++---------- kinode/src/vfs.rs | 126 ++++++++++++++++--------------------- lib/src/core.rs | 85 +++++++++++++++++++++++++ 5 files changed, 178 insertions(+), 148 deletions(-) diff --git a/kinode/src/net/utils.rs b/kinode/src/net/utils.rs index cb46b3431..22d87abc6 100644 --- a/kinode/src/net/utils.rs +++ b/kinode/src/net/utils.rs @@ -361,20 +361,10 @@ pub async fn parse_hello_message( /// Create a terminal printout at verbosity level 0. pub async fn print_loud(print_tx: &PrintSender, content: &str) { - let _ = print_tx - .send(Printout { - verbosity: 0, - content: content.into(), - }) - .await; + Printout::new(0, content).send(print_tx).await; } /// Create a terminal printout at verbosity level 2. pub async fn print_debug(print_tx: &PrintSender, content: &str) { - let _ = print_tx - .send(Printout { - verbosity: 2, - content: content.into(), - }) - .await; + Printout::new(2, content).send(print_tx).await; } diff --git a/kinode/src/terminal/mod.rs b/kinode/src/terminal/mod.rs index 9c5a9eb1b..998a8c083 100644 --- a/kinode/src/terminal/mod.rs +++ b/kinode/src/terminal/mod.rs @@ -218,17 +218,12 @@ pub async fn terminal( 2 => verbose_mode = 3, _ => verbose_mode = 0, } - let _ = print_tx.send( - Printout { - verbosity: 0, - content: match verbose_mode { - 0 => "verbose mode: off".into(), - 1 => "verbose mode: debug".into(), - 2 => "verbose mode: super-debug".into(), - _ => "verbose mode: full event loop".into(), - } - } - ).await; + Printout::new(0, format!("verbose mode: {}", match verbose_mode { + 0 => "off", + 1 => "debug", + 2 => "super-debug", + _ => "full event loop", + })).send(&print_tx).await; if verbose_mode == 3 { let _ = debug_event_loop.send(DebugCommand::ToggleEventLoop).await; } @@ -243,15 +238,12 @@ pub async fn terminal( }) => { let _ = debug_event_loop.send(DebugCommand::ToggleStepthrough).await; in_step_through = !in_step_through; - let _ = print_tx.send( - Printout { - verbosity: 0, - content: match in_step_through { - false => "debug mode off".into(), - true => "debug mode on: use CTRL+S to step through events".into(), - } - } - ).await; + Printout::new(0, format!("debug mode {}", match in_step_through { + false => "off", + true => "on: use CTRL+S to step through events", + })) + .send(&print_tx) + .await; }, // @@ -273,15 +265,12 @@ pub async fn terminal( .. }) => { logging_mode = !logging_mode; - let _ = print_tx.send( - Printout { - verbosity: 0, - content: match logging_mode { - true => "logging mode: on".into(), - false => "logging mode: off".into(), - } - } - ).await; + Printout::new( + 0, + format!("logging mode: {}", if logging_mode { "on" } else { "off" }) + ) + .send(&print_tx) + .await; }, // // UP / CTRL+P: go up one command in history diff --git a/kinode/src/timer.rs b/kinode/src/timer.rs index b4fe5c6f8..2a01282f2 100644 --- a/kinode/src/timer.rs +++ b/kinode/src/timer.rs @@ -39,23 +39,14 @@ pub async fn timer_service( // we only handle Requests let Message::Request(req) = km.message else { continue }; let Ok(timer_action) = serde_json::from_slice::(&req.body) else { - let _ = print_tx.send(Printout { - verbosity: 1, - content: "timer service received a request with an invalid body".to_string(), - }).await; + Printout::new(1, "timer service received a request with an invalid body").send(&print_tx).await; continue }; match timer_action { TimerAction::Debug => { - let _ = print_tx.send(Printout { - verbosity: 0, - content: format!("timer service active timers ({}):", timer_map.timers.len()), - }).await; + Printout::new(0, format!("timer service active timers ({}):", timer_map.timers.len())).send(&print_tx).await; for (k, v) in timer_map.timers.iter() { - let _ = print_tx.send(Printout { - verbosity: 0, - content: format!("{}: {:?}", k, v), - }).await; + Printout::new(0, format!("{k}: {v:?}")).send(&print_tx).await; } continue } @@ -72,10 +63,7 @@ pub async fn timer_service( send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; continue } - let _ = print_tx.send(Printout { - verbosity: 3, - content: format!("set timer to pop in {}ms", timer_millis), - }).await; + Printout::new(3, format!("set timer to pop in {timer_millis}ms")).send(&print_tx).await; if !timer_map.contains(pop_time) { timer_tasks.spawn(async move { tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await; @@ -121,25 +109,21 @@ impl TimerMap { } async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) { - let _ = send_to_loop - .send(KernelMessage { - id, - source: Address { - node: our_node.to_string(), - process: TIMER_PROCESS_ID.clone(), + KernelMessage::builder() + .id(id) + .source((our_node, TIMER_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( + Response { + inherit: false, + body: vec![], + metadata: None, + capabilities: vec![], }, - target, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: vec![], - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) + None, + ))) + .build() + .unwrap() + .send(send_to_loop) .await; } diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index c1e5d8d94..bed0f695b 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -36,15 +36,15 @@ pub async fn vfs( while let Some(km) = recv_from_loop.recv().await { if *our_node != km.source.node { - let _ = send_to_terminal - .send(Printout { - verbosity: 1, - content: format!( - "vfs: got request from {}, but requests must come from our node {our_node}\r", - km.source.node, + Printout::new( + 1, + format!( + "vfs: got request from {}, but requests must come from our node {our_node}", + km.source.node ), - }) - .await; + ) + .send(&send_to_terminal) + .await; continue; } @@ -81,20 +81,10 @@ pub async fn vfs( ) .await { - let _ = send_to_terminal - .send(Printout { - verbosity: 1, - content: format!("vfs: {e}\r"), - }) - .await; - let _ = send_to_loop - .send(make_error_message( - our_node.to_string(), - km_id, - km_source, - e, - )) + Printout::new(1, format!("vfs: {e}")) + .send(&send_to_terminal) .await; + make_error_message(&our_node, km_id, km_source, e, &send_to_loop).await; } } }); @@ -152,15 +142,11 @@ async fn handle_request( entries.push(dir_entry); } - let response = KernelMessage { - id: km.id, - source: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), - }, - target: km.source, - rsvp: None, - message: Message::Response(( + KernelMessage::builder() + .id(km.id) + .source((our_node, VFS_PROCESS_ID.clone())) + .target(km.source) + .message(Message::Response(( Response { inherit: false, body: serde_json::to_vec(&VfsResponse::ReadDir(entries)).unwrap(), @@ -168,11 +154,11 @@ async fn handle_request( capabilities: vec![], }, None, - )), - lazy_load_blob: None, - }; - - let _ = send_to_loop.send(response).await; + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; return Ok(()); } else { return Err(VfsError::NoCap { @@ -451,29 +437,26 @@ async fn handle_request( process: km.source.process, }) }) { - let _ = send_to_loop - .send(KernelMessage { - id: km.id, - source: Address { - node: our_node.to_string(), - process: VFS_PROCESS_ID.clone(), + KernelMessage::builder() + .id(km.id) + .source((our_node, VFS_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&response_body).unwrap(), + metadata, + capabilities: vec![], }, - target, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&response_body).unwrap(), - metadata, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: bytes.map(|bytes| LazyLoadBlob { - mime: Some("application/octet-stream".into()), - bytes, - }), - }) + None, + ))) + .lazy_load_blob(bytes.map(|bytes| LazyLoadBlob { + mime: Some("application/octet-stream".into()), + bytes, + })) + .build() + .unwrap() + .send(send_to_loop) .await; } @@ -822,21 +805,18 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf { base.join(extension_path) } -fn make_error_message( - our_node: String, +async fn make_error_message( + our_node: &str, id: u64, source: Address, error: VfsError, -) -> KernelMessage { - KernelMessage { - id, - source: Address { - node: our_node, - process: VFS_PROCESS_ID.clone(), - }, - target: source, - rsvp: None, - message: Message::Response(( + send_to_loop: &MessageSender, +) { + KernelMessage::builder() + .id(id) + .source((our_node, VFS_PROCESS_ID.clone())) + .target(source) + .message(Message::Response(( Response { inherit: false, body: serde_json::to_vec(&VfsResponse::Err(error)).unwrap(), @@ -844,7 +824,9 @@ fn make_error_message( capabilities: vec![], }, None, - )), - lazy_load_blob: None, - } + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; } diff --git a/lib/src/core.rs b/lib/src/core.rs index a675e778f..b97a7c61a 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1138,6 +1138,75 @@ pub struct KernelMessage { pub lazy_load_blob: Option, } +impl KernelMessage { + pub fn builder() -> KernelMessageBuilder { + KernelMessageBuilder::default() + } + + pub async fn send(self, sender: &MessageSender) { + sender.send(self).await.expect("kernel message sender died"); + } +} + +#[derive(Default)] +pub struct KernelMessageBuilder { + id: u64, + source: Option
, + target: Option
, + rsvp: Rsvp, + message: Option, + lazy_load_blob: Option, +} + +impl KernelMessageBuilder { + pub fn id(mut self, id: u64) -> Self { + self.id = id; + self + } + + pub fn source(mut self, source: T) -> Self + where + T: Into
, + { + self.source = Some(source.into()); + self + } + + pub fn target(mut self, target: T) -> Self + where + T: Into
, + { + self.target = Some(target.into()); + self + } + + pub fn rsvp(mut self, rsvp: Rsvp) -> Self { + self.rsvp = rsvp; + self + } + + pub fn message(mut self, message: Message) -> Self { + self.message = Some(message); + self + } + + pub fn lazy_load_blob(mut self, blob: Option) -> Self { + self.lazy_load_blob = blob; + self + } + + pub fn build(self) -> Result { + Ok(KernelMessage { + id: self.id, + source: self.source.ok_or("Source address is required")?, + target: self.target.ok_or("Target address is required")?, + rsvp: self.rsvp, + message: self.message.ok_or("Message is required")?, + lazy_load_blob: self.lazy_load_blob, + }) + } +} + impl std::fmt::Display for KernelMessage { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( @@ -1173,6 +1242,22 @@ pub struct Printout { pub content: String, } +impl Printout { + pub fn new(verbosity: u8, content: T) -> Self + where + T: Into, + { + Self { + verbosity, + content: content.into(), + } + } + + pub async fn send(self, sender: &PrintSender) { + sender.send(self).await.expect("print sender died"); + } +} + // kernel sets in case, e.g., // A requests response from B does not request response from C // -> kernel sets `Some(A) = Rsvp` for B's request to C From c7329d90e27fbe89f5bdbf80cfecd60d1765c7f6 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 25 Jun 2024 12:13:41 +0200 Subject: [PATCH 4/7] refactor kv, sqlite for clean --- kinode/src/kv.rs | 214 +++++++++++++++++++------------------- kinode/src/sqlite.rs | 238 +++++++++++++++++++++---------------------- kinode/src/vfs.rs | 15 +-- 3 files changed, 235 insertions(+), 232 deletions(-) diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index b78bee0ae..48482777d 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -1,13 +1,15 @@ -use anyhow::Result; use dashmap::DashMap; -// use rocksdb::checkpoint::Checkpoint; +use lib::types::core::{ + Address, CapMessage, CapMessageSender, Capability, KernelMessage, KvAction, KvError, KvRequest, + KvResponse, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, + Printout, ProcessId, Request, Response, KV_PROCESS_ID, +}; use rocksdb::OptimisticTransactionDB; -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; -use tokio::fs; -use tokio::sync::Mutex; - -use lib::types::core::*; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; +use tokio::{fs, sync::Mutex}; pub async fn kv( our_node: String, @@ -17,85 +19,88 @@ pub async fn kv( send_to_caps_oracle: CapMessageSender, home_directory_path: String, ) -> anyhow::Result<()> { - let kv_path = format!("{}/kv", &home_directory_path); - - if let Err(e) = fs::create_dir_all(&kv_path).await { - panic!("failed creating kv dir! {:?}", e); + let kv_path = Arc::new(format!("{home_directory_path}/kv")); + if let Err(e) = fs::create_dir_all(&*kv_path).await { + panic!("failed creating kv dir! {e:?}"); } let open_kvs: Arc> = Arc::new(DashMap::new()); let txs: Arc>)>>> = Arc::new(DashMap::new()); - let mut process_queues: HashMap>>> = - HashMap::new(); - - loop { - tokio::select! { - Some(km) = recv_from_loop.recv() => { - if our_node.clone() != km.source.node { - println!( - "kv: request must come from our_node={}, got: {}", - our_node, - km.source.node, - ); - continue; - } + let process_queues: HashMap>>> = HashMap::new(); - let queue = process_queues - .entry(km.source.process.clone()) - .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - .clone(); + while let Some(km) = recv_from_loop.recv().await { + if *our_node != km.source.node { + Printout::new( + 1, + format!( + "kv: got request from {}, but requests must come from our node {our_node}", + km.source.node + ), + ) + .send(&send_to_terminal) + .await; + continue; + } + + let queue = process_queues + .get(&km.source.process) + .cloned() + .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); + + { + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km); + } + // clone Arcs + let our_node = our_node.clone(); + let send_to_loop = send_to_loop.clone(); + let send_to_terminal = send_to_terminal.clone(); + let send_to_caps_oracle = send_to_caps_oracle.clone(); + let open_kvs = open_kvs.clone(); + let txs = txs.clone(); + let kv_path = kv_path.clone(); + + tokio::spawn(async move { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { + let (km_id, km_rsvp) = + (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); + + if let Err(e) = handle_request( + &our_node, + km, + open_kvs, + txs, + &send_to_loop, + &send_to_terminal, + &send_to_caps_oracle, + &kv_path, + ) + .await { - let mut queue_lock = queue.lock().await; - queue_lock.push_back(km.clone()); + Printout::new(1, format!("kv: {e}")) + .send(&send_to_terminal) + .await; + make_error_message(&our_node, km_id, km_rsvp, e, &send_to_loop).await; } - - // clone Arcs - let our_node = our_node.clone(); - let send_to_caps_oracle = send_to_caps_oracle.clone(); - let send_to_terminal = send_to_terminal.clone(); - let send_to_loop = send_to_loop.clone(); - let open_kvs = open_kvs.clone(); - let txs = txs.clone(); - let kv_path = kv_path.clone(); - - tokio::spawn(async move { - let mut queue_lock = queue.lock().await; - if let Some(km) = queue_lock.pop_front() { - if let Err(e) = handle_request( - our_node.clone(), - km.clone(), - open_kvs.clone(), - txs.clone(), - send_to_loop.clone(), - send_to_terminal.clone(), - send_to_caps_oracle.clone(), - kv_path.clone(), - ) - .await - { - let _ = send_to_loop - .send(make_error_message(our_node.clone(), &km, e)) - .await; - } - } - }); } - } + }); } + Ok(()) } async fn handle_request( - our_node: String, + our_node: &str, km: KernelMessage, open_kvs: Arc>, txs: Arc>)>>>, - send_to_loop: MessageSender, - send_to_terminal: PrintSender, - send_to_caps_oracle: CapMessageSender, - kv_path: String, + send_to_loop: &MessageSender, + send_to_terminal: &PrintSender, + send_to_caps_oracle: &CapMessageSender, + kv_path: &str, ) -> Result<(), KvError> { let KernelMessage { id, @@ -103,13 +108,13 @@ async fn handle_request( message, lazy_load_blob: blob, .. - } = km.clone(); + } = km; let Message::Request(Request { body, expects_response, metadata, .. - }) = message.clone() + }) = message else { return Err(KvError::InputError { error: "not a request".into(), @@ -127,12 +132,12 @@ async fn handle_request( }; check_caps( - our_node.clone(), - source.clone(), - open_kvs.clone(), - send_to_caps_oracle.clone(), + our_node, + &source, + &open_kvs, + send_to_caps_oracle, &request, - kv_path.clone(), + kv_path, ) .await?; @@ -282,14 +287,14 @@ async fn handle_request( if let Some(target) = km.rsvp.or_else(|| { expects_response.map(|_| Address { - node: our_node.clone(), + node: our_node.to_string(), process: source.process.clone(), }) }) { let response = KernelMessage { id, source: Address { - node: our_node.clone(), + node: our_node.to_string(), process: KV_PROCESS_ID.clone(), }, target, @@ -327,12 +332,12 @@ async fn handle_request( } async fn check_caps( - our_node: String, - source: Address, - open_kvs: Arc>, - mut send_to_caps_oracle: CapMessageSender, + our_node: &str, + source: &Address, + open_kvs: &Arc>, + send_to_caps_oracle: &CapMessageSender, request: &KvRequest, - kv_path: String, + kv_path: &str, ) -> Result<(), KvError> { let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); let src_package_id = PackageId::new(source.process.package(), source.process.publisher()); @@ -347,7 +352,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: KV_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -373,7 +378,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: KV_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -405,7 +410,7 @@ async fn check_caps( &request.db.to_string(), &our_node, &source, - &mut send_to_caps_oracle, + send_to_caps_oracle, ) .await?; add_capability( @@ -413,7 +418,7 @@ async fn check_caps( &request.db.to_string(), &our_node, &source, - &mut send_to_caps_oracle, + send_to_caps_oracle, ) .await?; @@ -451,7 +456,7 @@ async fn add_capability( db: &str, our_node: &str, source: &Address, - send_to_caps_oracle: &mut CapMessageSender, + send_to_caps_oracle: &CapMessageSender, ) -> Result<(), KvError> { let cap = Capability { issuer: Address { @@ -472,19 +477,18 @@ async fn add_capability( Ok(()) } -fn make_error_message(our_name: String, km: &KernelMessage, error: KvError) -> KernelMessage { - KernelMessage { - id: km.id, - source: Address { - node: our_name.clone(), - process: KV_PROCESS_ID.clone(), - }, - target: match &km.rsvp { - None => km.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - rsvp: None, - message: Message::Response(( +async fn make_error_message( + our_node: &str, + id: u64, + source: Address, + error: KvError, + send_to_loop: &MessageSender, +) { + KernelMessage::builder() + .id(id) + .source((our_node, KV_PROCESS_ID.clone())) + .target(source) + .message(Message::Response(( Response { inherit: false, body: serde_json::to_vec(&KvResponse::Err { error }).unwrap(), @@ -492,9 +496,11 @@ fn make_error_message(our_name: String, km: &KernelMessage, error: KvError) -> K capabilities: vec![], }, None, - )), - lazy_load_blob: None, - } + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; } fn rocks_to_kv_err(error: rocksdb::Error) -> KvError { diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs index 2fb53ccfc..93c1952da 100644 --- a/kinode/src/sqlite.rs +++ b/kinode/src/sqlite.rs @@ -1,32 +1,23 @@ -use anyhow::Result; use base64::{engine::general_purpose::STANDARD as base64_standard, Engine}; use dashmap::DashMap; +use lib::types::core::{ + Address, CapMessage, CapMessageSender, Capability, KernelMessage, LazyLoadBlob, Message, + MessageReceiver, MessageSender, PackageId, PrintSender, Printout, ProcessId, Request, Response, + SqlValue, SqliteAction, SqliteError, SqliteRequest, SqliteResponse, SQLITE_PROCESS_ID, +}; use rusqlite::Connection; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::Arc; -use tokio::fs; -use tokio::sync::Mutex; - -use lib::types::core::*; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + sync::Arc, +}; +use tokio::{fs, sync::Mutex}; lazy_static::lazy_static! { - static ref READ_KEYWORDS: HashSet = { - let mut set = HashSet::new(); - let keywords = ["ANALYZE", "ATTACH", "BEGIN", "EXPLAIN", "PRAGMA", "SELECT", "VALUES", "WITH"]; - for &keyword in &keywords { - set.insert(keyword.to_string()); - } - set - }; + static ref READ_KEYWORDS: HashSet<&'static str> = + HashSet::from(["ANALYZE", "ATTACH", "BEGIN", "EXPLAIN", "PRAGMA", "SELECT", "VALUES", "WITH"]); - static ref WRITE_KEYWORDS: HashSet = { - let mut set = HashSet::new(); - let keywords = ["ALTER", "ANALYZE", "COMMIT", "CREATE", "DELETE", "DETACH", "DROP", "END", "INSERT", "REINDEX", "RELEASE", "RENAME", "REPLACE", "ROLLBACK", "SAVEPOINT", "UPDATE", "VACUUM"]; - for &keyword in &keywords { - set.insert(keyword.to_string()); - } - set - }; + static ref WRITE_KEYWORDS: HashSet<&'static str> = + HashSet::from(["ALTER", "ANALYZE", "COMMIT", "CREATE", "DELETE", "DETACH", "DROP", "END", "INSERT", "REINDEX", "RELEASE", "RENAME", "REPLACE", "ROLLBACK", "SAVEPOINT", "UPDATE", "VACUUM"]); } pub async fn sqlite( @@ -37,85 +28,87 @@ pub async fn sqlite( send_to_caps_oracle: CapMessageSender, home_directory_path: String, ) -> anyhow::Result<()> { - let sqlite_path = format!("{}/sqlite", &home_directory_path); - - if let Err(e) = fs::create_dir_all(&sqlite_path).await { - panic!("failed creating sqlite dir! {:?}", e); + let sqlite_path = Arc::new(format!("{home_directory_path}/sqlite")); + if let Err(e) = fs::create_dir_all(&*sqlite_path).await { + panic!("failed creating sqlite dir! {e:?}"); } let open_dbs: Arc>> = Arc::new(DashMap::new()); let txs: Arc)>>> = Arc::new(DashMap::new()); - let mut process_queues: HashMap>>> = - HashMap::new(); - - loop { - tokio::select! { - Some(km) = recv_from_loop.recv() => { - if our_node.clone() != km.source.node { - println!( - "sqlite: request must come from our_node={}, got: {}", - our_node, - km.source.node, - ); - continue; - } + let process_queues: HashMap>>> = HashMap::new(); + + while let Some(km) = recv_from_loop.recv().await { + if *our_node != km.source.node { + Printout::new( + 1, + format!( + "sqlite: got request from {}, but requests must come from our node {our_node}", + km.source.node + ), + ) + .send(&send_to_terminal) + .await; + continue; + } - let queue = process_queues - .entry(km.source.process.clone()) - .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - .clone(); + let queue = process_queues + .get(&km.source.process) + .cloned() + .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); + { + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km); + } + + // clone Arcs + let our_node = our_node.clone(); + let send_to_loop = send_to_loop.clone(); + let send_to_terminal = send_to_terminal.clone(); + let send_to_caps_oracle = send_to_caps_oracle.clone(); + let open_dbs = open_dbs.clone(); + let txs = txs.clone(); + let sqlite_path = sqlite_path.clone(); + + tokio::spawn(async move { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { + let (km_id, km_rsvp) = + (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); + + if let Err(e) = handle_request( + &our_node, + km, + open_dbs, + txs, + &send_to_loop, + &send_to_terminal, + &send_to_caps_oracle, + &sqlite_path, + ) + .await { - let mut queue_lock = queue.lock().await; - queue_lock.push_back(km.clone()); + Printout::new(1, format!("sqlite: {e}")) + .send(&send_to_terminal) + .await; + make_error_message(&our_node, km_id, km_rsvp, e, &send_to_loop).await; } - - // clone Arcs - let our_node = our_node.clone(); - let send_to_caps_oracle = send_to_caps_oracle.clone(); - let send_to_terminal = send_to_terminal.clone(); - let send_to_loop = send_to_loop.clone(); - let open_dbs = open_dbs.clone(); - - let txs = txs.clone(); - let sqlite_path = sqlite_path.clone(); - - tokio::spawn(async move { - let mut queue_lock = queue.lock().await; - if let Some(km) = queue_lock.pop_front() { - if let Err(e) = handle_request( - our_node.clone(), - km.clone(), - open_dbs.clone(), - txs.clone(), - send_to_loop.clone(), - send_to_terminal.clone(), - send_to_caps_oracle.clone(), - sqlite_path.clone(), - ) - .await - { - let _ = send_to_loop - .send(make_error_message(our_node.clone(), &km, e)) - .await; - } - } - }); } - } + }); } + Ok(()) } async fn handle_request( - our_node: String, + our_node: &str, km: KernelMessage, open_dbs: Arc>>, txs: Arc)>>>, - send_to_loop: MessageSender, - send_to_terminal: PrintSender, - send_to_caps_oracle: CapMessageSender, - sqlite_path: String, + send_to_loop: &MessageSender, + send_to_terminal: &PrintSender, + send_to_caps_oracle: &CapMessageSender, + sqlite_path: &str, ) -> Result<(), SqliteError> { let KernelMessage { id, @@ -123,13 +116,13 @@ async fn handle_request( message, lazy_load_blob: blob, .. - } = km.clone(); + } = km; let Message::Request(Request { body, expects_response, metadata, .. - }) = message.clone() + }) = message else { return Err(SqliteError::InputError { error: "not a request".into(), @@ -147,12 +140,12 @@ async fn handle_request( }; check_caps( - our_node.clone(), - source.clone(), - open_dbs.clone(), - send_to_caps_oracle.clone(), + our_node, + &source, + &open_dbs, + send_to_caps_oracle, &request, - sqlite_path.clone(), + sqlite_path, ) .await?; @@ -178,7 +171,7 @@ async fn handle_request( .next() .map(|word| word.to_uppercase()) .unwrap_or("".to_string()); - if !READ_KEYWORDS.contains(&first_word) { + if !READ_KEYWORDS.contains(first_word.as_str()) { return Err(SqliteError::NotAReadKeyword); } @@ -236,7 +229,7 @@ async fn handle_request( .map(|word| word.to_uppercase()) .unwrap_or("".to_string()); - if !WRITE_KEYWORDS.contains(&first_word) { + if !WRITE_KEYWORDS.contains(first_word.as_str()) { return Err(SqliteError::NotAWriteKeyword); } @@ -306,14 +299,14 @@ async fn handle_request( if let Some(target) = km.rsvp.or_else(|| { expects_response.map(|_| Address { - node: our_node.clone(), + node: our_node.to_string(), process: source.process.clone(), }) }) { let response = KernelMessage { id, source: Address { - node: our_node.clone(), + node: our_node.to_string(), process: SQLITE_PROCESS_ID.clone(), }, target, @@ -351,12 +344,12 @@ async fn handle_request( } async fn check_caps( - our_node: String, - source: Address, - open_dbs: Arc>>, - mut send_to_caps_oracle: CapMessageSender, + our_node: &str, + source: &Address, + open_dbs: &Arc>>, + send_to_caps_oracle: &CapMessageSender, request: &SqliteRequest, - sqlite_path: String, + sqlite_path: &str, ) -> Result<(), SqliteError> { let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); let src_package_id = PackageId::new(source.process.package(), source.process.publisher()); @@ -368,7 +361,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: SQLITE_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -394,7 +387,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: SQLITE_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -426,7 +419,7 @@ async fn check_caps( &request.db.to_string(), &our_node, &source, - &mut send_to_caps_oracle, + send_to_caps_oracle, ) .await?; add_capability( @@ -434,7 +427,7 @@ async fn check_caps( &request.db.to_string(), &our_node, &source, - &mut send_to_caps_oracle, + send_to_caps_oracle, ) .await?; @@ -481,7 +474,7 @@ async fn add_capability( db: &str, our_node: &str, source: &Address, - send_to_caps_oracle: &mut CapMessageSender, + send_to_caps_oracle: &CapMessageSender, ) -> Result<(), SqliteError> { let cap = Capability { issuer: Address { @@ -544,19 +537,18 @@ fn get_json_params(blob: Option) -> Result, SqliteEr } } -fn make_error_message(our_name: String, km: &KernelMessage, error: SqliteError) -> KernelMessage { - KernelMessage { - id: km.id, - source: Address { - node: our_name.clone(), - process: SQLITE_PROCESS_ID.clone(), - }, - target: match &km.rsvp { - None => km.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - rsvp: None, - message: Message::Response(( +async fn make_error_message( + our_node: &str, + id: u64, + source: Address, + error: SqliteError, + send_to_loop: &MessageSender, +) { + KernelMessage::builder() + .id(id) + .source((our_node, SQLITE_PROCESS_ID.clone())) + .target(source) + .message(Message::Response(( Response { inherit: false, body: serde_json::to_vec(&SqliteResponse::Err { error }).unwrap(), @@ -564,7 +556,9 @@ fn make_error_message(our_name: String, km: &KernelMessage, error: SqliteError) capabilities: vec![], }, None, - )), - lazy_load_blob: None, - } + ))) + .build() + .unwrap() + .send(send_to_loop) + .await; } diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index bed0f695b..6e7dfad7d 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -5,10 +5,12 @@ use lib::types::core::{ Printout, ProcessId, Request, Response, VfsAction, VfsError, VfsRequest, VfsResponse, KERNEL_PROCESS_ID, VFS_PROCESS_ID, }; -use std::collections::{HashMap, VecDeque}; -use std::io::Read; -use std::path::{Component, Path, PathBuf}; -use std::sync::Arc; +use std::{ + collections::{HashMap, VecDeque}, + io::Read, + path::{Component, Path, PathBuf}, + sync::Arc, +}; use tokio::{ fs, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}, @@ -69,7 +71,8 @@ pub async fn vfs( tokio::spawn(async move { let mut queue_lock = queue.lock().await; if let Some(km) = queue_lock.pop_front() { - let (km_id, km_source) = (km.id.clone(), km.source.clone()); + let (km_id, km_rsvp) = + (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); if let Err(e) = handle_request( &our_node, @@ -84,7 +87,7 @@ pub async fn vfs( Printout::new(1, format!("vfs: {e}")) .send(&send_to_terminal) .await; - make_error_message(&our_node, km_id, km_source, e, &send_to_loop).await; + make_error_message(&our_node, km_id, km_rsvp, e, &send_to_loop).await; } } }); From dc6c7fe76e3a2a1cb58d5aed85538e96b224807c Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 25 Jun 2024 15:15:52 +0200 Subject: [PATCH 5/7] cleanup state modules more --- kinode/src/kv.rs | 46 ++++------ kinode/src/main.rs | 6 +- kinode/src/sqlite.rs | 47 ++++------ kinode/src/state.rs | 209 +++++++++++++++++++------------------------ kinode/src/vfs.rs | 49 ++++------ 5 files changed, 152 insertions(+), 205 deletions(-) diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index 48482777d..27db68348 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -12,7 +12,7 @@ use std::{ use tokio::{fs, sync::Mutex}; pub async fn kv( - our_node: String, + our_node: Arc, send_to_loop: MessageSender, send_to_terminal: PrintSender, mut recv_from_loop: MessageReceiver, @@ -84,7 +84,23 @@ pub async fn kv( Printout::new(1, format!("kv: {e}")) .send(&send_to_terminal) .await; - make_error_message(&our_node, km_id, km_rsvp, e, &send_to_loop).await; + KernelMessage::builder() + .id(km_id) + .source((our_node.as_str(), KV_PROCESS_ID.clone())) + .target(km_rsvp) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&KvResponse::Err { error: e }).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&send_to_loop) + .await; } } }); @@ -477,32 +493,6 @@ async fn add_capability( Ok(()) } -async fn make_error_message( - our_node: &str, - id: u64, - source: Address, - error: KvError, - send_to_loop: &MessageSender, -) { - KernelMessage::builder() - .id(id) - .source((our_node, KV_PROCESS_ID.clone())) - .target(source) - .message(Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&KvResponse::Err { error }).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - ))) - .build() - .unwrap() - .send(send_to_loop) - .await; -} - fn rocks_to_kv_err(error: rocksdb::Error) -> KvError { KvError::RocksDBError { action: "".into(), diff --git a/kinode/src/main.rs b/kinode/src/main.rs index 930aa60c9..b4695b5b8 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -333,7 +333,7 @@ async fn main() { *matches.get_one::("reveal-ip").unwrap_or(&true), )); tasks.spawn(state::state_sender( - our.name.clone(), + our_name_arc.clone(), kernel_message_sender.clone(), print_sender.clone(), state_receiver, @@ -341,7 +341,7 @@ async fn main() { home_directory_path.clone(), )); tasks.spawn(kv::kv( - our.name.clone(), + our_name_arc.clone(), kernel_message_sender.clone(), print_sender.clone(), kv_receiver, @@ -349,7 +349,7 @@ async fn main() { home_directory_path.clone(), )); tasks.spawn(sqlite::sqlite( - our.name.clone(), + our_name_arc.clone(), kernel_message_sender.clone(), print_sender.clone(), sqlite_receiver, diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs index 93c1952da..b29404a21 100644 --- a/kinode/src/sqlite.rs +++ b/kinode/src/sqlite.rs @@ -21,7 +21,7 @@ lazy_static::lazy_static! { } pub async fn sqlite( - our_node: String, + our_node: Arc, send_to_loop: MessageSender, send_to_terminal: PrintSender, mut recv_from_loop: MessageReceiver, @@ -92,7 +92,24 @@ pub async fn sqlite( Printout::new(1, format!("sqlite: {e}")) .send(&send_to_terminal) .await; - make_error_message(&our_node, km_id, km_rsvp, e, &send_to_loop).await; + KernelMessage::builder() + .id(km_id) + .source((our_node.as_str(), SQLITE_PROCESS_ID.clone())) + .target(km_rsvp) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&SqliteResponse::Err { error: e }) + .unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&send_to_loop) + .await; } } }); @@ -536,29 +553,3 @@ fn get_json_params(blob: Option) -> Result, SqliteEr }, } } - -async fn make_error_message( - our_node: &str, - id: u64, - source: Address, - error: SqliteError, - send_to_loop: &MessageSender, -) { - KernelMessage::builder() - .id(id) - .source((our_node, SQLITE_PROCESS_ID.clone())) - .target(source) - .message(Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&SqliteResponse::Err { error }).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - ))) - .build() - .unwrap() - .send(send_to_loop) - .await; -} diff --git a/kinode/src/state.rs b/kinode/src/state.rs index 819f6cd31..c0ecb3d2d 100644 --- a/kinode/src/state.rs +++ b/kinode/src/state.rs @@ -1,16 +1,18 @@ -use anyhow::Result; +use lib::types::core::{ + Address, Capability, Erc721Metadata, KernelMessage, LazyLoadBlob, Message, MessageReceiver, + MessageSender, NetworkErrorSender, OnExit, PackageManifestEntry, PersistedProcess, PrintSender, + Printout, ProcessId, ProcessMap, Request, Response, ReverseCapIndex, StateAction, StateError, + StateResponse, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID, +}; use ring::signature; -use rocksdb::checkpoint::Checkpoint; -use rocksdb::{Options, DB}; -use std::collections::{HashMap, VecDeque}; -use std::io::Read; -use std::path::Path; -use std::sync::Arc; -use tokio::fs; -use tokio::io::AsyncWriteExt; -use tokio::sync::Mutex; - -use lib::types::core::*; +use rocksdb::{checkpoint::Checkpoint, Options, DB}; +use std::{ + collections::{HashMap, VecDeque}, + io::Read, + path::Path, + sync::Arc, +}; +use tokio::{fs, io::AsyncWriteExt, sync::Mutex}; include!("bootstrapped_processes.rs"); @@ -20,27 +22,19 @@ pub async fn load_state( home_directory_path: String, runtime_extensions: Vec<(ProcessId, MessageSender, Option, bool)>, ) -> Result<(ProcessMap, DB, ReverseCapIndex), StateError> { - let state_path = format!("{}/kernel", &home_directory_path); - + let state_path = format!("{home_directory_path}/kernel"); if let Err(e) = fs::create_dir_all(&state_path).await { - panic!("failed creating kernel state dir! {:?}", e); + panic!("failed creating kernel state dir! {e:?}"); } - // more granular kernel_state in column families - - // let mut options = Option::default().unwrap(); - // options.create_if_missing(true); - //let db = DB::open_default(&state_directory_path_str).unwrap(); let mut opts = Options::default(); opts.create_if_missing(true); - // let cf_name = "kernel_state"; - // let cf_descriptor = ColumnFamilyDescriptor::new(cf_name, Options::default()); let db = DB::open_default(state_path).unwrap(); let mut process_map: ProcessMap = HashMap::new(); let mut reverse_cap_index: ReverseCapIndex = HashMap::new(); - let kernel_id = process_to_vec(KERNEL_PROCESS_ID.clone()); - match db.get(&kernel_id) { + let kernel_id_vec = process_to_vec(KERNEL_PROCESS_ID.clone()); + match db.get(&kernel_id_vec) { Ok(Some(value)) => { process_map = bincode::deserialize::(&value).unwrap(); // if our networking key changed, we need to re-sign all local caps @@ -56,11 +50,11 @@ pub async fn load_state( }); } Ok(None) => { - db.put(&kernel_id, bincode::serialize(&process_map).unwrap()) + db.put(&kernel_id_vec, bincode::serialize(&process_map).unwrap()) .unwrap(); } Err(e) => { - panic!("failed to load kernel state from db: {:?}", e); + panic!("failed to load kernel state from db: {e:?}"); } } @@ -71,7 +65,7 @@ pub async fn load_state( bootstrap( &our_name, keypair, - home_directory_path.clone(), + home_directory_path, runtime_extensions, &mut process_map, &mut reverse_cap_index, @@ -83,7 +77,7 @@ pub async fn load_state( } pub async fn state_sender( - our_name: String, + our_node: Arc, send_to_loop: MessageSender, send_to_terminal: PrintSender, mut recv_state: MessageReceiver, @@ -91,68 +85,83 @@ pub async fn state_sender( home_directory_path: String, ) -> Result<(), anyhow::Error> { let db = Arc::new(db); + let home_directory_path = Arc::new(home_directory_path); - let mut process_queues: HashMap>>> = - HashMap::new(); - - loop { - tokio::select! { - Some(km) = recv_state.recv() => { - if our_name != km.source.node { - println!( - "state: request must come from our_name={}, got: {}", - our_name, &km, - ); - continue; - } + let process_queues: HashMap>>> = HashMap::new(); - let queue = process_queues - .entry(km.source.process.clone()) - .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - .clone(); + while let Some(km) = recv_state.recv().await { + if *our_node != km.source.node { + Printout::new( + 1, + format!( + "state: got request from {}, but requests must come from our node {our_node}", + km.source.node + ), + ) + .send(&send_to_terminal) + .await; + continue; + } + let queue = process_queues + .get(&km.source.process) + .cloned() + .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); + + { + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km); + } + + let our_node = our_node.clone(); + let db_clone = db.clone(); + let send_to_loop = send_to_loop.clone(); + let send_to_terminal = send_to_terminal.clone(); + let home_directory_path = home_directory_path.clone(); + + tokio::spawn(async move { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { + let (km_id, km_rsvp) = + (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); + + if let Err(e) = + handle_request(&our_node, km, db_clone, &send_to_loop, &home_directory_path) + .await { - let mut queue_lock = queue.lock().await; - queue_lock.push_back(km.clone()); + Printout::new(1, format!("state: {e}")) + .send(&send_to_terminal) + .await; + KernelMessage::builder() + .id(km_id) + .source((our_node.as_str(), STATE_PROCESS_ID.clone())) + .target(km_rsvp) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&StateResponse::Err(e)).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&send_to_loop) + .await; } - - let db_clone = db.clone(); - let send_to_loop = send_to_loop.clone(); - let send_to_terminal = send_to_terminal.clone(); - let our_name = our_name.clone(); - let home_directory_path = home_directory_path.clone(); - - tokio::spawn(async move { - let mut queue_lock = queue.lock().await; - if let Some(km) = queue_lock.pop_front() { - if let Err(e) = handle_request( - our_name.clone(), - km.clone(), - db_clone, - send_to_loop.clone(), - send_to_terminal, - home_directory_path, - ) - .await - { - let _ = send_to_loop - .send(make_error_message(our_name.clone(), &km, e)) - .await; - } - } - }); } - } + }); } + Ok(()) } async fn handle_request( - our_name: String, + our_node: &str, kernel_message: KernelMessage, db: Arc, - send_to_loop: MessageSender, - _send_to_terminal: PrintSender, - home_directory_path: String, + send_to_loop: &MessageSender, + home_directory_path: &str, ) -> Result<(), StateError> { let KernelMessage { id, @@ -178,7 +187,7 @@ async fn handle_request( Ok(r) => r, Err(e) => { return Err(StateError::BadJson { - error: format!("parse into StateAction failed: {:?}", e), + error: format!("parse into StateAction failed: {e:?}"), }) } }; @@ -214,7 +223,6 @@ async fn handle_request( }); } Err(e) => { - println!("get state error: {:?}", e); return Err(StateError::RocksDBError { action: "GetState".into(), error: e.to_string(), @@ -230,7 +238,6 @@ async fn handle_request( None, ), Err(e) => { - println!("delete state error: {:?}", e); return Err(StateError::RocksDBError { action: "DeleteState".into(), error: e.to_string(), @@ -239,7 +246,7 @@ async fn handle_request( } } StateAction::Backup => { - let checkpoint_dir = format!("{}/kernel/backup", &home_directory_path); + let checkpoint_dir = format!("{home_directory_path}/kernel/backup"); if Path::new(&checkpoint_dir).exists() { fs::remove_dir_all(&checkpoint_dir).await?; @@ -262,14 +269,14 @@ async fn handle_request( if let Some(target) = rsvp.or_else(|| { expects_response.map(|_| Address { - node: our_name.clone(), + node: our_node.to_string(), process: source.process.clone(), }) }) { let response = KernelMessage { id, source: Address { - node: our_name.clone(), + node: our_node.to_string(), process: STATE_PROCESS_ID.clone(), }, target, @@ -310,9 +317,7 @@ async fn bootstrap( runtime_extensions: Vec<(ProcessId, MessageSender, Option, bool)>, process_map: &mut ProcessMap, reverse_cap_index: &mut ReverseCapIndex, -) -> Result<()> { - // println!("bootstrapping node...\r"); - +) -> anyhow::Result<()> { let mut runtime_caps: HashMap> = HashMap::new(); // kernel is a special case let k_cap = Capability { @@ -730,10 +735,7 @@ async fn get_zipped_packages() -> Vec<( if let Ok(metadata) = serde_json::from_slice::(metadata_bytes) { packages.push((metadata, zip)); } else { - println!( - "fs: metadata for package {} is not valid Erc721Metadata\r", - package_name - ); + println!("fs: metadata for package {package_name} is not valid Erc721Metadata!\r",); } } } @@ -741,31 +743,6 @@ async fn get_zipped_packages() -> Vec<( packages } -fn make_error_message(our_name: String, km: &KernelMessage, error: StateError) -> KernelMessage { - KernelMessage { - id: km.id, - source: Address { - node: our_name.clone(), - process: STATE_PROCESS_ID.clone(), - }, - target: match &km.rsvp { - None => km.source.clone(), - Some(rsvp) => rsvp.clone(), - }, - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&StateResponse::Err(error)).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - } -} - fn process_to_vec(process: ProcessId) -> Vec { process.to_string().as_bytes().to_vec() } diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 6e7dfad7d..e420ab515 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -87,7 +87,23 @@ pub async fn vfs( Printout::new(1, format!("vfs: {e}")) .send(&send_to_terminal) .await; - make_error_message(&our_node, km_id, km_rsvp, e, &send_to_loop).await; + KernelMessage::builder() + .id(km_id) + .source((our_node.as_str(), VFS_PROCESS_ID.clone())) + .target(km_rsvp) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&VfsResponse::Err(e)).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&send_to_loop) + .await; } } }); @@ -218,7 +234,6 @@ async fn handle_request( // open file opens an existing file, or creates a new one if create is true let file = open_file(open_files, path, create, false).await?; let mut file = file.lock().await; - // extra in the case file was just created, todo refactor out. file.seek(SeekFrom::Start(0)).await?; (VfsResponse::Ok, None) } @@ -525,7 +540,7 @@ async fn open_file>( ) -> Result>, VfsError> { let path = path.as_ref().to_path_buf(); Ok(match open_files.get(&path) { - Some(file) => Arc::clone(file.value()), + Some(file) => file.value().clone(), None => { let file = Arc::new(Mutex::new( tokio::fs::OpenOptions::new() @@ -540,7 +555,7 @@ async fn open_file>( path: path.display().to_string(), })?, )); - open_files.insert(path.clone(), Arc::clone(&file)); + open_files.insert(path, file.clone()); file } }) @@ -807,29 +822,3 @@ fn join_paths_safely(base: &PathBuf, extension: &str) -> PathBuf { let extension_path = Path::new(extension_str); base.join(extension_path) } - -async fn make_error_message( - our_node: &str, - id: u64, - source: Address, - error: VfsError, - send_to_loop: &MessageSender, -) { - KernelMessage::builder() - .id(id) - .source((our_node, VFS_PROCESS_ID.clone())) - .target(source) - .message(Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&VfsResponse::Err(error)).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - ))) - .build() - .unwrap() - .send(send_to_loop) - .await; -} From 8a5eae4f22591f69038ba1fa901528fa5e56aa4e Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 25 Jun 2024 16:53:35 +0200 Subject: [PATCH 6/7] more cleanup of imports, use kernelMessage builder --- kinode/build.rs | 2 +- kinode/src/fakenet/mod.rs | 7 +- kinode/src/kernel/process.rs | 10 +-- kinode/src/kernel/standard_host.rs | 3 +- kinode/src/kernel/standard_host_v0.rs | 3 +- kinode/src/keygen.rs | 26 ++++--- kinode/src/kv.rs | 47 ++++-------- kinode/src/main.rs | 43 +++++------ kinode/src/net/mod.rs | 44 +++++------ kinode/src/net/utils.rs | 78 +++++++++---------- kinode/src/register.rs | 36 +++++---- kinode/src/sqlite.rs | 47 ++++-------- kinode/src/state.rs | 34 ++++----- kinode/src/terminal/mod.rs | 41 +++++----- kinode/src/timer.rs | 106 ++++++++++++++------------ kinode/src/vfs.rs | 8 +- 16 files changed, 230 insertions(+), 305 deletions(-) diff --git a/kinode/build.rs b/kinode/build.rs index cd9264ac7..72c43ce35 100644 --- a/kinode/build.rs +++ b/kinode/build.rs @@ -1,4 +1,4 @@ -use rayon::prelude::*; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::{ collections::HashSet, fs::{self, File}, diff --git a/kinode/src/fakenet/mod.rs b/kinode/src/fakenet/mod.rs index 0a75c81d0..10e228663 100644 --- a/kinode/src/fakenet/mod.rs +++ b/kinode/src/fakenet/mod.rs @@ -1,3 +1,4 @@ +use crate::{keygen, KNS_ADDRESS}; use alloy::network::{eip2718::Encodable2718, EthereumWallet, TransactionBuilder}; use alloy::providers::{Provider, ProviderBuilder, RootProvider}; use alloy::pubsub::PubSubFrontend; @@ -9,11 +10,7 @@ use alloy_sol_types::{SolCall, SolValue}; use lib::core::{Identity, NodeRouting}; use std::str::FromStr; -pub mod helpers; - -use crate::{keygen, KNS_ADDRESS}; -pub use helpers::RegisterHelpers::*; -pub use helpers::*; +mod helpers; const FAKE_DOTDEV: &str = "0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9"; diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index bd5ef1cf2..2e03cb815 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -1,5 +1,4 @@ use crate::KERNEL_PROCESS_ID; -use anyhow::Result; use lib::types::core as t; pub use lib::v0::ProcessV0; pub use lib::Process; @@ -9,8 +8,7 @@ use std::sync::Arc; use tokio::fs; use tokio::task::JoinHandle; use wasi_common::sync::Dir; -use wasmtime::component::ResourceTable as Table; -use wasmtime::component::*; +use wasmtime::component::{Component, Linker, ResourceTable as Table}; use wasmtime::{Engine, Store}; use wasmtime_wasi::{ pipe::MemoryOutputPipe, DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView, @@ -90,7 +88,7 @@ async fn make_component( wasm_bytes: &[u8], home_directory_path: String, process_state: ProcessState, -) -> Result<(Process, Store, MemoryOutputPipe)> { +) -> anyhow::Result<(Process, Store, MemoryOutputPipe)> { let component = Component::new(&engine, wasm_bytes.to_vec()) .expect("make_process_loop: couldn't read file"); @@ -170,7 +168,7 @@ async fn make_component_v0( wasm_bytes: &[u8], home_directory_path: String, process_state: ProcessState, -) -> Result<(ProcessV0, Store, MemoryOutputPipe)> { +) -> anyhow::Result<(ProcessV0, Store, MemoryOutputPipe)> { let component = Component::new(&engine, wasm_bytes.to_vec()) .expect("make_process_loop: couldn't read file"); @@ -257,7 +255,7 @@ pub async fn make_process_loop( caps_oracle: t::CapMessageSender, engine: Engine, home_directory_path: String, -) -> Result<()> { +) -> anyhow::Result<()> { // before process can be instantiated, need to await 'run' message from kernel let mut pre_boot_queue = Vec::>::new(); while let Some(message) = recv_in_process.recv().await { diff --git a/kinode/src/kernel/standard_host.rs b/kinode/src/kernel/standard_host.rs index d11b09917..84e7b2aaf 100644 --- a/kinode/src/kernel/standard_host.rs +++ b/kinode/src/kernel/standard_host.rs @@ -1,7 +1,6 @@ use crate::kernel::process; -use crate::KERNEL_PROCESS_ID; -use crate::VFS_PROCESS_ID; use anyhow::Result; +use lib::core::{KERNEL_PROCESS_ID, VFS_PROCESS_ID}; use lib::types::core::{self as t, STATE_PROCESS_ID}; pub use lib::wit; pub use lib::wit::Host as StandardHost; diff --git a/kinode/src/kernel/standard_host_v0.rs b/kinode/src/kernel/standard_host_v0.rs index adbb462ee..8b75914c9 100644 --- a/kinode/src/kernel/standard_host_v0.rs +++ b/kinode/src/kernel/standard_host_v0.rs @@ -1,7 +1,6 @@ use crate::kernel::process; -use crate::KERNEL_PROCESS_ID; -use crate::VFS_PROCESS_ID; use anyhow::Result; +use lib::core::{KERNEL_PROCESS_ID, VFS_PROCESS_ID}; use lib::types::core::{self as t, STATE_PROCESS_ID}; pub use lib::v0::wit; pub use lib::v0::wit::Host as StandardHost; diff --git a/kinode/src/keygen.rs b/kinode/src/keygen.rs index 945238b0e..2180c8415 100644 --- a/kinode/src/keygen.rs +++ b/kinode/src/keygen.rs @@ -2,23 +2,15 @@ use aes_gcm::{ aead::{Aead, AeadCore, KeyInit, OsRng}, Aes256Gcm, Key, }; -use alloy_primitives::keccak256; -use anyhow::Result; -use generic_array::GenericArray; -use hmac::Hmac; -use jwt::SignWithKey; use lib::types::core::Keyfile; use ring::pbkdf2; -use ring::pkcs8::Document; use ring::rand::SystemRandom; use ring::signature::{self, KeyPair}; -use ring::{digest as ring_digest, rand::SecureRandom}; -use sha2::Sha256; use std::num::NonZeroU32; type DiskKey = [u8; CREDENTIAL_LEN]; -pub const CREDENTIAL_LEN: usize = ring_digest::SHA256_OUTPUT_LEN; +pub const CREDENTIAL_LEN: usize = ring::digest::SHA256_OUTPUT_LEN; pub const ITERATIONS: u32 = 1_000_000; pub static PBKDF2_ALG: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256; // TODO maybe look into Argon2 @@ -30,8 +22,9 @@ pub fn encode_keyfile( jwt: &[u8], file_key: &[u8], ) -> Vec { - let mut disk_key: DiskKey = [0u8; CREDENTIAL_LEN]; + use ring::rand::SecureRandom; + let mut disk_key: DiskKey = [0u8; CREDENTIAL_LEN]; let rng = SystemRandom::new(); let mut salt = [0u8; 32]; // generate a unique salt rng.fill(&mut salt).unwrap(); @@ -67,6 +60,8 @@ pub fn encode_keyfile( } pub fn decode_keyfile(keyfile: &[u8], password: &str) -> Result { + use generic_array::GenericArray; + let (username, routers, salt, key_enc, jwt_enc, file_enc) = bincode::deserialize::<(String, Vec, Vec, Vec, Vec, Vec)>(keyfile) .map_err(|_| "failed to deserialize keyfile")?; @@ -117,8 +112,11 @@ pub fn generate_jwt( username: &str, subdomain: &Option, ) -> Option { - let jwt_secret: Hmac = Hmac::new_from_slice(jwt_secret_bytes).ok()?; + use hmac::Hmac; + use jwt::SignWithKey; + use sha2::Sha256; + let jwt_secret: Hmac = Hmac::new_from_slice(jwt_secret_bytes).ok()?; let subdomain = match subdomain.clone().unwrap_or_default().as_str() { "" => None, subdomain => Some(subdomain.to_string()), @@ -143,6 +141,8 @@ pub fn get_username_and_routers(keyfile: &[u8]) -> Result<(String, Vec), } pub fn namehash(name: &str) -> Vec { + use alloy_primitives::keccak256; + let mut node = vec![0u8; 32]; if name.is_empty() { return node; @@ -158,6 +158,8 @@ pub fn namehash(name: &str) -> Vec { /// randomly generated key to encrypt file chunks, pub fn generate_file_key() -> Vec { + use ring::rand::SecureRandom; + let mut key = [0u8; 32]; let rng = SystemRandom::new(); rng.fill(&mut key).unwrap(); @@ -166,7 +168,7 @@ pub fn generate_file_key() -> Vec { /// # Returns /// a pair of (public key (encoded as a hex string), serialized key as a pkcs8 Document) -pub fn generate_networking_key() -> (String, Document) { +pub fn generate_networking_key() -> (String, ring::pkcs8::Document) { let seed = SystemRandom::new(); let doc = signature::Ed25519KeyPair::generate_pkcs8(&seed).unwrap(); let keys = signature::Ed25519KeyPair::from_pkcs8(doc.as_ref()).unwrap(); diff --git a/kinode/src/kv.rs b/kinode/src/kv.rs index 27db68348..b8b74b7d1 100644 --- a/kinode/src/kv.rs +++ b/kinode/src/kv.rs @@ -75,7 +75,6 @@ pub async fn kv( open_kvs, txs, &send_to_loop, - &send_to_terminal, &send_to_caps_oracle, &kv_path, ) @@ -114,7 +113,6 @@ async fn handle_request( open_kvs: Arc>, txs: Arc>)>>>, send_to_loop: &MessageSender, - send_to_terminal: &PrintSender, send_to_caps_oracle: &CapMessageSender, kv_path: &str, ) -> Result<(), KvError> { @@ -301,21 +299,12 @@ async fn handle_request( } }; - if let Some(target) = km.rsvp.or_else(|| { - expects_response.map(|_| Address { - node: our_node.to_string(), - process: source.process.clone(), - }) - }) { - let response = KernelMessage { - id, - source: Address { - node: our_node.to_string(), - process: KV_PROCESS_ID.clone(), - }, - target, - rsvp: None, - message: Message::Response(( + if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) { + KernelMessage::builder() + .id(id) + .source((our_node, KV_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( Response { inherit: false, body, @@ -323,25 +312,15 @@ async fn handle_request( capabilities: vec![], }, None, - )), - lazy_load_blob: bytes.map(|bytes| LazyLoadBlob { + ))) + .lazy_load_blob(bytes.map(|bytes| LazyLoadBlob { mime: Some("application/octet-stream".into()), bytes, - }), - }; - - let _ = send_to_loop.send(response).await; - } else { - send_to_terminal - .send(Printout { - verbosity: 2, - content: format!( - "kv: not sending response: {:?}", - serde_json::from_slice::(&body) - ), - }) - .await - .unwrap(); + })) + .build() + .unwrap() + .send(send_to_loop) + .await; } Ok(()) diff --git a/kinode/src/main.rs b/kinode/src/main.rs index b4695b5b8..c4a1b0e81 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -2,7 +2,12 @@ #![feature(btree_extract_if)] use anyhow::Result; use clap::{arg, value_parser, Command}; -use lib::types::core::*; +use lib::types::core::{ + CapMessageReceiver, CapMessageSender, DebugReceiver, DebugSender, Identity, KernelCommand, + KernelMessage, Keyfile, Message, MessageReceiver, MessageSender, NetworkErrorReceiver, + NetworkErrorSender, NodeRouting, PrintReceiver, PrintSender, ProcessId, Request, + KERNEL_PROCESS_ID, +}; #[cfg(feature = "simulation-mode")] use ring::{rand::SystemRandom, signature, signature::KeyPair}; use std::env; @@ -421,34 +426,24 @@ async fn main() { verbose_mode, ) => { match quit { - Ok(_) => match kernel_message_sender - .send(KernelMessage { - id: rand::random(), - source: Address { - node: our.name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - target: Address { - node: our.name.clone(), - process: KERNEL_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { + Ok(()) => { + KernelMessage::builder() + .id(rand::random()) + .source((our.name.as_str(), KERNEL_PROCESS_ID.clone())) + .target((our.name.as_str(), KERNEL_PROCESS_ID.clone())) + .message(Message::Request(Request { inherit: false, expects_response: None, body: serde_json::to_vec(&KernelCommand::Shutdown).unwrap(), metadata: None, capabilities: vec![], - }), - lazy_load_blob: None, - }) - .await - { - Ok(()) => "graceful exit".into(), - Err(_) => { - "failed to gracefully shut down kernel".into() - } - }, + })) + .build() + .unwrap() + .send(&kernel_message_sender) + .await; + "graceful exit".into() + } Err(e) => e.to_string(), } } diff --git a/kinode/src/net/mod.rs b/kinode/src/net/mod.rs index fb5610a09..b0888aad9 100644 --- a/kinode/src/net/mod.rs +++ b/kinode/src/net/mod.rs @@ -1,6 +1,6 @@ use lib::types::core::{ - Address, Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse, - NetworkErrorSender, NodeRouting, PrintSender, ProcessId, + Identity, KernelMessage, MessageReceiver, MessageSender, NetAction, NetResponse, + NetworkErrorSender, NodeRouting, PrintSender, }; use types::{ IdentityExt, NetData, OnchainPKI, PKINames, Peers, PendingPassthroughs, TCP_PROTOCOL, @@ -271,29 +271,25 @@ async fn handle_local_request( return; } }; - ext.kernel_message_tx - .send(KernelMessage { - id: km.id, - source: Address { - node: ext.our.name.clone(), - process: ProcessId::new(Some("net"), "distro", "sys"), + KernelMessage::builder() + .id(km.id) + .source((ext.our.name.as_str(), "net", "distro", "sys")) + .target(km.rsvp.as_ref().unwrap_or(&km.source).clone()) + .message(lib::core::Message::Response(( + lib::core::Response { + inherit: false, + body: rmp_serde::to_vec(&response_body) + .expect("net: failed to serialize response"), + metadata: None, + capabilities: vec![], }, - target: km.rsvp.as_ref().unwrap_or(&km.source).clone(), - rsvp: None, - message: lib::core::Message::Response(( - lib::core::Response { - inherit: false, - body: rmp_serde::to_vec(&response_body) - .expect("net: failed to serialize response"), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: response_blob, - }) - .await - .expect("net: kernel channel was dropped"); + None, + ))) + .lazy_load_blob(response_blob) + .build() + .unwrap() + .send(&ext.kernel_message_tx) + .await; } } } diff --git a/kinode/src/net/utils.rs b/kinode/src/net/utils.rs index 22d87abc6..e997641f7 100644 --- a/kinode/src/net/utils.rs +++ b/kinode/src/net/utils.rs @@ -3,9 +3,9 @@ use crate::net::types::{ RoutingRequest, TCP_PROTOCOL, WS_PROTOCOL, }; use lib::types::core::{ - Address, Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, - NetworkErrorSender, NodeRouting, PrintSender, Printout, ProcessId, Request, Response, - SendError, SendErrorKind, WrappedSendError, + Identity, KernelMessage, KnsUpdate, Message, MessageSender, NetAction, NetworkErrorSender, + NodeRouting, PrintSender, Printout, Request, Response, SendError, SendErrorKind, + WrappedSendError, }; use { futures::{SinkExt, StreamExt}, @@ -94,26 +94,21 @@ pub async fn create_passthrough( } // send their net:distro:sys process a message, notifying it to create a *matching* // passthrough request, which we can pair with this pending one. - target_peer.sender.send(KernelMessage { - id: rand::random(), - source: Address { - node: our.name.clone(), - process: ProcessId::new(Some("net"), "distro", "sys"), - }, - target: Address { - node: target_id.name.clone(), - process: ProcessId::new(Some("net"), "distro", "sys"), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: Some(5), - body: rmp_serde::to_vec(&NetAction::ConnectionRequest(from_id.name.clone()))?, - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - })?; + target_peer.sender.send( + KernelMessage::builder() + .id(rand::random()) + .source((our.name.as_str(), "net", "distro", "sys")) + .target((target_id.name.as_str(), "net", "distro", "sys")) + .message(Message::Request(Request { + inherit: false, + expects_response: Some(5), + body: rmp_serde::to_vec(&NetAction::ConnectionRequest(from_id.name.clone()))?, + metadata: None, + capabilities: vec![], + })) + .build() + .unwrap(), + )?; // we'll remove this either if the above message gets a negative response, // or if the target node connects to us with a matching passthrough. // TODO it is currently possible to have dangling passthroughs in the map @@ -335,28 +330,23 @@ pub async fn parse_hello_message( ), ) .await; - kernel_message_tx - .send(KernelMessage { - id: km.id, - source: Address { - node: our.name.clone(), - process: ProcessId::new(Some("net"), "distro", "sys"), + KernelMessage::builder() + .id(km.id) + .source((our.name.as_str(), "net", "distro", "sys")) + .target(km.rsvp.as_ref().unwrap_or(&km.source).clone()) + .message(Message::Response(( + Response { + inherit: false, + body: "delivered".as_bytes().to_vec(), + metadata: None, + capabilities: vec![], }, - target: km.rsvp.as_ref().unwrap_or(&km.source).clone(), - rsvp: None, - message: Message::Response(( - Response { - inherit: false, - body: "delivered".as_bytes().to_vec(), - metadata: None, - capabilities: vec![], - }, - None, - )), - lazy_load_blob: None, - }) - .await - .expect("net: kernel_message_tx was dropped"); + None, + ))) + .build() + .unwrap() + .send(kernel_message_tx) + .await; } /// Create a terminal printout at verbosity level 0. diff --git a/kinode/src/register.rs b/kinode/src/register.rs index db512a6e0..9389267f2 100644 --- a/kinode/src/register.rs +++ b/kinode/src/register.rs @@ -1,22 +1,25 @@ -use crate::keygen; -use crate::KNS_ADDRESS; -use alloy::providers::{Provider, ProviderBuilder, RootProvider}; -use alloy::pubsub::PubSubFrontend; -use alloy::rpc::client::WsConnect; -use alloy::rpc::types::eth::{TransactionInput, TransactionRequest}; -use alloy::signers::Signature; +use crate::{keygen, KNS_ADDRESS}; +use alloy::{ + providers::{Provider, ProviderBuilder, RootProvider}, + pubsub::PubSubFrontend, + rpc::client::WsConnect, + rpc::types::eth::{TransactionInput, TransactionRequest}, + signers::Signature, +}; use alloy_primitives::{Address as EthAddress, Bytes, FixedBytes, U256}; use alloy_sol_macro::sol; use alloy_sol_types::{SolCall, SolValue}; use base64::{engine::general_purpose::STANDARD as base64_standard, Engine}; -use lib::types::core::*; -use ring::rand::SystemRandom; -use ring::signature; -use ring::signature::KeyPair; -use static_dir::static_dir; -use std::str::FromStr; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; +use lib::types::core::{ + BootInfo, Identity, ImportKeyfileInfo, Keyfile, KeyfileVet, KeyfileVetted, LoginAndResetInfo, + LoginInfo, NodeRouting, UnencryptedIdentity, +}; +use ring::{rand::SystemRandom, signature, signature::KeyPair}; +use std::{ + str::FromStr, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; use tokio::sync::{mpsc, oneshot}; use warp::{ http::{ @@ -104,7 +107,8 @@ pub async fn register( let ws_port = warp::any().map(move || (ws_port, ws_flag_used)); let tcp_port = warp::any().map(move || (tcp_port, tcp_flag_used)); - let static_files = warp::path("assets").and(static_dir!("src/register-ui/build/assets/")); + let static_files = + warp::path("assets").and(static_dir::static_dir!("src/register-ui/build/assets/")); let react_app = warp::path::end() .or(warp::path("login")) diff --git a/kinode/src/sqlite.rs b/kinode/src/sqlite.rs index b29404a21..ee03b5cdd 100644 --- a/kinode/src/sqlite.rs +++ b/kinode/src/sqlite.rs @@ -83,7 +83,6 @@ pub async fn sqlite( open_dbs, txs, &send_to_loop, - &send_to_terminal, &send_to_caps_oracle, &sqlite_path, ) @@ -123,7 +122,6 @@ async fn handle_request( open_dbs: Arc>>, txs: Arc)>>>, send_to_loop: &MessageSender, - send_to_terminal: &PrintSender, send_to_caps_oracle: &CapMessageSender, sqlite_path: &str, ) -> Result<(), SqliteError> { @@ -314,21 +312,12 @@ async fn handle_request( } }; - if let Some(target) = km.rsvp.or_else(|| { - expects_response.map(|_| Address { - node: our_node.to_string(), - process: source.process.clone(), - }) - }) { - let response = KernelMessage { - id, - source: Address { - node: our_node.to_string(), - process: SQLITE_PROCESS_ID.clone(), - }, - target, - rsvp: None, - message: Message::Response(( + if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) { + KernelMessage::builder() + .id(id) + .source((our_node, SQLITE_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( Response { inherit: false, body, @@ -336,25 +325,15 @@ async fn handle_request( capabilities: vec![], }, None, - )), - lazy_load_blob: bytes.map(|bytes| LazyLoadBlob { + ))) + .lazy_load_blob(bytes.map(|bytes| LazyLoadBlob { mime: Some("application/octet-stream".into()), bytes, - }), - }; - - let _ = send_to_loop.send(response).await; - } else { - send_to_terminal - .send(Printout { - verbosity: 2, - content: format!( - "sqlite: not sending response: {:?}", - serde_json::from_slice::(&body) - ), - }) - .await - .unwrap(); + })) + .build() + .unwrap() + .send(send_to_loop) + .await; } Ok(()) diff --git a/kinode/src/state.rs b/kinode/src/state.rs index c0ecb3d2d..5b8307786 100644 --- a/kinode/src/state.rs +++ b/kinode/src/state.rs @@ -267,21 +267,12 @@ async fn handle_request( } }; - if let Some(target) = rsvp.or_else(|| { - expects_response.map(|_| Address { - node: our_node.to_string(), - process: source.process.clone(), - }) - }) { - let response = KernelMessage { - id, - source: Address { - node: our_node.to_string(), - process: STATE_PROCESS_ID.clone(), - }, - target, - rsvp: None, - message: Message::Response(( + if let Some(target) = rsvp.or_else(|| expects_response.map(|_| source)) { + KernelMessage::builder() + .id(id) + .source((our_node, STATE_PROCESS_ID.clone())) + .target(target) + .message(Message::Response(( Response { inherit: false, body, @@ -289,14 +280,15 @@ async fn handle_request( capabilities: vec![], }, None, - )), - lazy_load_blob: bytes.map(|bytes| LazyLoadBlob { + ))) + .lazy_load_blob(bytes.map(|bytes| LazyLoadBlob { mime: Some("application/octet-stream".into()), bytes, - }), - }; - - let _ = send_to_loop.send(response).await; + })) + .build() + .unwrap() + .send(send_to_loop) + .await; }; Ok(()) diff --git a/kinode/src/terminal/mod.rs b/kinode/src/terminal/mod.rs index 998a8c083..8b8326e90 100644 --- a/kinode/src/terminal/mod.rs +++ b/kinode/src/terminal/mod.rs @@ -8,8 +8,8 @@ use crossterm::{ }; use futures::{future::FutureExt, StreamExt}; use lib::types::core::{ - Address, DebugCommand, DebugSender, Identity, KernelMessage, Message, MessageSender, - PrintReceiver, PrintSender, Printout, Request, TERMINAL_PROCESS_ID, + DebugCommand, DebugSender, Identity, KernelMessage, Message, MessageSender, PrintReceiver, + PrintSender, Printout, Request, TERMINAL_PROCESS_ID, }; use std::{ fs::{read_to_string, OpenOptions}, @@ -558,28 +558,21 @@ pub async fn terminal( command_history.add(command.clone()); cursor_col = prompt_len as u16; line_col = prompt_len; - event_loop.send( - KernelMessage { - id: rand::random(), - source: Address { - node: our.name.clone(), - process: TERMINAL_PROCESS_ID.clone(), - }, - target: Address { - node: our.name.clone(), - process: TERMINAL_PROCESS_ID.clone(), - }, - rsvp: None, - message: Message::Request(Request { - inherit: false, - expects_response: None, - body: command.into_bytes(), - metadata: None, - capabilities: vec![], - }), - lazy_load_blob: None, - } - ).await.expect("terminal: couldn't execute command!"); + KernelMessage::builder() + .id(rand::random()) + .source((our.name.as_str(), TERMINAL_PROCESS_ID.clone())) + .target((our.name.as_str(), TERMINAL_PROCESS_ID.clone())) + .message(Message::Request(Request { + inherit: false, + expects_response: None, + body: command.into_bytes(), + metadata: None, + capabilities: vec![], + })) + .build() + .unwrap() + .send(&event_loop) + .await; }, _ => { // some keycode we don't care about, yet diff --git a/kinode/src/timer.rs b/kinode/src/timer.rs index 2a01282f2..109a96c21 100644 --- a/kinode/src/timer.rs +++ b/kinode/src/timer.rs @@ -1,18 +1,39 @@ -use anyhow::Result; use lib::types::core::{ Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout, Response, TimerAction, TIMER_PROCESS_ID, }; use serde::{Deserialize, Serialize}; +#[derive(Serialize, Deserialize, Debug)] +struct TimerMap { + // key: the unix timestamp in milliseconds at which the timer pops + // value: a vector of KernelMessage ids and who to send Response to + // this is because multiple processes can set timers for the same time + timers: nohash_hasher::IntMap>, +} + +impl TimerMap { + fn insert(&mut self, pop_time: u64, id: u64, addr: Address) { + self.timers.entry(pop_time).or_default().push((id, addr)); + } + + fn contains(&mut self, pop_time: u64) -> bool { + self.timers.contains_key(&pop_time) + } + + fn remove(&mut self, pop_time: u64) -> Option> { + self.timers.remove(&pop_time) + } +} + /// A runtime module that allows processes to set timers. Interacting with the /// timer is done with a simple Request/Response pattern, and the timer module /// is public, so it can be used by any local process. It will not respond to /// requests made by other nodes. /// /// The interface of the timer module is as follows: -/// One kind of request is accepted: TimerAction::SetTimer(u64), where the u64 is the time to wait -/// in milliseconds. This request should always expect a Response. +/// One kind of request is accepted: TimerAction::SetTimer(u64), where the u64 is the +/// time to wait in milliseconds. This request should always expect a Response. /// If the request does not expect a Response, the timer will not be set. /// /// A proper Request will trigger the timer module to send a Response. The Response will be @@ -24,8 +45,7 @@ pub async fn timer_service( kernel_message_sender: MessageSender, mut timer_message_receiver: MessageReceiver, print_tx: PrintSender, -) -> Result<()> { - // if we have a persisted state file, load it +) -> anyhow::Result<()> { let mut timer_map = TimerMap { timers: nohash_hasher::IntMap::default(), }; @@ -60,7 +80,22 @@ pub async fn timer_service( .as_millis() as u64; let pop_time = now + timer_millis; if timer_millis == 0 { - send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; + KernelMessage::builder() + .id(km.id) + .source((our.as_str(), TIMER_PROCESS_ID.clone())) + .target(km.rsvp.unwrap_or(km.source)) + .message(Message::Response(( + Response { + inherit: false, + body: vec![], + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&kernel_message_sender).await; continue } Printout::new(3, format!("set timer to pop in {timer_millis}ms")).send(&print_tx).await; @@ -79,51 +114,24 @@ pub async fn timer_service( // the timer(s), and then remove it from our persisted map let Some(timers) = timer_map.remove(time) else { continue }; for (id, addr) in timers { - send_response(&our, id, addr, &kernel_message_sender).await; + KernelMessage::builder() + .id(id) + .source((our.as_str(), TIMER_PROCESS_ID.clone())) + .target(addr) + .message(Message::Response(( + Response { + inherit: false, + body: vec![], + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&kernel_message_sender).await; } } } } } - -#[derive(Serialize, Deserialize, Debug)] -struct TimerMap { - // key: the unix timestamp in milliseconds at which the timer pops - // value: a vector of KernelMessage ids and who to send Response to - // this is because multiple processes can set timers for the same time - timers: nohash_hasher::IntMap>, -} - -impl TimerMap { - fn insert(&mut self, pop_time: u64, id: u64, addr: Address) { - self.timers.entry(pop_time).or_default().push((id, addr)); - } - - fn contains(&mut self, pop_time: u64) -> bool { - self.timers.contains_key(&pop_time) - } - - fn remove(&mut self, pop_time: u64) -> Option> { - self.timers.remove(&pop_time) - } -} - -async fn send_response(our_node: &str, id: u64, target: Address, send_to_loop: &MessageSender) { - KernelMessage::builder() - .id(id) - .source((our_node, TIMER_PROCESS_ID.clone())) - .target(target) - .message(Message::Response(( - Response { - inherit: false, - body: vec![], - metadata: None, - capabilities: vec![], - }, - None, - ))) - .build() - .unwrap() - .send(send_to_loop) - .await; -} diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index e420ab515..1f45a8374 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -326,7 +326,6 @@ async fn handle_request( VfsAction::Seek { seek_from } => { let file = open_file(open_files, path, false, false).await?; let mut file = file.lock().await; - // same type, rust tingz let seek_from = match seek_from { lib::types::core::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset), lib::types::core::SeekFrom::End(offset) => std::io::SeekFrom::End(offset), @@ -449,12 +448,7 @@ async fn handle_request( } }; - if let Some(target) = km.rsvp.or_else(|| { - expects_response.map(|_| Address { - node: our_node.to_string(), - process: km.source.process, - }) - }) { + if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| km.source)) { KernelMessage::builder() .id(km.id) .source((our_node, VFS_PROCESS_ID.clone())) From 3fff94c57e5921eee070345da723afefaac54ac1 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Tue, 25 Jun 2024 17:25:58 +0200 Subject: [PATCH 7/7] simulation-mode compiling --- kinode/src/fakenet/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kinode/src/fakenet/mod.rs b/kinode/src/fakenet/mod.rs index 10e228663..85277e49b 100644 --- a/kinode/src/fakenet/mod.rs +++ b/kinode/src/fakenet/mod.rs @@ -1,3 +1,6 @@ +use crate::fakenet::helpers::RegisterHelpers::{ + ipCall, multicallCall, ownerOfCall, registerCall, setAllIpCall, setKeyCall, +}; use crate::{keygen, KNS_ADDRESS}; use alloy::network::{eip2718::Encodable2718, EthereumWallet, TransactionBuilder}; use alloy::providers::{Provider, ProviderBuilder, RootProvider}; @@ -10,7 +13,7 @@ use alloy_sol_types::{SolCall, SolValue}; use lib::core::{Identity, NodeRouting}; use std::str::FromStr; -mod helpers; +pub mod helpers; const FAKE_DOTDEV: &str = "0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9"; @@ -39,8 +42,8 @@ pub async fn register_local( let provider: RootProvider = ProviderBuilder::default().on_ws(ws).await?; - let fqdn = dns_encode_fqdn(name); - let namehash = encode_namehash(name); + let fqdn = helpers::dns_encode_fqdn(name); + let namehash = helpers::encode_namehash(name); // todo: find a better way? let namehash_bint: B256 = namehash.into(); let namehash_uint: U256 = namehash_bint.into();