Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 86 additions & 44 deletions kinode/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,34 @@ use std::{
io::Read,
path::{Component, Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
fs,
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
sync::Mutex,
time::interval,
};

// Constants for file cleanup
const FILE_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
const FILE_IDLE_TIMEOUT: Duration = Duration::from_secs(300);

/// The main VFS service function.
///
/// This function sets up the VFS, handles incoming requests, and manages file operations.
/// It also implements a file cleanup mechanism to close idle files.
///
/// # Arguments
/// * `our_node` - The identifier for the current node
/// * `send_to_loop` - Sender for kernel messages
/// * `send_to_terminal` - Sender for print messages
/// * `recv_from_loop` - Receiver for incoming messages
/// * `send_to_caps_oracle` - Sender for capability messages
/// * `home_directory_path` - Path to the home directory
///
/// # Returns
/// * `anyhow::Result<()>` - Should never return Ok, but will return fatal errors.
pub async fn vfs(
our_node: Arc<String>,
send_to_loop: MessageSender,
Expand All @@ -27,14 +48,27 @@ pub async fn vfs(
) -> anyhow::Result<()> {
let vfs_path = format!("{home_directory_path}/vfs");

if let Err(e) = fs::create_dir_all(&vfs_path).await {
panic!("failed creating vfs dir! {e:?}");
}
fs::create_dir_all(&vfs_path)
.await
.map_err(|e| anyhow::anyhow!("failed creating vfs dir! {e:?}"))?;
let vfs_path = Arc::new(fs::canonicalize(&vfs_path).await?);

let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
let open_files: Arc<DashMap<PathBuf, (Arc<Mutex<fs::File>>, Instant)>> =
Arc::new(DashMap::new());

let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> = HashMap::new();
let process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
HashMap::default();

// Start the file cleanup task
let cleanup_open_files = open_files.clone();
tokio::spawn(async move {
let mut interval = interval(FILE_CLEANUP_INTERVAL);
loop {
interval.tick().await;
cleanup_open_files
.retain(|_, (_, last_accessed)| last_accessed.elapsed() < FILE_IDLE_TIMEOUT);
}
});

while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node {
Expand All @@ -60,7 +94,7 @@ pub async fn vfs(
queue_lock.push_back(km);
}

// clone Arcs
// Clone Arcs for the new task
let our_node = our_node.clone();
let send_to_loop = send_to_loop.clone();
let send_to_terminal = send_to_terminal.clone();
Expand Down Expand Up @@ -111,10 +145,24 @@ pub async fn vfs(
Ok(())
}

/// Handles individual VFS requests.
///
/// This function processes various VFS actions such as file operations, directory listings, etc.
///
/// # Arguments
/// * `our_node` - The identifier for the current node
/// * `km` - The incoming kernel message
/// * `open_files` - A map of currently open files
/// * `send_to_loop` - Sender for kernel messages
/// * `send_to_caps_oracle` - Sender for capability messages
/// * `vfs_path` - The base path for the VFS
///
/// # Returns
/// * `Result<(), VfsError>` - Result indicating success or a VFS-specific error
async fn handle_request(
our_node: &str,
km: KernelMessage,
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
open_files: Arc<DashMap<PathBuf, (Arc<Mutex<fs::File>>, Instant)>>,
send_to_loop: &MessageSender,
send_to_caps_oracle: &CapMessageSender,
vfs_path: &PathBuf,
Expand All @@ -131,14 +179,9 @@ async fn handle_request(
});
};

let request: VfsRequest = match serde_json::from_slice(&body) {
Ok(r) => r,
Err(e) => {
return Err(VfsError::BadJson {
error: e.to_string(),
});
}
};
let request: VfsRequest = serde_json::from_slice(&body).map_err(|e| VfsError::BadJson {
error: e.to_string(),
})?;

// special case for root reading list of all drives.
if request.action == VfsAction::ReadDir && request.path == "/" {
Expand Down Expand Up @@ -191,9 +234,9 @@ async fn handle_request(
let (package_id, drive, rest) = parse_package_and_drive(&request.path, &vfs_path).await?;
let drive = format!("/{package_id}/{drive}");
let action = request.action;
let path = PathBuf::from(request.path);
let path = PathBuf::from(&request.path);

if &km.source.process != &*KERNEL_PROCESS_ID {
if km.source.process != *KERNEL_PROCESS_ID {
check_caps(
our_node,
&km.source,
Expand All @@ -202,7 +245,7 @@ async fn handle_request(
&path,
&drive,
&package_id,
&vfs_path,
vfs_path,
)
.await?;
}
Expand All @@ -212,27 +255,26 @@ async fn handle_request(

let (response_body, bytes) = match action {
VfsAction::CreateDrive => {
let drive_path = join_paths_safely(&vfs_path, &drive);
let drive_path = join_paths_safely(vfs_path, &drive);
fs::create_dir_all(drive_path).await?;
(VfsResponse::Ok, None)
}
VfsAction::CreateDir => {
fs::create_dir(path).await?;
fs::create_dir(&path).await?;
(VfsResponse::Ok, None)
}
VfsAction::CreateDirAll => {
fs::create_dir_all(path).await?;
fs::create_dir_all(&path).await?;
(VfsResponse::Ok, None)
}
VfsAction::CreateFile => {
// create truncates any file that might've existed before
open_files.remove(&path);
let _file = open_file(open_files, path, true, true).await?;
let _file = open_file(open_files, &path, true, true).await?;
(VfsResponse::Ok, None)
}
VfsAction::OpenFile { create } => {
// open file opens an existing file, or creates a new one if create is true
let file = open_file(open_files, path, create, false).await?;
let file = open_file(open_files, &path, create, false).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::Start(0)).await?;
(VfsResponse::Ok, None)
Expand All @@ -249,7 +291,7 @@ async fn handle_request(
error: "blob needs to exist for WriteAll".into(),
});
};
let file = open_file(open_files, path, false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let mut file = file.lock().await;
file.write_all(&blob.bytes).await?;
(VfsResponse::Ok, None)
Expand All @@ -260,7 +302,7 @@ async fn handle_request(
error: "blob needs to exist for Write".into(),
});
};
fs::write(path, &blob.bytes).await?;
fs::write(&path, &blob.bytes).await?;
(VfsResponse::Ok, None)
}
VfsAction::Append => {
Expand All @@ -269,14 +311,14 @@ async fn handle_request(
error: "blob needs to exist for Append".into(),
});
};
let file = open_file(open_files, path, false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::End(0)).await?;
file.write_all(&blob.bytes).await?;
(VfsResponse::Ok, None)
}
VfsAction::SyncAll => {
let file = open_file(open_files, path, false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let file = file.lock().await;
file.sync_all().await?;
(VfsResponse::Ok, None)
Expand All @@ -286,25 +328,25 @@ async fn handle_request(
(VfsResponse::Read, Some(contents))
}
VfsAction::ReadToEnd => {
let file = open_file(open_files, path.clone(), false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let mut file = file.lock().await;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
(VfsResponse::Read, Some(contents))
}
VfsAction::ReadExact(length) => {
let file = open_file(open_files, path, false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let mut file = file.lock().await;
let mut contents = vec![0; length as usize];
file.read_exact(&mut contents).await?;
(VfsResponse::Read, Some(contents))
}
VfsAction::ReadDir => {
let mut dir = fs::read_dir(path).await?;
let mut dir = fs::read_dir(&path).await?;
let mut entries = Vec::new();
while let Some(entry) = dir.next_entry().await? {
let entry_path = entry.path();
let relative_path = entry_path.strip_prefix(&vfs_path).unwrap_or(&entry_path);
let relative_path = entry_path.strip_prefix(vfs_path).unwrap_or(&entry_path);

let metadata = entry.metadata().await?;
let file_type = get_file_type(&metadata);
Expand All @@ -317,14 +359,14 @@ async fn handle_request(
(VfsResponse::ReadDir(entries), None)
}
VfsAction::ReadToString => {
let file = open_file(open_files, path, false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let mut file = file.lock().await;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
(VfsResponse::ReadToString(contents), None)
}
VfsAction::Seek { seek_from } => {
let file = open_file(open_files, path, false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let mut file = file.lock().await;
let seek_from = match seek_from {
lib::types::core::SeekFrom::Start(offset) => std::io::SeekFrom::Start(offset),
Expand All @@ -340,21 +382,21 @@ async fn handle_request(
(VfsResponse::Ok, None)
}
VfsAction::RemoveDir => {
fs::remove_dir(path).await?;
fs::remove_dir(&path).await?;
(VfsResponse::Ok, None)
}
VfsAction::RemoveDirAll => {
fs::remove_dir_all(path).await?;
fs::remove_dir_all(&path).await?;
(VfsResponse::Ok, None)
}
VfsAction::Rename { new_path } => {
let new_path = join_paths_safely(&vfs_path, &new_path);
fs::rename(path, new_path).await?;
let new_path = join_paths_safely(vfs_path, &new_path);
fs::rename(&path, new_path).await?;
(VfsResponse::Ok, None)
}
VfsAction::CopyFile { new_path } => {
let new_path = join_paths_safely(&vfs_path, &new_path);
fs::copy(path, new_path).await?;
let new_path = join_paths_safely(vfs_path, &new_path);
fs::copy(&path, new_path).await?;
(VfsResponse::Ok, None)
}
VfsAction::Metadata => {
Expand All @@ -367,7 +409,7 @@ async fn handle_request(
(VfsResponse::Metadata(meta), None)
}
VfsAction::Len => {
let file = open_file(open_files, path, false, false).await?;
let file = open_file(open_files, &path, false, false).await?;
let file = file.lock().await;
let len = file.metadata().await?.len();
(VfsResponse::Len(len), None)
Expand Down Expand Up @@ -527,14 +569,14 @@ async fn parse_package_and_drive(
}

async fn open_file<P: AsRef<Path>>(
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
open_files: Arc<DashMap<PathBuf, (Arc<Mutex<fs::File>>, Instant)>>,
path: P,
create: bool,
truncate: bool,
) -> Result<Arc<Mutex<fs::File>>, VfsError> {
let path = path.as_ref().to_path_buf();
Ok(match open_files.get(&path) {
Some(file) => file.value().clone(),
Some(file) => file.value().0.clone(),
None => {
let file = Arc::new(Mutex::new(
tokio::fs::OpenOptions::new()
Expand All @@ -549,7 +591,7 @@ async fn open_file<P: AsRef<Path>>(
path: path.display().to_string(),
})?,
));
open_files.insert(path, file.clone());
open_files.insert(path, (file.clone(), Instant::now()));
file
}
})
Expand Down