Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion kinode/packages/terminal/cat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@ fn init(_our: Address) {
println!("no file found at {}", file_path);
return;
};
println!("{}", String::from_utf8(blob.bytes).unwrap());
match String::from_utf8(blob.bytes) {
Ok(s) => println!("{s}"),
Err(_e) => println!("error: file at {file_path} could not be parsed as utf-8 string!"),
}
}
125 changes: 58 additions & 67 deletions kinode/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ pub async fn vfs(
send_to_caps_oracle: CapMessageSender,
home_directory_path: String,
) -> anyhow::Result<()> {
let vfs_path = format!("{}/vfs", &home_directory_path);
let our_node = Arc::new(our_node);
let vfs_path = format!("{home_directory_path}/vfs");

if let Err(e) = fs::create_dir_all(&vfs_path).await {
panic!("failed creating vfs dir! {:?}", e);
Expand All @@ -30,15 +31,15 @@ pub async fn vfs(
let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
HashMap::new();

loop {
let Some(km) = recv_from_loop.recv().await else {
continue;
};
if our_node.clone() != km.source.node {
println!(
"vfs: request must come from our_node={}, got: {}",
our_node, km.source.node,
);
while let Some(km) = recv_from_loop.recv().await {
if *our_node != km.source.node {
let _ = send_to_terminal.send(Printout {
verbosity: 1,
content: format!(
"vfs: got request from {}, but requests must come from our node {our_node}\r",
km.source.node,
),
});
continue;
}

Expand All @@ -63,48 +64,49 @@ pub async fn vfs(
tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
if let Some(km) = queue_lock.pop_front() {
let (km_id, km_source) = (km.id.clone(), km.source.clone());

if let Err(e) = handle_request(
our_node.clone(),
km.clone(),
&our_node,
km,
open_files.clone(),
send_to_loop.clone(),
send_to_terminal.clone(),
send_to_caps_oracle.clone(),
vfs_path.clone(),
&send_to_loop,
&send_to_terminal,
&send_to_caps_oracle,
&vfs_path,
)
.await
{
let _ = send_to_loop
.send(make_error_message(our_node.clone(), km.id, km.source, e))
.send(make_error_message(
our_node.to_string(),
km_id,
km_source,
e,
))
.await;
}
}
});
}
Ok(())
}

async fn handle_request(
our_node: String,
our_node: &str,
km: KernelMessage,
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
send_to_loop: MessageSender,
send_to_terminal: PrintSender,
send_to_caps_oracle: CapMessageSender,
vfs_path: PathBuf,
send_to_loop: &MessageSender,
send_to_terminal: &PrintSender,
send_to_caps_oracle: &CapMessageSender,
vfs_path: &PathBuf,
) -> Result<(), VfsError> {
let KernelMessage {
id,
source,
message,
lazy_load_blob: blob,
..
} = km.clone();
let Message::Request(Request {
body,
expects_response,
metadata,
..
}) = message.clone()
}) = km.message
else {
return Err(VfsError::BadRequest {
error: "not a request".into(),
Expand All @@ -114,7 +116,6 @@ async fn handle_request(
let request: VfsRequest = match serde_json::from_slice(&body) {
Ok(r) => r,
Err(e) => {
println!("vfs: got invalid Request: {}", e);
return Err(VfsError::BadJson {
error: e.to_string(),
});
Expand All @@ -127,10 +128,10 @@ async fn handle_request(
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
on: km.source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
Expand Down Expand Up @@ -159,12 +160,12 @@ async fn handle_request(
}

let response = KernelMessage {
id,
id: km.id,
source: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
target: source,
target: km.source,
rsvp: None,
message: Message::Response((
Response {
Expand Down Expand Up @@ -194,10 +195,10 @@ async fn handle_request(
let drive = format!("/{}/{}", package_id, drive);
let path = PathBuf::from(request.path.clone());

if km.source.process != *KERNEL_PROCESS_ID {
if &km.source.process != &*KERNEL_PROCESS_ID {
check_caps(
our_node.clone(),
source.clone(),
our_node,
km.source.clone(),
send_to_caps_oracle.clone(),
&request,
path.clone(),
Expand Down Expand Up @@ -249,7 +250,7 @@ async fn handle_request(
}
VfsAction::WriteAll => {
// doesn't create a file, writes at exact cursor.
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for WriteAll".into(),
});
Expand All @@ -260,7 +261,7 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Write => {
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for Write".into(),
});
Expand All @@ -269,7 +270,7 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Append => {
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for Append".into(),
});
Expand Down Expand Up @@ -428,7 +429,7 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Hash(hash)).unwrap(), None)
}
VfsAction::AddZip => {
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for AddZip".into(),
});
Expand Down Expand Up @@ -470,7 +471,6 @@ async fn handle_request(
} else if is_dir {
fs::create_dir_all(local_path).await?;
} else {
println!("vfs: zip with non-file non-dir");
return Err(VfsError::CreateDirError {
path: path.display().to_string(),
error: "vfs: zip with non-file non-dir".into(),
Expand All @@ -483,14 +483,14 @@ async fn handle_request(

if let Some(target) = km.rsvp.or_else(|| {
expects_response.map(|_| Address {
node: our_node.clone(),
process: source.process.clone(),
node: our_node.to_string(),
process: km.source.process.clone(),
})
}) {
let response = KernelMessage {
id,
id: km.id,
source: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
target,
Expand All @@ -512,7 +512,6 @@ async fn handle_request(

let _ = send_to_loop.send(response).await;
} else {
println!("vfs: not sending response: ");
send_to_terminal
.send(Printout {
verbosity: 2,
Expand All @@ -538,23 +537,15 @@ async fn parse_package_and_drive(
let normalized_path = normalize_path(&joined_path);
if !normalized_path.starts_with(vfs_path) {
return Err(VfsError::BadRequest {
error: format!(
"input path tries to escape parent vfs directory: {:?}",
path
)
.into(),
error: format!("input path tries to escape parent vfs directory: {path}"),
})?;
}

// extract original path.
let path = normalized_path
.strip_prefix(vfs_path)
.map_err(|_| VfsError::BadRequest {
error: format!(
"input path tries to escape parent vfs directory: {:?}",
path
)
.into(),
error: format!("input path tries to escape parent vfs directory: {path}"),
})?
.display()
.to_string();
Expand All @@ -567,7 +558,7 @@ async fn parse_package_and_drive(
if parts.len() < 2 {
return Err(VfsError::ParseError {
error: "malformed path".into(),
path: path.to_string(),
path,
});
}

Expand All @@ -576,7 +567,7 @@ async fn parse_package_and_drive(
Err(e) => {
return Err(VfsError::ParseError {
error: e.to_string(),
path: path.to_string(),
path,
})
}
};
Expand Down Expand Up @@ -617,7 +608,7 @@ async fn open_file<P: AsRef<Path>>(
}

async fn check_caps(
our_node: String,
our_node: &str,
source: Address,
mut send_to_caps_oracle: CapMessageSender,
request: &VfsRequest,
Expand All @@ -635,7 +626,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
Expand Down Expand Up @@ -676,7 +667,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
Expand Down Expand Up @@ -718,7 +709,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
Expand Down Expand Up @@ -761,7 +752,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
Expand Down Expand Up @@ -792,7 +783,7 @@ async fn check_caps(
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
Expand Down
15 changes: 11 additions & 4 deletions lib/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,22 @@ pub enum ProcessIdParseError {

impl std::fmt::Display for ProcessIdParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_string())
write!(
f,
"{}",
match self {
ProcessIdParseError::TooManyColons => "Too many colons",
ProcessIdParseError::MissingField => "Missing field",
}
)
}
}

impl std::error::Error for ProcessIdParseError {
fn description(&self) -> &str {
match self {
ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string",
ProcessIdParseError::MissingField => "Missing field in ProcessId string",
ProcessIdParseError::TooManyColons => "Too many colons",
ProcessIdParseError::MissingField => "Missing field",
}
}
}
Expand Down Expand Up @@ -1514,7 +1521,7 @@ pub enum VfsError {
BadBytes { action: String, path: String },
#[error("vfs: bad request error: {error}")]
BadRequest { error: String },
#[error("vfs: error parsing path: {path}, error: {error}")]
#[error("vfs: error parsing path: {path}: {error}")]
ParseError { error: String, path: String },
#[error("vfs: IO error: {error}, at path {path}")]
IOError { error: String, path: String },
Expand Down