From 44664807b0b2e1ea7d3dbacdff0b6a73f127d6f1 Mon Sep 17 00:00:00 2001 From: dr-frmr Date: Fri, 28 Jun 2024 12:54:29 +0200 Subject: [PATCH] vfs: add file cleanup --- kinode/src/vfs.rs | 130 ++++++++++++++++++++++++++++++---------------- 1 file changed, 86 insertions(+), 44 deletions(-) diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 1f45a8374..70212e072 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -10,13 +10,34 @@ use std::{ io::Read, path::{Component, Path, PathBuf}, sync::Arc, + time::{Duration, Instant}, }; use tokio::{ fs, io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}, sync::Mutex, + time::interval, }; +// Constants for file cleanup +const FILE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60); +const FILE_IDLE_TIMEOUT: Duration = Duration::from_secs(300); + +/// The main VFS service function. +/// +/// This function sets up the VFS, handles incoming requests, and manages file operations. +/// It also implements a file cleanup mechanism to close idle files. +/// +/// # Arguments +/// * `our_node` - The identifier for the current node +/// * `send_to_loop` - Sender for kernel messages +/// * `send_to_terminal` - Sender for print messages +/// * `recv_from_loop` - Receiver for incoming messages +/// * `send_to_caps_oracle` - Sender for capability messages +/// * `home_directory_path` - Path to the home directory +/// +/// # Returns +/// * `anyhow::Result<()>` - Should never return Ok, but will return fatal errors. pub async fn vfs( our_node: Arc, send_to_loop: MessageSender, @@ -27,14 +48,27 @@ pub async fn vfs( ) -> anyhow::Result<()> { let vfs_path = format!("{home_directory_path}/vfs"); - if let Err(e) = fs::create_dir_all(&vfs_path).await { - panic!("failed creating vfs dir! {e:?}"); - } + fs::create_dir_all(&vfs_path) + .await + .map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?; let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?); - let open_files: Arc>>> = Arc::new(DashMap::new()); + let open_files: Arc>, Instant)>> = + Arc::new(DashMap::new()); - let process_queues: HashMap>>> = HashMap::new(); + let process_queues: HashMap>>> = + HashMap::default(); + + // Start the file cleanup task + let cleanup_open_files = open_files.clone(); + tokio::spawn(async move { + let mut interval = interval(FILE_CLEANUP_INTERVAL); + loop { + interval.tick().await; + cleanup_open_files + .retain(|_, (_, last_accessed)| last_accessed.elapsed() < FILE_IDLE_TIMEOUT); + } + }); while let Some(km) = recv_from_loop.recv().await { if *our_node != km.source.node { @@ -60,7 +94,7 @@ pub async fn vfs( queue_lock.push_back(km); } - // clone Arcs + // Clone Arcs for the new task let our_node = our_node.clone(); let send_to_loop = send_to_loop.clone(); let send_to_terminal = send_to_terminal.clone(); @@ -111,10 +145,24 @@ pub async fn vfs( Ok(()) } +/// Handles individual VFS requests. +/// +/// This function processes various VFS actions such as file operations, directory listings, etc. +/// +/// # Arguments +/// * `our_node` - The identifier for the current node +/// * `km` - The incoming kernel message +/// * `open_files` - A map of currently open files +/// * `send_to_loop` - Sender for kernel messages +/// * `send_to_caps_oracle` - Sender for capability messages +/// * `vfs_path` - The base path for the VFS +/// +/// # Returns +/// * `Result<(), VfsError>` - Result indicating success or a VFS-specific error async fn handle_request( our_node: &str, km: KernelMessage, - open_files: Arc>>>, + open_files: Arc>, Instant)>>, send_to_loop: &MessageSender, send_to_caps_oracle: &CapMessageSender, vfs_path: &PathBuf, @@ -131,14 +179,9 @@ async fn handle_request( }); }; - let request: VfsRequest = match serde_json::from_slice(&body) { - Ok(r) => r, - Err(e) => { - return Err(VfsError::BadJson { - error: e.to_string(), - }); - } - }; + let request: VfsRequest = serde_json::from_slice(&body).map_err(|e| VfsError::BadJson { + error: e.to_string(), + })?; // special case for root reading list of all drives. if request.action == VfsAction::ReadDir && request.path == "/" { @@ -191,9 +234,9 @@ async fn handle_request( let (package_id, drive, rest) = parse_package_and_drive(&request.path, &vfs_path).await?; let drive = format!("/{package_id}/{drive}"); let action = request.action; - let path = PathBuf::from(request.path); + let path = PathBuf::from(&request.path); - if &km.source.process != &*KERNEL_PROCESS_ID { + if km.source.process != *KERNEL_PROCESS_ID { check_caps( our_node, &km.source, @@ -202,7 +245,7 @@ async fn handle_request( &path, &drive, &package_id, - &vfs_path, + vfs_path, ) .await?; } @@ -212,27 +255,26 @@ async fn handle_request( let (response_body, bytes) = match action { VfsAction::CreateDrive => { - let drive_path = join_paths_safely(&vfs_path, &drive); + let drive_path = join_paths_safely(vfs_path, &drive); fs::create_dir_all(drive_path).await?; (VfsResponse::Ok, None) } VfsAction::CreateDir => { - fs::create_dir(path).await?; + fs::create_dir(&path).await?; (VfsResponse::Ok, None) } VfsAction::CreateDirAll => { - fs::create_dir_all(path).await?; + fs::create_dir_all(&path).await?; (VfsResponse::Ok, None) } VfsAction::CreateFile => { // create truncates any file that might've existed before open_files.remove(&path); - let _file = open_file(open_files, path, true, true).await?; + let _file = open_file(open_files, &path, true, true).await?; (VfsResponse::Ok, None) } VfsAction::OpenFile { create } => { - // open file opens an existing file, or creates a new one if create is true - let file = open_file(open_files, path, create, false).await?; + let file = open_file(open_files, &path, create, false).await?; let mut file = file.lock().await; file.seek(SeekFrom::Start(0)).await?; (VfsResponse::Ok, None) @@ -249,7 +291,7 @@ async fn handle_request( error: "blob needs to exist for WriteAll".into(), }); }; - let file = open_file(open_files, path, false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let mut file = file.lock().await; file.write_all(&blob.bytes).await?; (VfsResponse::Ok, None) @@ -260,7 +302,7 @@ async fn handle_request( error: "blob needs to exist for Write".into(), }); }; - fs::write(path, &blob.bytes).await?; + fs::write(&path, &blob.bytes).await?; (VfsResponse::Ok, None) } VfsAction::Append => { @@ -269,14 +311,14 @@ async fn handle_request( error: "blob needs to exist for Append".into(), }); }; - let file = open_file(open_files, path, false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let mut file = file.lock().await; file.seek(SeekFrom::End(0)).await?; file.write_all(&blob.bytes).await?; (VfsResponse::Ok, None) } VfsAction::SyncAll => { - let file = open_file(open_files, path, false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let file = file.lock().await; file.sync_all().await?; (VfsResponse::Ok, None) @@ -286,25 +328,25 @@ async fn handle_request( (VfsResponse::Read, Some(contents)) } VfsAction::ReadToEnd => { - let file = open_file(open_files, path.clone(), false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let mut file = file.lock().await; let mut contents = Vec::new(); file.read_to_end(&mut contents).await?; (VfsResponse::Read, Some(contents)) } VfsAction::ReadExact(length) => { - let file = open_file(open_files, path, false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let mut file = file.lock().await; let mut contents = vec![0; length as usize]; file.read_exact(&mut contents).await?; (VfsResponse::Read, Some(contents)) } VfsAction::ReadDir => { - let mut dir = fs::read_dir(path).await?; + let mut dir = fs::read_dir(&path).await?; let mut entries = Vec::new(); while let Some(entry) = dir.next_entry().await? { let entry_path = entry.path(); - let relative_path = entry_path.strip_prefix(&vfs_path).unwrap_or(&entry_path); + let relative_path = entry_path.strip_prefix(vfs_path).unwrap_or(&entry_path); let metadata = entry.metadata().await?; let file_type = get_file_type(&metadata); @@ -317,14 +359,14 @@ async fn handle_request( (VfsResponse::ReadDir(entries), None) } VfsAction::ReadToString => { - let file = open_file(open_files, path, false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let mut file = file.lock().await; let mut contents = String::new(); file.read_to_string(&mut contents).await?; (VfsResponse::ReadToString(contents), None) } VfsAction::Seek { seek_from } => { - let file = open_file(open_files, path, false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let mut file = file.lock().await; let seek_from = match seek_from { lib::types::core::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset), @@ -340,21 +382,21 @@ async fn handle_request( (VfsResponse::Ok, None) } VfsAction::RemoveDir => { - fs::remove_dir(path).await?; + fs::remove_dir(&path).await?; (VfsResponse::Ok, None) } VfsAction::RemoveDirAll => { - fs::remove_dir_all(path).await?; + fs::remove_dir_all(&path).await?; (VfsResponse::Ok, None) } VfsAction::Rename { new_path } => { - let new_path = join_paths_safely(&vfs_path, &new_path); - fs::rename(path, new_path).await?; + let new_path = join_paths_safely(vfs_path, &new_path); + fs::rename(&path, new_path).await?; (VfsResponse::Ok, None) } VfsAction::CopyFile { new_path } => { - let new_path = join_paths_safely(&vfs_path, &new_path); - fs::copy(path, new_path).await?; + let new_path = join_paths_safely(vfs_path, &new_path); + fs::copy(&path, new_path).await?; (VfsResponse::Ok, None) } VfsAction::Metadata => { @@ -367,7 +409,7 @@ async fn handle_request( (VfsResponse::Metadata(meta), None) } VfsAction::Len => { - let file = open_file(open_files, path, false, false).await?; + let file = open_file(open_files, &path, false, false).await?; let file = file.lock().await; let len = file.metadata().await?.len(); (VfsResponse::Len(len), None) @@ -527,14 +569,14 @@ async fn parse_package_and_drive( } async fn open_file>( - open_files: Arc>>>, + open_files: Arc>, Instant)>>, path: P, create: bool, truncate: bool, ) -> Result>, VfsError> { let path = path.as_ref().to_path_buf(); Ok(match open_files.get(&path) { - Some(file) => file.value().clone(), + Some(file) => file.value().0.clone(), None => { let file = Arc::new(Mutex::new( tokio::fs::OpenOptions::new() @@ -549,7 +591,7 @@ async fn open_file>( path: path.display().to_string(), })?, )); - open_files.insert(path, file.clone()); + open_files.insert(path, (file.clone(), Instant::now())); file } })