diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 6dba6cfb6..50b6a5e17 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -31,57 +31,55 @@ pub async fn vfs( HashMap::new(); loop { - tokio::select! { - Some(km) = recv_from_loop.recv() => { - if our_node.clone() != km.source.node { - println!( - "vfs: request must come from our_node={}, got: {}", - our_node, - km.source.node, - ); - continue; - } - - let queue = process_queues - .entry(km.source.process.clone()) - .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - .clone(); - + let Some(km) = recv_from_loop.recv().await else { + continue; + }; + if our_node.clone() != km.source.node { + println!( + "vfs: request must come from our_node={}, got: {}", + our_node, km.source.node, + ); + continue; + } + + let queue = process_queues + .entry(km.source.process.clone()) + .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) + .clone(); + + { + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km.clone()); + } + + // clone Arcs + let our_node = our_node.clone(); + let send_to_caps_oracle = send_to_caps_oracle.clone(); + let send_to_terminal = send_to_terminal.clone(); + let send_to_loop = send_to_loop.clone(); + let open_files = open_files.clone(); + let vfs_path = vfs_path.clone(); + + tokio::spawn(async move { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { + if let Err(e) = handle_request( + our_node.clone(), + km.clone(), + open_files.clone(), + send_to_loop.clone(), + send_to_terminal.clone(), + send_to_caps_oracle.clone(), + vfs_path.clone(), + ) + .await { - let mut queue_lock = queue.lock().await; - queue_lock.push_back(km.clone()); + let _ = send_to_loop + .send(make_error_message(our_node.clone(), km.id, km.source, e)) + .await; } - - // clone Arcs - let our_node = our_node.clone(); - let send_to_caps_oracle = send_to_caps_oracle.clone(); - let send_to_terminal = send_to_terminal.clone(); - let send_to_loop = send_to_loop.clone(); - let open_files = open_files.clone(); - let vfs_path = vfs_path.clone(); - - tokio::spawn(async move { - let mut queue_lock = queue.lock().await; - if let Some(km) = queue_lock.pop_front() { - if let Err(e) = handle_request( - our_node.clone(), - km.clone(), - open_files.clone(), - send_to_loop.clone(), - send_to_terminal.clone(), - send_to_caps_oracle.clone(), - vfs_path.clone(), - ) - .await - { - let _ = send_to_loop - .send(make_error_message(our_node.clone(), km.id, km.source, e)) - .await; - } - } - }); } - } + }); } }