From 7ecc3762f31e6142b8bf98a0edc6ae2c19bae5b5 Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Mon, 17 Nov 2025 04:47:00 -0800 Subject: [PATCH] app-store: robustify manual download --- .../packages/app-store/downloads/src/lib.rs | 234 ++++++++++++++++-- 1 file changed, 216 insertions(+), 18 deletions(-) diff --git a/hyperdrive/packages/app-store/downloads/src/lib.rs b/hyperdrive/packages/app-store/downloads/src/lib.rs index 76af53d67..f42e9e98a 100644 --- a/hyperdrive/packages/app-store/downloads/src/lib.rs +++ b/hyperdrive/packages/app-store/downloads/src/lib.rs @@ -40,10 +40,13 @@ //! Note: While this process coordinates file transfers, the actual chunked transfer //! mechanism is implemented in the FT worker for improved modularity and performance. //! -use crate::hyperware::process::downloads::{ - AutoDownloadCompleteRequest, AutoDownloadError, AutoUpdateRequest, DirEntry, - DownloadCompleteRequest, DownloadError, DownloadRequest, DownloadResponse, Entry, FileEntry, - HashMismatch, LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest, +use crate::hyperware::process::{ + chain::{ChainRequest, ChainResponse}, + downloads::{ + AutoDownloadCompleteRequest, AutoDownloadError, AutoUpdateRequest, DirEntry, + DownloadCompleteRequest, DownloadError, DownloadRequest, DownloadResponse, Entry, + FileEntry, HashMismatch, LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest, + }, }; use ft_worker_lib::{spawn_receive_transfer, spawn_send_transfer}; use hyperware::process::downloads::AutoDownloadSuccess; @@ -72,6 +75,7 @@ wit_bindgen::generate!({ mod ft_worker_lib; pub const VFS_TIMEOUT: u64 = 5; // 5s +pub const CHAIN_TIMEOUT: u64 = 60; // 60s #[derive(Debug, Serialize, Deserialize, process_macros::SerdeJsonInto)] #[serde(untagged)] // untagged as a meta-type for all incoming responses @@ -89,6 +93,15 @@ pub struct AutoUpdateStatus { type AutoUpdates = HashMap<(PackageId, String), AutoUpdateStatus>; +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ManualDownloadStatus { + mirrors_left: Vec, // vec(node/url) + mirrors_failed: Vec<(String, DownloadError)>, // vec(node/url, error) + active_mirror: String, // (node/url) +} + +type ManualDownloads = HashMap<(PackageId, String), ManualDownloadStatus>; + #[derive(Debug, Serialize, Deserialize)] pub struct State { // persisted metadata about which packages we are mirroring @@ -128,6 +141,8 @@ fn init(our: Address) { // metadata for in-flight auto-updates let mut auto_updates: AutoUpdates = HashMap::new(); + // metadata for in-flight manual downloads (used for mirror retries) + let mut manual_downloads: ManualDownloads = HashMap::new(); loop { match await_message() { @@ -139,6 +154,7 @@ fn init(our: Address) { &mut downloads, // &mut tmp, &mut auto_updates, + &mut manual_downloads, ) { print_to_terminal(1, &format!("error handling message: {e:?}")); } @@ -163,6 +179,17 @@ fn init(our: Address) { // Then remove and get metadata if let Some(metadata) = auto_updates.remove(&key) { try_next_mirror(metadata, key, &mut auto_updates, error); + } else if let Some(metadata) = manual_downloads.remove(&key) { + if let Err(err) = + try_next_manual_mirror(metadata, key, &mut manual_downloads, error) + { + print_to_terminal( + 1, + &format!( + "downloads: failed manual mirror retry on send error: {err:?}" + ), + ); + } } } } @@ -182,6 +209,7 @@ fn handle_message( downloads: &mut Directory, // _tmp: &mut Directory, auto_updates: &mut AutoUpdates, + manual_downloads: &mut ManualDownloads, ) -> anyhow::Result<()> { if message.is_request() { match message.body().try_into()? { @@ -208,6 +236,18 @@ fn handle_message( desired_version_hash, } = download_request.clone(); + let key = ( + package_id.clone().to_process_lib(), + desired_version_hash.clone(), + ); + + if !download_from.starts_with("http") + && !auto_updates.contains_key(&key) + && !manual_downloads.contains_key(&key) + { + build_manual_mirror_status(&download_request, manual_downloads)?; + } + if download_from.starts_with("http") { // use http-client to GET it Request::to(("our", "http-client", "distro", "sys")) @@ -310,6 +350,16 @@ fn handle_message( DownloadError::InvalidManifest, ); } + return Ok(()); + } + + if let Some(err) = req.err { + if let Some(metadata) = manual_downloads.remove(&key) { + try_next_manual_mirror(metadata, key, manual_downloads, err)?; + return Ok(()); + } + } else { + manual_downloads.remove(&key); } } DownloadRequest::GetFiles(maybe_id) => { @@ -534,26 +584,31 @@ fn handle_message( if let Some(context) = message.context() { let download_request = serde_json::from_slice::(context)?; + let key = ( + download_request.package_id.clone().to_process_lib(), + download_request.desired_version_hash.clone(), + ); match download_response { DownloadResponse::Err(e) => { print_to_terminal(1, &format!("downloads: got error response: {e:?}")); - let key = ( - download_request.package_id.clone().to_process_lib(), - download_request.desired_version_hash.clone(), - ); - if let Some(metadata) = auto_updates.remove(&key) { try_next_mirror(metadata, key, auto_updates, e); - } else { - // If not an auto-update, forward error normally - Request::to(("our", "main", "app-store", "sys")) - .body(DownloadCompleteRequest { - package_id: download_request.package_id, - version_hash: download_request.desired_version_hash, - err: Some(e), - }) - .send()?; + return Ok(()); + } + + if let Some(metadata) = manual_downloads.remove(&key) { + try_next_manual_mirror(metadata, key, manual_downloads, e)?; + return Ok(()); } + + // If not handled by retry logic, forward error normally + Request::to(("our", "main", "app-store", "sys")) + .body(DownloadCompleteRequest { + package_id: download_request.package_id, + version_hash: download_request.desired_version_hash, + err: Some(e), + }) + .send()?; } DownloadResponse::Success => { // todo: maybe we do something here. @@ -646,6 +701,149 @@ fn handle_message( Ok(()) } +fn build_manual_mirror_status( + download_request: &LocalDownloadRequest, + manual_downloads: &mut ManualDownloads, +) -> anyhow::Result<()> { + let key = ( + download_request.package_id.clone().to_process_lib(), + download_request.desired_version_hash.clone(), + ); + + if manual_downloads.contains_key(&key) { + return Ok(()); + } + + let mut mirror_candidates = match fetch_mirror_candidates(&key.0) { + Ok(candidates) => candidates, + Err(err) => { + print_to_terminal( + 1, + &format!("downloads: failed to fetch mirrors for manual download: {err:?}"), + ); + Vec::new() + } + }; + + // ensure requested mirror is first + match mirror_candidates + .iter() + .position(|m| m == &download_request.download_from) + { + Some(idx) => { + let requested = mirror_candidates.remove(idx); + mirror_candidates.insert(0, requested); + } + None => mirror_candidates.insert(0, download_request.download_from.clone()), + } + + if mirror_candidates.len() <= 1 { + return Ok(()); + } + + manual_downloads.insert( + key, + ManualDownloadStatus { + mirrors_left: mirror_candidates[1..].to_vec(), + mirrors_failed: Vec::new(), + active_mirror: mirror_candidates[0].clone(), + }, + ); + + Ok(()) +} + +fn fetch_mirror_candidates(package_id: &PackageId) -> anyhow::Result> { + let resp = Request::to(("our", "chain", "app-store", "sys")) + .body(serde_json::to_vec(&ChainRequest::GetApp( + crate::hyperware::process::main::PackageId::from_process_lib(package_id.clone()), + ))?) + .send_and_await_response(CHAIN_TIMEOUT)??; + + let msg = serde_json::from_slice::(resp.body())?; + + if let ChainResponse::GetApp(Some(app)) = msg { + if let Some(metadata) = app.metadata { + let mut seen = HashSet::new(); + let mut mirror_candidates: Vec = Vec::new(); + + if !metadata.properties.publisher.is_empty() + && seen.insert(metadata.properties.publisher.clone()) + { + mirror_candidates.push(metadata.properties.publisher); + } + + for mirror in metadata.properties.mirrors { + if mirror.is_empty() { + continue; + } + if seen.insert(mirror.clone()) { + mirror_candidates.push(mirror); + } + } + + return Ok(mirror_candidates); + } + } + + Ok(Vec::new()) +} + +fn try_next_manual_mirror( + mut metadata: ManualDownloadStatus, + key: (PackageId, String), + manual_downloads: &mut ManualDownloads, + error: DownloadError, +) -> anyhow::Result<()> { + print_to_terminal( + 0, + &format!( + "manual_download: got error from mirror {mirror:?} {error:?}, trying next mirror: {next_mirror:?}", + mirror = metadata.active_mirror, + error = error, + next_mirror = metadata.mirrors_left.iter().next().cloned(), + ), + ); + + let (package_id, version_hash) = key.clone(); + + match metadata.mirrors_left.first().cloned() { + Some(next_mirror) => { + metadata + .mirrors_failed + .push((metadata.active_mirror.clone(), error)); + metadata.mirrors_left.remove(0); + metadata.active_mirror = next_mirror.clone(); + manual_downloads.insert(key, metadata); + + Request::to(("our", "downloads", "app-store", "sys")) + .body(serde_json::to_vec(&DownloadRequest::LocalDownload( + LocalDownloadRequest { + package_id: crate::hyperware::process::main::PackageId::from_process_lib( + package_id, + ), + download_from: next_mirror.clone(), + desired_version_hash: version_hash, + }, + ))?) + .send()?; + } + None => { + Request::to(("our", "main", "app-store", "sys")) + .body(DownloadCompleteRequest { + package_id: crate::hyperware::process::main::PackageId::from_process_lib( + package_id, + ), + version_hash, + err: Some(error), + }) + .send()?; + } + } + + Ok(()) +} + /// Try the next available mirror for a download, recording the current mirror's failure fn try_next_mirror( mut metadata: AutoUpdateStatus,