diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 536eb9177..9baad6be0 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -6,22 +6,21 @@ use lib::types::core::{ KERNEL_PROCESS_ID, VFS_PROCESS_ID, }; use std::{ - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, + hash::Hash, io::Read, path::{Component, Path, PathBuf}, sync::Arc, - time::{Duration, Instant}, + time::Instant, }; use tokio::{ fs, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}, sync::Mutex, - time::interval, }; // Constants for file cleanup -const FILE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); -const FILE_IDLE_TIMEOUT: Duration = Duration::from_secs(300); +const MAX_OPEN_FILES: usize = 180; /// The main VFS service function. /// @@ -53,23 +52,11 @@ pub async fn vfs( .map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?; let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?); - let open_files: Arc>, Instant)>> = - Arc::new(DashMap::new()); + let files = Files::new(); let process_queues: HashMap>>> = HashMap::default(); - // Start the file cleanup task - let cleanup_open_files = open_files.clone(); - tokio::spawn(async move { - let mut interval = interval(FILE_CLEANUP_INTERVAL); - loop { - interval.tick().await; - cleanup_open_files - .retain(|_, (_, last_accessed)| last_accessed.elapsed() < FILE_IDLE_TIMEOUT); - } - }); - while let Some(km) = recv_from_loop.recv().await { if *our_node != km.source.node { Printout::new( @@ -98,7 +85,7 @@ pub async fn vfs( let our_node = our_node.clone(); let send_to_loop = send_to_loop.clone(); let send_to_caps_oracle = send_to_caps_oracle.clone(); - let open_files = open_files.clone(); + let files = files.clone(); let vfs_path = vfs_path.clone(); tokio::spawn(async move { @@ -110,7 +97,7 @@ pub async fn vfs( if let Err(e) = handle_request( &our_node, km, - open_files, + files, &send_to_loop, &send_to_caps_oracle, &vfs_path, @@ -141,6 +128,115 @@ pub async fn vfs( Ok(()) } +/// Helper 
struct to manage open files, cursor positions, and access order. +#[derive(Clone)] +struct Files { + /// currently open files, with last access time + open_files: Arc>, + /// cursor positions for files closed to avoid too many open files OS error + cursor_positions: Arc>, + /// access order of files + access_order: Arc>>, +} + +struct FileEntry { + file: Arc>, + last_access: Instant, +} + +impl Files { + pub fn new() -> Self { + Self { + open_files: Arc::new(DashMap::new()), + cursor_positions: Arc::new(DashMap::new()), + access_order: Arc::new(Mutex::new(UniqueQueue::new())), + } + } + + pub async fn open_file>( + &self, + path: P, + create: bool, + truncate: bool, + ) -> Result>, VfsError> { + let path = path.as_ref().to_path_buf(); + + if let Some(mut entry) = self.open_files.get_mut(&path) { + entry.value_mut().last_access = Instant::now(); + self.update_access_order(&path).await; + return Ok(entry.value().file.clone()); + } + + if self.open_files.len() >= MAX_OPEN_FILES { + self.close_least_recently_used_files().await?; + } + + let mut file = self.try_open_file(&path, create, truncate).await?; + if let Some(position) = self.cursor_positions.get(&path) { + file.seek(SeekFrom::Start(*position)).await?; + } + let file = Arc::new(Mutex::new(file)); + self.open_files.insert( + path.clone(), + FileEntry { + file: file.clone(), + last_access: Instant::now(), + }, + ); + self.update_access_order(&path).await; + Ok(file) + } + + async fn update_access_order(&self, path: &Path) { + let mut access_order = self.access_order.lock().await; + access_order.push_back(path.to_path_buf()); + } + + async fn close_least_recently_used_files(&self) -> Result<(), VfsError> { + let mut access_order = self.access_order.lock().await; + let mut closed = 0; + let to_close = MAX_OPEN_FILES / 3; // close 33% of max open files + + while closed < to_close { + if let Some(path) = access_order.pop_front() { + if let Some((_, file_entry)) = self.open_files.remove(&path) { + if 
Arc::strong_count(&file_entry.file) == 1 { + let mut file = file_entry.file.lock().await; + if let Ok(position) = file.stream_position().await { + if position != 0 { + self.cursor_positions.insert(path, position); + } + } + closed += 1; + } else { + // file is still in use, put it back + self.open_files.insert(path.clone(), file_entry); + access_order.push_back(path); + } + } + } else { + break; // no more files to close + } + } + Ok(()) + } + + async fn try_open_file( + &self, + path: &Path, + create: bool, + truncate: bool, + ) -> Result { + tokio::fs::OpenOptions::new() + .read(true) + .write(true) + .create(create) + .truncate(truncate) + .open(path) + .await + } +} + /// Handles individual VFS requests. /// /// This function processes various VFS actions such as file operations, directory listings, etc. @@ -148,7 +244,7 @@ pub async fn vfs( /// # Arguments /// * `our_node` - The identifier for the current node /// * `km` - The incoming kernel message -/// * `open_files` - A map of currently open files +/// * `files` - A struct containing open_files, cursor_positions, and access_order /// * `send_to_loop` - Sender for kernel messages /// * `send_to_caps_oracle` - Sender for capability messages /// * `vfs_path` - The base path for the VFS @@ -158,7 +254,7 @@ pub async fn vfs( async fn handle_request( our_node: &str, km: KernelMessage, - open_files: Arc>, Instant)>>, + files: Files, send_to_loop: &MessageSender, send_to_caps_oracle: &CapMessageSender, vfs_path: &PathBuf, @@ -265,19 +361,19 @@ async fn handle_request( } VfsAction::CreateFile => { // create truncates any file that might've existed before - open_files.remove(&path); - let _file = open_file(open_files, &path, true, true).await?; + files.open_files.remove(&path); + let _file = files.open_file(&path, true, true).await?; (VfsResponse::Ok, None) } VfsAction::OpenFile { create } => { - let file = open_file(open_files, &path, create, false).await?; + let file = files.open_file(&path, create, false).await?; 
let mut file = file.lock().await; file.seek(SeekFrom::Start(0)).await?; (VfsResponse::Ok, None) } VfsAction::CloseFile => { // removes file from scope, resets file_handle and cursor. - open_files.remove(&path); + files.open_files.remove(&path); (VfsResponse::Ok, None) } VfsAction::WriteAll => { @@ -287,7 +383,7 @@ async fn handle_request( error: "blob needs to exist for WriteAll".into(), }); }; - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; file.write_all(&blob.bytes).await?; (VfsResponse::Ok, None) @@ -307,14 +403,14 @@ async fn handle_request( error: "blob needs to exist for Append".into(), }); }; - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; file.seek(SeekFrom::End(0)).await?; file.write_all(&blob.bytes).await?; (VfsResponse::Ok, None) } VfsAction::SyncAll => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let file = file.lock().await; file.sync_all().await?; (VfsResponse::Ok, None) @@ -324,14 +420,14 @@ async fn handle_request( (VfsResponse::Read, Some(contents)) } VfsAction::ReadToEnd => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let mut contents = Vec::new(); file.read_to_end(&mut contents).await?; (VfsResponse::Read, Some(contents)) } VfsAction::ReadExact(length) => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let mut contents = vec![0; length as usize]; file.read_exact(&mut contents).await?; @@ -355,14 +451,14 @@ async fn handle_request( (VfsResponse::ReadDir(entries), None) } VfsAction::ReadToString => { - let file = open_file(open_files, 
&path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let mut contents = String::new(); file.read_to_string(&mut contents).await?; (VfsResponse::ReadToString(contents), None) } VfsAction::Seek { seek_from } => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let seek_from = match seek_from { lib::types::core::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset), @@ -374,7 +470,7 @@ async fn handle_request( } VfsAction::RemoveFile => { fs::remove_file(&path).await?; - open_files.remove(&path); + files.open_files.remove(&path); (VfsResponse::Ok, None) } VfsAction::RemoveDir => { @@ -418,7 +514,7 @@ async fn handle_request( (VfsResponse::Metadata(meta), None) } VfsAction::Len => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let file = file.lock().await; let len = file .metadata() @@ -431,7 +527,7 @@ async fn handle_request( (VfsResponse::Len(len), None) } VfsAction::SetLen(len) => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let file = file.lock().await; file.set_len(len).await.map_err(|e| VfsError::IOError { error: e.to_string(), @@ -441,7 +537,7 @@ async fn handle_request( } VfsAction::Hash => { use sha2::{Digest, Sha256}; - let file = open_file(open_files, path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; file.seek(SeekFrom::Start(0)).await?; let mut hasher = Sha256::new(); @@ -587,35 +683,6 @@ async fn parse_package_and_drive( Ok((package_id, drive, remaining_path)) } -async fn open_file>( - open_files: Arc>, Instant)>>, - path: P, - create: bool, - truncate: bool, -) -> Result>, VfsError> { - let path = path.as_ref().to_path_buf(); - Ok(match 
/// helper cache for most recently used paths
///
/// A FIFO queue that holds each value at most once. `queue` keeps the
/// insertion order and `set` mirrors its contents for fast membership checks;
/// the methods below keep the two in sync.
pub struct UniqueQueue<T> {
    pub queue: VecDeque<T>,
    pub set: HashSet<T>,
}

impl<T> Default for UniqueQueue<T> {
    /// Creates an empty queue, same as [`UniqueQueue::new`].
    fn default() -> Self {
        UniqueQueue {
            queue: VecDeque::new(),
            set: HashSet::new(),
        }
    }
}

// Some helpers (e.g. `contains`) may have no caller in this file; the allow
// keeps the full queue API available without dead-code warnings.
#[allow(unused)]
impl<T> UniqueQueue<T>
where
    T: Eq + Hash + Clone,
{
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `value` to the back of the queue if it is not already present.
    ///
    /// Returns `true` when the value was inserted. Returns `false` when it was
    /// already queued; in that case the queue is left untouched and the
    /// existing entry keeps its position.
    pub fn push_back(&mut self, value: T) -> bool {
        if !self.set.insert(value.clone()) {
            return false;
        }
        self.queue.push_back(value);
        true
    }

    /// Removes and returns the oldest value, or `None` when the queue is empty.
    pub fn pop_front(&mut self) -> Option<T> {
        let value = self.queue.pop_front()?;
        self.set.remove(&value);
        Some(value)
    }

    /// Returns `true` if `value` is currently queued.
    pub fn contains(&self, value: &T) -> bool {
        self.set.contains(value)
    }

    /// Removes `value` from wherever it sits in the queue.
    ///
    /// Returns `true` when the value was present. This scans the queue, so it
    /// is linear in the number of queued values.
    pub fn remove(&mut self, value: &T) -> bool {
        if !self.set.remove(value) {
            return false;
        }
        self.queue.retain(|queued| queued != value);
        true
    }
}