Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 79 additions & 37 deletions kinode/packages/app_store/app_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ft_worker_lib::{
};
use kinode_process_lib::{
await_message, call_init, eth, get_blob, http, kimap, println, vfs, Address, LazyLoadBlob,
Message, NodeId, PackageId, Request, Response,
Message, PackageId, Request, Response,
};
use serde::{Deserialize, Serialize};
use state::{AppStoreLogError, PackageState, RequestedPackage, State};
Expand Down Expand Up @@ -79,6 +79,7 @@ pub enum Resp {
LocalResponse(LocalResponse),
RemoteResponse(RemoteResponse),
FTWorkerResult(FTWorkerResult),
HttpClient(Result<http::HttpClientResponse, http::HttpClientError>),
}

call_init!(init);
Expand Down Expand Up @@ -171,8 +172,28 @@ fn handle_message(state: &mut State, message: &Message) -> anyhow::Result<()> {
}
}
} else {
// the only kind of response we care to handle here!
handle_ft_worker_result(message.body(), message.context().unwrap_or(&vec![]))?;
match serde_json::from_slice::<Resp>(message.body())? {
Resp::HttpClient(resp) => {
let name = match message.context() {
Some(context) => std::str::from_utf8(context).unwrap_or_default(),
None => return Err(anyhow::anyhow!("http_client response without context")),
};
if let Ok(http::HttpClientResponse::Http(http::HttpResponse { status, .. })) = resp
{
if status == 200 {
handle_receive_download(state, &name)?;
}
} else {
println!("got http_client error: {resp:?}");
}
}
Resp::FTWorkerResult(ft_worker_result) => {
handle_ft_worker_result(ft_worker_result, message.context().unwrap_or(&vec![]))?;
}
Resp::LocalResponse(_) | Resp::RemoteResponse(_) => {
// don't need to handle these at the moment
}
}
}
Ok(())
}
Expand Down Expand Up @@ -370,10 +391,11 @@ pub fn rebuild_index(state: &mut State) -> LocalResponse {
LocalResponse::RebuildIndexResponse(RebuildIndexResponse::Success)
}

/// `from`: the node OR url to download from
pub fn start_download(
state: &mut State,
package_id: PackageId,
from: NodeId,
from: String,
mirror: bool,
auto_update: bool,
desired_version_hash: Option<String>,
Expand All @@ -382,24 +404,46 @@ pub fn start_download(
package_id: crate::kinode::process::main::PackageId::from_process_lib(package_id.clone()),
desired_version_hash: desired_version_hash.clone(),
};
if let Ok(Ok(Message::Response { body, .. })) =
Request::to((from.as_str(), state.our.process.clone()))
.body(serde_json::to_vec(&RemoteRequest::Download(download_request)).unwrap())
.send_and_await_response(VFS_TIMEOUT)
{
if let Ok(Resp::RemoteResponse(RemoteResponse::DownloadApproved)) =
serde_json::from_slice::<Resp>(&body)
// if `from` is a node, send a request to it
// but if it is a url, use http_client to GET it
if from.starts_with("http") {
// use http_client to GET it
Request::to(("our", "http_client", "distro", "sys"))
.body(
serde_json::to_vec(&http::HttpClientAction::Http(http::OutgoingHttpRequest {
method: "GET".to_string(),
version: None,
url: from.clone(),
headers: std::collections::HashMap::new(),
}))
.unwrap(),
)
.context(package_id.to_string().as_bytes())
.expects_response(60)
.send()
.unwrap();

return DownloadResponse::Started;
} else {
if let Ok(Ok(Message::Response { body, .. })) =
Request::to((from.as_str(), state.our.process.clone()))
.body(serde_json::to_vec(&RemoteRequest::Download(download_request)).unwrap())
.send_and_await_response(VFS_TIMEOUT)
{
state.requested_packages.insert(
package_id,
RequestedPackage {
from,
mirror,
auto_update,
desired_version_hash,
},
);
return DownloadResponse::Started;
if let Ok(Resp::RemoteResponse(RemoteResponse::DownloadApproved)) =
serde_json::from_slice::<Resp>(&body)
{
state.requested_packages.insert(
package_id,
RequestedPackage {
from,
mirror,
auto_update,
desired_version_hash,
},
);
return DownloadResponse::Started;
}
}
}
DownloadResponse::BadResponse
Expand Down Expand Up @@ -520,23 +564,21 @@ fn handle_receive_download_package(
Ok(())
}

fn handle_ft_worker_result(body: &[u8], context: &[u8]) -> anyhow::Result<()> {
if let Ok(Resp::FTWorkerResult(ft_worker_result)) = serde_json::from_slice::<Resp>(body) {
let context = serde_json::from_slice::<FileTransferContext>(context)?;
if let FTWorkerResult::SendSuccess = ft_worker_result {
println!(
"successfully shared {} in {:.4}s",
context.file_name,
std::time::SystemTime::now()
.duration_since(context.start_time)
.unwrap()
.as_secs_f64(),
);
} else {
return Err(anyhow::anyhow!("failed to share app"));
}
fn handle_ft_worker_result(ft_worker_result: FTWorkerResult, context: &[u8]) -> anyhow::Result<()> {
let context = serde_json::from_slice::<FileTransferContext>(context)?;
if let FTWorkerResult::SendSuccess = ft_worker_result {
println!(
"successfully shared {} in {:.4}s",
context.file_name,
std::time::SystemTime::now()
.duration_since(context.start_time)
.unwrap()
.as_secs_f64(),
);
Ok(())
} else {
Err(anyhow::anyhow!("failed to share app"))
}
Ok(())
}

fn handle_eth_sub_event(
Expand Down
4 changes: 2 additions & 2 deletions kinode/packages/app_store/download/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::kinode::process::main::{DownloadResponse, LocalRequest, LocalResponse};
use kinode::process::main::DownloadRequest;
use kinode_process_lib::{
await_next_message_body, call_init, println, Address, Message, NodeId, PackageId, Request,
await_next_message_body, call_init, println, Address, Message, PackageId, Request,
};

wit_bindgen::generate!({
Expand All @@ -26,7 +26,7 @@ fn init(our: Address) {
return;
};

let download_from: NodeId = arg1.to_string();
let download_from: String = arg1.to_string();

let Ok(package_id) = arg2.parse::<PackageId>() else {
println!("download: invalid package id, make sure to include package name and publisher");
Expand Down