Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 216 additions & 18 deletions hyperdrive/packages/app-store/downloads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
//! Note: While this process coordinates file transfers, the actual chunked transfer
//! mechanism is implemented in the FT worker for improved modularity and performance.
//!
use crate::hyperware::process::downloads::{
AutoDownloadCompleteRequest, AutoDownloadError, AutoUpdateRequest, DirEntry,
DownloadCompleteRequest, DownloadError, DownloadRequest, DownloadResponse, Entry, FileEntry,
HashMismatch, LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest,
use crate::hyperware::process::{
chain::{ChainRequest, ChainResponse},
downloads::{
AutoDownloadCompleteRequest, AutoDownloadError, AutoUpdateRequest, DirEntry,
DownloadCompleteRequest, DownloadError, DownloadRequest, DownloadResponse, Entry,
FileEntry, HashMismatch, LocalDownloadRequest, RemoteDownloadRequest, RemoveFileRequest,
},
};
use ft_worker_lib::{spawn_receive_transfer, spawn_send_transfer};
use hyperware::process::downloads::AutoDownloadSuccess;
Expand Down Expand Up @@ -72,6 +75,7 @@ wit_bindgen::generate!({
mod ft_worker_lib;

pub const VFS_TIMEOUT: u64 = 5; // 5s
pub const CHAIN_TIMEOUT: u64 = 60; // 60s

#[derive(Debug, Serialize, Deserialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming responses
Expand All @@ -89,6 +93,15 @@ pub struct AutoUpdateStatus {

type AutoUpdates = HashMap<(PackageId, String), AutoUpdateStatus>;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ManualDownloadStatus {
mirrors_left: Vec<String>, // vec(node/url)
mirrors_failed: Vec<(String, DownloadError)>, // vec(node/url, error)
active_mirror: String, // (node/url)
}

type ManualDownloads = HashMap<(PackageId, String), ManualDownloadStatus>;

#[derive(Debug, Serialize, Deserialize)]
pub struct State {
// persisted metadata about which packages we are mirroring
Expand Down Expand Up @@ -128,6 +141,8 @@ fn init(our: Address) {

// metadata for in-flight auto-updates
let mut auto_updates: AutoUpdates = HashMap::new();
// metadata for in-flight manual downloads (used for mirror retries)
let mut manual_downloads: ManualDownloads = HashMap::new();

loop {
match await_message() {
Expand All @@ -139,6 +154,7 @@ fn init(our: Address) {
&mut downloads,
// &mut tmp,
&mut auto_updates,
&mut manual_downloads,
) {
print_to_terminal(1, &format!("error handling message: {e:?}"));
}
Expand All @@ -163,6 +179,17 @@ fn init(our: Address) {
// Then remove and get metadata
if let Some(metadata) = auto_updates.remove(&key) {
try_next_mirror(metadata, key, &mut auto_updates, error);
} else if let Some(metadata) = manual_downloads.remove(&key) {
if let Err(err) =
try_next_manual_mirror(metadata, key, &mut manual_downloads, error)
{
print_to_terminal(
1,
&format!(
"downloads: failed manual mirror retry on send error: {err:?}"
),
);
}
}
}
}
Expand All @@ -182,6 +209,7 @@ fn handle_message(
downloads: &mut Directory,
// _tmp: &mut Directory,
auto_updates: &mut AutoUpdates,
manual_downloads: &mut ManualDownloads,
) -> anyhow::Result<()> {
if message.is_request() {
match message.body().try_into()? {
Expand All @@ -208,6 +236,18 @@ fn handle_message(
desired_version_hash,
} = download_request.clone();

let key = (
package_id.clone().to_process_lib(),
desired_version_hash.clone(),
);

if !download_from.starts_with("http")
&& !auto_updates.contains_key(&key)
&& !manual_downloads.contains_key(&key)
{
build_manual_mirror_status(&download_request, manual_downloads)?;
}

if download_from.starts_with("http") {
// use http-client to GET it
Request::to(("our", "http-client", "distro", "sys"))
Expand Down Expand Up @@ -310,6 +350,16 @@ fn handle_message(
DownloadError::InvalidManifest,
);
}
return Ok(());
}

if let Some(err) = req.err {
if let Some(metadata) = manual_downloads.remove(&key) {
try_next_manual_mirror(metadata, key, manual_downloads, err)?;
return Ok(());
}
} else {
manual_downloads.remove(&key);
}
}
DownloadRequest::GetFiles(maybe_id) => {
Expand Down Expand Up @@ -534,26 +584,31 @@ fn handle_message(

if let Some(context) = message.context() {
let download_request = serde_json::from_slice::<LocalDownloadRequest>(context)?;
let key = (
download_request.package_id.clone().to_process_lib(),
download_request.desired_version_hash.clone(),
);
match download_response {
DownloadResponse::Err(e) => {
print_to_terminal(1, &format!("downloads: got error response: {e:?}"));
let key = (
download_request.package_id.clone().to_process_lib(),
download_request.desired_version_hash.clone(),
);

if let Some(metadata) = auto_updates.remove(&key) {
try_next_mirror(metadata, key, auto_updates, e);
} else {
// If not an auto-update, forward error normally
Request::to(("our", "main", "app-store", "sys"))
.body(DownloadCompleteRequest {
package_id: download_request.package_id,
version_hash: download_request.desired_version_hash,
err: Some(e),
})
.send()?;
return Ok(());
}

if let Some(metadata) = manual_downloads.remove(&key) {
try_next_manual_mirror(metadata, key, manual_downloads, e)?;
return Ok(());
}

// If not handled by retry logic, forward error normally
Request::to(("our", "main", "app-store", "sys"))
.body(DownloadCompleteRequest {
package_id: download_request.package_id,
version_hash: download_request.desired_version_hash,
err: Some(e),
})
.send()?;
}
DownloadResponse::Success => {
// todo: maybe we do something here.
Expand Down Expand Up @@ -646,6 +701,149 @@ fn handle_message(
Ok(())
}

fn build_manual_mirror_status(
download_request: &LocalDownloadRequest,
manual_downloads: &mut ManualDownloads,
) -> anyhow::Result<()> {
let key = (
download_request.package_id.clone().to_process_lib(),
download_request.desired_version_hash.clone(),
);

if manual_downloads.contains_key(&key) {
return Ok(());
}

let mut mirror_candidates = match fetch_mirror_candidates(&key.0) {
Ok(candidates) => candidates,
Err(err) => {
print_to_terminal(
1,
&format!("downloads: failed to fetch mirrors for manual download: {err:?}"),
);
Vec::new()
}
};

// ensure requested mirror is first
match mirror_candidates
.iter()
.position(|m| m == &download_request.download_from)
{
Some(idx) => {
let requested = mirror_candidates.remove(idx);
mirror_candidates.insert(0, requested);
}
None => mirror_candidates.insert(0, download_request.download_from.clone()),
}

if mirror_candidates.len() <= 1 {
return Ok(());
}

manual_downloads.insert(
key,
ManualDownloadStatus {
mirrors_left: mirror_candidates[1..].to_vec(),
mirrors_failed: Vec::new(),
active_mirror: mirror_candidates[0].clone(),
},
);

Ok(())
}

fn fetch_mirror_candidates(package_id: &PackageId) -> anyhow::Result<Vec<String>> {
let resp = Request::to(("our", "chain", "app-store", "sys"))
.body(serde_json::to_vec(&ChainRequest::GetApp(
crate::hyperware::process::main::PackageId::from_process_lib(package_id.clone()),
))?)
.send_and_await_response(CHAIN_TIMEOUT)??;

let msg = serde_json::from_slice::<ChainResponse>(resp.body())?;

if let ChainResponse::GetApp(Some(app)) = msg {
if let Some(metadata) = app.metadata {
let mut seen = HashSet::new();
let mut mirror_candidates: Vec<String> = Vec::new();

if !metadata.properties.publisher.is_empty()
&& seen.insert(metadata.properties.publisher.clone())
{
mirror_candidates.push(metadata.properties.publisher);
}

for mirror in metadata.properties.mirrors {
if mirror.is_empty() {
continue;
}
if seen.insert(mirror.clone()) {
mirror_candidates.push(mirror);
}
}

return Ok(mirror_candidates);
}
}

Ok(Vec::new())
}

fn try_next_manual_mirror(
mut metadata: ManualDownloadStatus,
key: (PackageId, String),
manual_downloads: &mut ManualDownloads,
error: DownloadError,
) -> anyhow::Result<()> {
print_to_terminal(
0,
&format!(
"manual_download: got error from mirror {mirror:?} {error:?}, trying next mirror: {next_mirror:?}",
mirror = metadata.active_mirror,
error = error,
next_mirror = metadata.mirrors_left.iter().next().cloned(),
),
);

let (package_id, version_hash) = key.clone();

match metadata.mirrors_left.first().cloned() {
Some(next_mirror) => {
metadata
.mirrors_failed
.push((metadata.active_mirror.clone(), error));
metadata.mirrors_left.remove(0);
metadata.active_mirror = next_mirror.clone();
manual_downloads.insert(key, metadata);

Request::to(("our", "downloads", "app-store", "sys"))
.body(serde_json::to_vec(&DownloadRequest::LocalDownload(
LocalDownloadRequest {
package_id: crate::hyperware::process::main::PackageId::from_process_lib(
package_id,
),
download_from: next_mirror.clone(),
desired_version_hash: version_hash,
},
))?)
.send()?;
}
None => {
Request::to(("our", "main", "app-store", "sys"))
.body(DownloadCompleteRequest {
package_id: crate::hyperware::process::main::PackageId::from_process_lib(
package_id,
),
version_hash,
err: Some(error),
})
.send()?;
}
}

Ok(())
}

/// Try the next available mirror for a download, recording the current mirror's failure
fn try_next_mirror(
mut metadata: AutoUpdateStatus,
Expand Down