From eabe9c971c947606a4a27d374fbab00f985747a8 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Tue, 10 Sep 2024 15:45:52 +0300 Subject: [PATCH 1/3] vfs: add cleanup upon reaching max open files --- kinode/src/vfs.rs | 202 +++++++++++++++++++++++++--------------------- 1 file changed, 112 insertions(+), 90 deletions(-) diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 536eb9177..c923d2c5e 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -1,4 +1,4 @@ -use dashmap::DashMap; +use dashmap::{mapref::entry::Entry, DashMap}; use lib::types::core::{ Address, CapMessage, CapMessageSender, Capability, DirEntry, FileMetadata, FileType, KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, @@ -6,6 +6,7 @@ use lib::types::core::{ KERNEL_PROCESS_ID, VFS_PROCESS_ID, }; use std::{ + cmp::max, collections::{HashMap, VecDeque}, io::Read, path::{Component, Path, PathBuf}, @@ -21,7 +22,8 @@ use tokio::{ // Constants for file cleanup const FILE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); -const FILE_IDLE_TIMEOUT: Duration = Duration::from_secs(300); +const FILE_IDLE_TIMEOUT: Duration = Duration::from_secs(50); +const MAX_OPEN_FILES: usize = 180; /// The main VFS service function. /// @@ -59,86 +61,83 @@ pub async fn vfs( let process_queues: HashMap>>> = HashMap::default(); - // Start the file cleanup task - let cleanup_open_files = open_files.clone(); - tokio::spawn(async move { - let mut interval = interval(FILE_CLEANUP_INTERVAL); - loop { - interval.tick().await; - cleanup_open_files - .retain(|_, (_, last_accessed)| last_accessed.elapsed() < FILE_IDLE_TIMEOUT); - } - }); - - while let Some(km) = recv_from_loop.recv().await { - if *our_node != km.source.node { - Printout::new( - 1, - format!( - "vfs: got request from {}, but requests must come from our node {our_node}", - km.source.node - ), - ) - .send(&send_to_terminal) - .await; - continue; - } - - let queue = process_queues - .get(&km.source.process) - .cloned() - .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); - - { - let mut queue_lock = queue.lock().await; - queue_lock.push_back(km); - } - - // Clone Arcs for the new task - let our_node = our_node.clone(); - let send_to_loop = send_to_loop.clone(); - let send_to_caps_oracle = send_to_caps_oracle.clone(); - let open_files = open_files.clone(); - let vfs_path = vfs_path.clone(); - - tokio::spawn(async move { - let mut queue_lock = queue.lock().await; - if let Some(km) = queue_lock.pop_front() { - let (km_id, km_rsvp) = - (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); - - if let Err(e) = handle_request( - &our_node, - km, - open_files, - &send_to_loop, - &send_to_caps_oracle, - &vfs_path, - ) - .await - { - KernelMessage::builder() - .id(km_id) - .source((our_node.as_str(), VFS_PROCESS_ID.clone())) - .target(km_rsvp) - .message(Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&VfsResponse::Err(e)).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - ))) - .build() - .unwrap() - .send(&send_to_loop) - .await; + let mut cleanup_interval = interval(FILE_CLEANUP_INTERVAL); + + loop { + tokio::select! { + _ = cleanup_interval.tick() => { + let now = Instant::now(); + open_files.retain(|_, (_, last_accessed)| now.duration_since(*last_accessed) < FILE_IDLE_TIMEOUT); + } + Some(km) = recv_from_loop.recv() => { + if *our_node != km.source.node { + Printout::new( + 1, + format!( + "vfs: got request from {}, but requests must come from our node {our_node}", + km.source.node + ), + ) + .send(&send_to_terminal) + .await; + continue; } + let queue = process_queues + .get(&km.source.process) + .cloned() + .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); + + { + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km); } - }); + + // Clone Arcs for the new task + let our_node = our_node.clone(); + let send_to_loop = send_to_loop.clone(); + let send_to_caps_oracle = send_to_caps_oracle.clone(); + let open_files = open_files.clone(); + let vfs_path = vfs_path.clone(); + + tokio::spawn(async move { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { + let (km_id, km_rsvp) = + (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); + + if let Err(e) = handle_request( + &our_node, + km, + open_files, + &send_to_loop, + &send_to_caps_oracle, + &vfs_path, + ) + .await + { + KernelMessage::builder() + .id(km_id) + .source((our_node.as_str(), VFS_PROCESS_ID.clone())) + .target(km_rsvp) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&VfsResponse::Err(e)).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&send_to_loop) + .await; + } + } + }); + } + } } - Ok(()) } /// Handles individual VFS requests. @@ -594,9 +593,18 @@ async fn open_file>( truncate: bool, ) -> Result>, VfsError> { let path = path.as_ref().to_path_buf(); - Ok(match open_files.get(&path) { - Some(file) => file.value().0.clone(), - None => { + + if open_files.len() >= MAX_OPEN_FILES { + cleanup_files(&open_files); + } + + // avoid race conditions + match open_files.entry(path.clone()) { + Entry::Occupied(mut entry) => { + entry.get_mut().1 = Instant::now(); // update last access time + Ok(entry.get().0.clone()) + } + Entry::Vacant(entry) => { let file = Arc::new(Mutex::new( tokio::fs::OpenOptions::new() .read(true) @@ -604,16 +612,30 @@ async fn open_file>( .create(create) .truncate(truncate) .open(&path) - .await - .map_err(|e| VfsError::IOError { - error: e.to_string(), - path: path.display().to_string(), - })?, + .await?, )); - open_files.insert(path, (file.clone(), Instant::now())); - file + entry.insert((file.clone(), Instant::now())); + Ok(file) } - }) + } +} + +fn cleanup_files(open_files: &DashMap>, Instant)>) { + let current_count = open_files.len(); + let target_count = max(current_count / 2, MAX_OPEN_FILES / 2); + + // collect all entries, sort by last access time + let mut entries: Vec<_> = open_files + .iter() + .map(|entry| (entry.key().clone(), entry.value().1)) + .collect(); + entries.sort_by(|a, b| a.1.cmp(&b.1)); + + // remove oldest entries until we reach the target count + for (path, _) in entries.into_iter().take(current_count - target_count) { + // use remove_if_present to avoid potential race conditions + open_files.remove_if(&path, |_, _| true); + } } async fn check_caps( From 41998cf27d7e8b57bb2776a1cf54e9df91d2495a Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 11 Sep 2024 22:18:09 +0300 Subject: [PATCH 2/3] vfs: hybrid file pruning --- kinode/src/vfs.rs | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index c923d2c5e..ebedf0aff 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -622,19 +622,36 @@ async fn open_file>( fn cleanup_files(open_files: &DashMap>, Instant)>) { let current_count = open_files.len(); - let target_count = max(current_count / 2, MAX_OPEN_FILES / 2); + let target_count: usize = max(current_count / 2, MAX_OPEN_FILES / 2); - // collect all entries, sort by last access time - let mut entries: Vec<_> = open_files + if current_count <= MAX_OPEN_FILES { + return; + } + + // calculate average last access time + let now = Instant::now(); + let sum_duration: Duration = open_files .iter() - .map(|entry| (entry.key().clone(), entry.value().1)) - .collect(); - entries.sort_by(|a, b| a.1.cmp(&b.1)); - - // remove oldest entries until we reach the target count - for (path, _) in entries.into_iter().take(current_count - target_count) { - // use remove_if_present to avoid potential race conditions - open_files.remove_if(&path, |_, _| true); + .map(|entry| now.duration_since(entry.value().1)) + .sum(); + let avg_duration = sum_duration / current_count as u32; + + // first pass: remove files older than average + open_files.retain(|_, (file, last_access)| { + now.duration_since(*last_access) < avg_duration || Arc::strong_count(file) > 1 + }); + + // second pass: if we're still over target, remove oldest files + if open_files.len() > target_count { + let mut entries: Vec<_> = open_files + .iter() + .map(|entry| (entry.key().clone(), entry.value().1)) + .collect(); + entries.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by most recent first + + for (path, _) in entries.into_iter().skip(target_count) { + open_files.remove_if(&path, |_, (file, _)| Arc::strong_count(file) == 1); + } } } From 688dade8e18a52bbf9cf2cedde278833ae96b2da Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Thu, 12 Sep 2024 21:43:08 +0300 Subject: [PATCH 3/3] vfs: max file pruning improvements --- kinode/src/vfs.rs | 410 +++++++++++++++++++++++++++------------------- 1 file changed, 246 insertions(+), 164 deletions(-) diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index ebedf0aff..9baad6be0 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -1,4 +1,4 @@ -use dashmap::{mapref::entry::Entry, DashMap}; +use dashmap::DashMap; use lib::types::core::{ Address, CapMessage, CapMessageSender, Capability, DirEntry, FileMetadata, FileType, KernelMessage, LazyLoadBlob, Message, MessageReceiver, MessageSender, PackageId, PrintSender, @@ -6,23 +6,20 @@ use lib::types::core::{ KERNEL_PROCESS_ID, VFS_PROCESS_ID, }; use std::{ - cmp::max, - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, + hash::Hash, io::Read, path::{Component, Path, PathBuf}, sync::Arc, - time::{Duration, Instant}, + time::Instant, }; use tokio::{ fs, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}, sync::Mutex, - time::interval, }; // Constants for file cleanup -const FILE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); -const FILE_IDLE_TIMEOUT: Duration = Duration::from_secs(50); const MAX_OPEN_FILES: usize = 180; /// The main VFS service function. @@ -55,88 +52,188 @@ pub async fn vfs( .map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?; let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?); - let open_files: Arc>, Instant)>> = - Arc::new(DashMap::new()); + let files = Files::new(); let process_queues: HashMap>>> = HashMap::default(); - let mut cleanup_interval = interval(FILE_CLEANUP_INTERVAL); - - loop { - tokio::select! { - _ = cleanup_interval.tick() => { - let now = Instant::now(); - open_files.retain(|_, (_, last_accessed)| now.duration_since(*last_accessed) < FILE_IDLE_TIMEOUT); - } - Some(km) = recv_from_loop.recv() => { - if *our_node != km.source.node { - Printout::new( - 1, - format!( - "vfs: got request from {}, but requests must come from our node {our_node}", - km.source.node - ), - ) - .send(&send_to_terminal) - .await; - continue; + while let Some(km) = recv_from_loop.recv().await { + if *our_node != km.source.node { + Printout::new( + 1, + format!( + "vfs: got request from {}, but requests must come from our node {our_node}", + km.source.node + ), + ) + .send(&send_to_terminal) + .await; + continue; + } + + let queue = process_queues + .get(&km.source.process) + .cloned() + .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); + + { + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km); + } + + // Clone Arcs for the new task + let our_node = our_node.clone(); + let send_to_loop = send_to_loop.clone(); + let send_to_caps_oracle = send_to_caps_oracle.clone(); + let files = files.clone(); + let vfs_path = vfs_path.clone(); + + tokio::spawn(async move { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { + let (km_id, km_rsvp) = + (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); + + if let Err(e) = handle_request( + &our_node, + km, + files, + &send_to_loop, + &send_to_caps_oracle, + &vfs_path, + ) + .await + { + KernelMessage::builder() + .id(km_id) + .source((our_node.as_str(), VFS_PROCESS_ID.clone())) + .target(km_rsvp) + .message(Message::Response(( + Response { + inherit: false, + body: serde_json::to_vec(&VfsResponse::Err(e)).unwrap(), + metadata: None, + capabilities: vec![], + }, + None, + ))) + .build() + .unwrap() + .send(&send_to_loop) + .await; } - let queue = process_queues - .get(&km.source.process) - .cloned() - .unwrap_or_else(|| Arc::new(Mutex::new(VecDeque::new()))); - - { - let mut queue_lock = queue.lock().await; - queue_lock.push_back(km); } + }); + } + Ok(()) +} + +/// Helper struct to manage open files, cursor positions, and access order. +#[derive(Clone)] +struct Files { + /// currently open files, with last access time + open_files: Arc>, + /// cursor positions for files closed to avoid too many open files OS error + cursor_positions: Arc>, + /// access order of files + access_order: Arc>>, +} - // Clone Arcs for the new task - let our_node = our_node.clone(); - let send_to_loop = send_to_loop.clone(); - let send_to_caps_oracle = send_to_caps_oracle.clone(); - let open_files = open_files.clone(); - let vfs_path = vfs_path.clone(); - - tokio::spawn(async move { - let mut queue_lock = queue.lock().await; - if let Some(km) = queue_lock.pop_front() { - let (km_id, km_rsvp) = - (km.id.clone(), km.rsvp.clone().unwrap_or(km.source.clone())); - - if let Err(e) = handle_request( - &our_node, - km, - open_files, - &send_to_loop, - &send_to_caps_oracle, - &vfs_path, - ) - .await - { - KernelMessage::builder() - .id(km_id) - .source((our_node.as_str(), VFS_PROCESS_ID.clone())) - .target(km_rsvp) - .message(Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&VfsResponse::Err(e)).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - ))) - .build() - .unwrap() - .send(&send_to_loop) - .await; +struct FileEntry { + file: Arc>, + last_access: Instant, +} + +impl Files { + pub fn new() -> Self { + Self { + open_files: Arc::new(DashMap::new()), + cursor_positions: Arc::new(DashMap::new()), + access_order: Arc::new(Mutex::new(UniqueQueue::new())), + } + } + + pub async fn open_file>( + &self, + path: P, + create: bool, + truncate: bool, + ) -> Result>, VfsError> { + let path = path.as_ref().to_path_buf(); + + if let Some(mut entry) = self.open_files.get_mut(&path) { + entry.value_mut().last_access = Instant::now(); + self.update_access_order(&path).await; + return Ok(entry.value().file.clone()); + } + + if self.open_files.len() >= MAX_OPEN_FILES { + self.close_least_recently_used_files().await?; + } + + let mut file = self.try_open_file(&path, create, truncate).await?; + if let Some(position) = self.cursor_positions.get(&path) { + file.seek(SeekFrom::Start(*position)).await?; + } + let file = Arc::new(Mutex::new(file)); + self.open_files.insert( + path.clone(), + FileEntry { + file: file.clone(), + last_access: Instant::now(), + }, + ); + self.update_access_order(&path).await; + Ok(file) + } + + async fn update_access_order(&self, path: &Path) { + let mut access_order = self.access_order.lock().await; + access_order.push_back(path.to_path_buf()); + } + + async fn close_least_recently_used_files(&self) -> Result<(), VfsError> { + let mut access_order = self.access_order.lock().await; + let mut closed = 0; + let to_close = MAX_OPEN_FILES / 3; // close 33% of max open files + + while closed < to_close { + if let Some(path) = access_order.pop_front() { + if let Some((_, file_entry)) = self.open_files.remove(&path) { + if Arc::strong_count(&file_entry.file) == 1 { + let mut file = file_entry.file.lock().await; + if let Ok(position) = file.stream_position().await { + if position != 0 { + self.cursor_positions.insert(path, position); + } + } + closed += 1; + } else { + // file is still in use, put it back + self.open_files.insert(path.clone(), file_entry); + access_order.push_back(path); } } - }); + } else { + break; // no more files to close } } + Ok(()) + } + + async fn try_open_file( + &self, + path: &Path, + create: bool, + truncate: bool, + ) -> Result { + tokio::fs::OpenOptions::new() + .read(true) + .write(true) + .create(create) + .truncate(truncate) + .open(path) + .await } } @@ -147,7 +244,7 @@ pub async fn vfs( /// # Arguments /// * `our_node` - The identifier for the current node /// * `km` - The incoming kernel message -/// * `open_files` - A map of currently open files +/// * `files` - A struct containing open_files, cursor_positions, and access_order /// * `send_to_loop` - Sender for kernel messages /// * `send_to_caps_oracle` - Sender for capability messages /// * `vfs_path` - The base path for the VFS @@ -157,7 +254,7 @@ pub async fn vfs( async fn handle_request( our_node: &str, km: KernelMessage, - open_files: Arc>, Instant)>>, + files: Files, send_to_loop: &MessageSender, send_to_caps_oracle: &CapMessageSender, vfs_path: &PathBuf, @@ -264,19 +361,19 @@ async fn handle_request( } VfsAction::CreateFile => { // create truncates any file that might've existed before - open_files.remove(&path); - let _file = open_file(open_files, &path, true, true).await?; + files.open_files.remove(&path); + let _file = files.open_file(&path, true, true).await?; (VfsResponse::Ok, None) } VfsAction::OpenFile { create } => { - let file = open_file(open_files, &path, create, false).await?; + let file = files.open_file(&path, create, false).await?; let mut file = file.lock().await; file.seek(SeekFrom::Start(0)).await?; (VfsResponse::Ok, None) } VfsAction::CloseFile => { // removes file from scope, resets file_handle and cursor. - open_files.remove(&path); + files.open_files.remove(&path); (VfsResponse::Ok, None) } VfsAction::WriteAll => { @@ -286,7 +383,7 @@ async fn handle_request( error: "blob needs to exist for WriteAll".into(), }); }; - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; file.write_all(&blob.bytes).await?; (VfsResponse::Ok, None) @@ -306,14 +403,14 @@ async fn handle_request( error: "blob needs to exist for Append".into(), }); }; - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; file.seek(SeekFrom::End(0)).await?; file.write_all(&blob.bytes).await?; (VfsResponse::Ok, None) } VfsAction::SyncAll => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let file = file.lock().await; file.sync_all().await?; (VfsResponse::Ok, None) @@ -323,14 +420,14 @@ async fn handle_request( (VfsResponse::Read, Some(contents)) } VfsAction::ReadToEnd => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let mut contents = Vec::new(); file.read_to_end(&mut contents).await?; (VfsResponse::Read, Some(contents)) } VfsAction::ReadExact(length) => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let mut contents = vec![0; length as usize]; file.read_exact(&mut contents).await?; @@ -354,14 +451,14 @@ async fn handle_request( (VfsResponse::ReadDir(entries), None) } VfsAction::ReadToString => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let mut contents = String::new(); file.read_to_string(&mut contents).await?; (VfsResponse::ReadToString(contents), None) } VfsAction::Seek { seek_from } => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; let seek_from = match seek_from { lib::types::core::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset), @@ -373,7 +470,7 @@ async fn handle_request( } VfsAction::RemoveFile => { fs::remove_file(&path).await?; - open_files.remove(&path); + files.open_files.remove(&path); (VfsResponse::Ok, None) } VfsAction::RemoveDir => { @@ -417,7 +514,7 @@ async fn handle_request( (VfsResponse::Metadata(meta), None) } VfsAction::Len => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let file = file.lock().await; let len = file .metadata() @@ -430,7 +527,7 @@ async fn handle_request( (VfsResponse::Len(len), None) } VfsAction::SetLen(len) => { - let file = open_file(open_files, &path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let file = file.lock().await; file.set_len(len).await.map_err(|e| VfsError::IOError { error: e.to_string(), @@ -440,7 +537,7 @@ async fn handle_request( } VfsAction::Hash => { use sha2::{Digest, Sha256}; - let file = open_file(open_files, path, false, false).await?; + let file = files.open_file(&path, false, false).await?; let mut file = file.lock().await; file.seek(SeekFrom::Start(0)).await?; let mut hasher = Sha256::new(); @@ -586,75 +683,6 @@ async fn parse_package_and_drive( Ok((package_id, drive, remaining_path)) } -async fn open_file>( - open_files: Arc>, Instant)>>, - path: P, - create: bool, - truncate: bool, -) -> Result>, VfsError> { - let path = path.as_ref().to_path_buf(); - - if open_files.len() >= MAX_OPEN_FILES { - cleanup_files(&open_files); - } - - // avoid race conditions - match open_files.entry(path.clone()) { - Entry::Occupied(mut entry) => { - entry.get_mut().1 = Instant::now(); // update last access time - Ok(entry.get().0.clone()) - } - Entry::Vacant(entry) => { - let file = Arc::new(Mutex::new( - tokio::fs::OpenOptions::new() - .read(true) - .write(true) - .create(create) - .truncate(truncate) - .open(&path) - .await?, - )); - entry.insert((file.clone(), Instant::now())); - Ok(file) - } - } -} - -fn cleanup_files(open_files: &DashMap>, Instant)>) { - let current_count = open_files.len(); - let target_count: usize = max(current_count / 2, MAX_OPEN_FILES / 2); - - if current_count <= MAX_OPEN_FILES { - return; - } - - // calculate average last access time - let now = Instant::now(); - let sum_duration: Duration = open_files - .iter() - .map(|entry| now.duration_since(entry.value().1)) - .sum(); - let avg_duration = sum_duration / current_count as u32; - - // first pass: remove files older than average - open_files.retain(|_, (file, last_access)| { - now.duration_since(*last_access) < avg_duration || Arc::strong_count(file) > 1 - }); - - // second pass: if we're still over target, remove oldest files - if open_files.len() > target_count { - let mut entries: Vec<_> = open_files - .iter() - .map(|entry| (entry.key().clone(), entry.value().1)) - .collect(); - entries.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by most recent first - - for (path, _) in entries.into_iter().skip(target_count) { - open_files.remove_if(&path, |_, (file, _)| Arc::strong_count(file) == 1); - } - } -} - async fn check_caps( our_node: &str, source: &Address, @@ -873,6 +901,60 @@ fn get_file_type(metadata: &std::fs::Metadata) -> FileType { } } +/// helper cache for most recently used paths + +pub struct UniqueQueue +where + T: Eq + Hash, +{ + pub queue: VecDeque, + pub set: HashSet, +} + +#[allow(unused)] +impl UniqueQueue +where + T: Eq + Hash + Clone, +{ + pub fn new() -> Self { + UniqueQueue { + queue: VecDeque::new(), + set: HashSet::new(), + } + } + + pub fn push_back(&mut self, value: T) -> bool { + if self.set.insert(value.clone()) { + self.queue.push_back(value); + true + } else { + false + } + } + + pub fn pop_front(&mut self) -> Option { + if let Some(value) = self.queue.pop_front() { + self.set.remove(&value); + Some(value) + } else { + None + } + } + + pub fn contains(&self, value: &T) -> bool { + self.set.contains(value) + } + + pub fn remove(&mut self, value: &T) -> bool { + if self.set.remove(value) { + self.queue.retain(|x| x != value); + true + } else { + false + } + } +} + /// from rust/cargo/src/cargo/util/paths.rs /// to avoid using std::fs::canonicalize, which fails on non-existent paths. fn normalize_path(path: &Path) -> PathBuf {