Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 46 additions & 48 deletions kinode/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,57 +31,55 @@ pub async fn vfs(
HashMap::new();

loop {
tokio::select! {
Some(km) = recv_from_loop.recv() => {
if our_node.clone() != km.source.node {
println!(
"vfs: request must come from our_node={}, got: {}",
our_node,
km.source.node,
);
continue;
}

let queue = process_queues
.entry(km.source.process.clone())
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone();

let Some(km) = recv_from_loop.recv().await else {
continue;
};
if our_node.clone() != km.source.node {
println!(
"vfs: request must come from our_node={}, got: {}",
our_node, km.source.node,
);
continue;
}

let queue = process_queues
.entry(km.source.process.clone())
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone();

{
let mut queue_lock = queue.lock().await;
queue_lock.push_back(km.clone());
}

// clone Arcs
let our_node = our_node.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_loop = send_to_loop.clone();
let open_files = open_files.clone();
let vfs_path = vfs_path.clone();

tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
if let Some(km) = queue_lock.pop_front() {
if let Err(e) = handle_request(
our_node.clone(),
km.clone(),
open_files.clone(),
send_to_loop.clone(),
send_to_terminal.clone(),
send_to_caps_oracle.clone(),
vfs_path.clone(),
)
.await
{
let mut queue_lock = queue.lock().await;
queue_lock.push_back(km.clone());
let _ = send_to_loop
.send(make_error_message(our_node.clone(), km.id, km.source, e))
.await;
}

// clone Arcs
let our_node = our_node.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_loop = send_to_loop.clone();
let open_files = open_files.clone();
let vfs_path = vfs_path.clone();

tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
if let Some(km) = queue_lock.pop_front() {
if let Err(e) = handle_request(
our_node.clone(),
km.clone(),
open_files.clone(),
send_to_loop.clone(),
send_to_terminal.clone(),
send_to_caps_oracle.clone(),
vfs_path.clone(),
)
.await
{
let _ = send_to_loop
.send(make_error_message(our_node.clone(), km.id, km.source, e))
.await;
}
}
});
}
}
});
}
}

Expand Down