feat: downloader max retry and chunks tracking
hopeyen committed Nov 27, 2023
1 parent adbcd24 commit 8bd5ddf
Showing 3 changed files with 105 additions and 55 deletions.
10 changes: 5 additions & 5 deletions docs/feature_checklist.md
@@ -17,7 +17,7 @@
- [x] later, take a list of files, use File hasher to hash all files and get root hashes
- [x] Construct a subfile manifest with metainfo using YAML builder
- [x] vectorize
- [ ] May include a status endpoint for the "canonical" publisher, but recognize the endpoint may change later on
- [x] May include a status endpoint for the "canonical" publisher, but recognize the endpoint may change later on
- [x] Publish subfile to IPFS, receive a IPFS hash for the subfile
- [x] IPFS client
- [x] Connect to an IPFS gateway
@@ -37,7 +37,7 @@
- [x] Route `/` for "Ready to roll!"
- [x] Route `/operator` for operator info
- [x] Route `/status` for availability
- [ ] verification for availability
- [x] verification for availability
- [x] Route `/subfiles/id/:id` for a subfile using IPFS hash with range requests
- [x] Route `/health` for general health
- [x] Route `/version` for subfile server version
@@ -74,9 +74,9 @@
- [ ] Parallelize requests
- [ ] Multiple connections (HTTPS over HTTP2)
- [x] Wait for the responses (For now, assume that the response chunks correspond with the verifiable chunks)
- [x] Keeps track of the downloaded and missing pieces, continually requesting missing pieces until the complete file is obtained
- [x] Keeps track of the downloaded and missing pieces,
- [ ] continually requesting missing pieces until the complete file is obtained
- [x] Upon receiving a response, verify the chunk data in the chunk_file
- [ ] if failed, blacklist the indexer
- [ ] Once all chunks for a file have been received, verify the file in subfile (should be vacuously true)
- [x] if failed, blacklist the indexer
- [x] Once all files have been received and verified, terminate
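
For context on the bookkeeping items above: this commit tracks downloaded and missing pieces as a map from chunk-file IPFS hash to the set of chunk indices still outstanding. Below is a minimal, self-contained sketch of that idea; the hash value and the numbers are illustrative only, not the crate's API.

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    // key: chunk file IPFS hash, value: chunk indices still missing.
    let mut chunks_to_download: HashMap<String, HashSet<u64>> = HashMap::new();

    // Seed every index for a hypothetical chunk file split into 3 chunks.
    chunks_to_download
        .entry("QmExampleChunkFile".to_string())
        .or_insert_with(HashSet::new)
        .extend(0..3u64);

    // A chunk that downloads and verifies successfully is removed ...
    chunks_to_download
        .get_mut("QmExampleChunkFile")
        .unwrap()
        .remove(&1);

    // ... and whatever remains still needs to be re-requested.
    let missing: Vec<u64> = chunks_to_download["QmExampleChunkFile"].iter().copied().collect();
    println!("missing chunk indices: {missing:?}");
}
```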

9 changes: 9 additions & 0 deletions subfile-exchange/src/config.rs
@@ -128,6 +128,15 @@ pub struct DownloaderArgs {
help = "Auth token that to query for free"
)]
pub free_query_auth_token: Option<String>,

#[arg(
long,
value_name = "MAX_RETRY",
default_value = "10",
env = "MAX_RETRY",
help = "Maximum retry for each chunk"
)]
pub max_retry: u64,
}

/// Publisher should take the files, generate subfiles, and publish to IPFS
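
As a side note on the new flag: the value resolves from the command line first, then the MAX_RETRY environment variable, then the default of 10. The toy struct below mirrors only that one argument and is not the crate's full DownloaderArgs; it assumes clap with the derive and env features enabled.

```rust
use clap::Parser;

/// Toy argument struct mirroring only the new flag; the real DownloaderArgs
/// carries many more fields. Requires clap with the "derive" and "env" features.
#[derive(Parser, Debug)]
struct RetryArgs {
    #[arg(
        long,
        value_name = "MAX_RETRY",
        default_value = "10",
        env = "MAX_RETRY",
        help = "Maximum number of retries for each chunk"
    )]
    max_retry: u64,
}

fn main() {
    // With no flag and no MAX_RETRY in the environment, the default of 10 applies.
    let defaulted = RetryArgs::parse_from(["downloader"]);
    println!("default: {}", defaulted.max_retry);

    // An explicit CLI flag takes precedence over the env var and the default.
    let explicit = RetryArgs::parse_from(["downloader", "--max-retry", "3"]);
    println!("explicit: {}", explicit.max_retry);
}
```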
141 changes: 91 additions & 50 deletions subfile-exchange/src/subfile_client/mod.rs
@@ -1,28 +1,21 @@
// #![deny(warnings)]
// use hyper;

// use std::env;
// use std::io::{self, Write};
// use hyper::Client;
// use hyper::header::{Connection, Range, ByteRangeSpec};

use anyhow::anyhow;
use bytes::Bytes;
use futures::{stream, StreamExt};
use http::header::{AUTHORIZATION, CONTENT_RANGE};
use rand::seq::SliceRandom;
use reqwest::Client;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use tokio::sync::Mutex;

use crate::config::DownloaderArgs;
use crate::file_hasher::verify_chunk;
use crate::ipfs::IpfsClient;
use crate::publisher::FileMetaInfo;
use crate::publisher::{FileMetaInfo, SubfileManifest};
use crate::subfile_reader::{fetch_chunk_file_from_ipfs, fetch_subfile_from_ipfs};
use crate::types::Operator;

@@ -40,6 +33,9 @@ pub struct SubfileDownloader {
output_dir: String,
free_query_auth_token: Option<String>,
indexer_blocklist: Arc<StdMutex<HashSet<String>>>,
// key is the chunk file identifier (IPFS hash) and value is a HashSet of chunk indices that still need to be downloaded
chunks_to_download: Arc<Mutex<HashMap<String, HashSet<u64>>>>,
chunk_max_retry: u64,
}

impl SubfileDownloader {
@@ -55,6 +51,8 @@ impl SubfileDownloader {
output_dir: args.output_dir,
free_query_auth_token: args.free_query_auth_token,
indexer_blocklist: Arc::new(StdMutex::new(HashSet::new())),
chunks_to_download: Arc::new(Mutex::new(HashMap::new())),
chunk_max_retry: args.max_retry,
}
}

@@ -104,6 +102,24 @@ impl SubfileDownloader {
blocklist.insert(endpoint);
}

/// Read the manifest to prepare the set of chunks to download
pub async fn chunks_to_download(&self) -> Result<SubfileManifest, anyhow::Error> {
let subfile = fetch_subfile_from_ipfs(&self.ipfs_client, &self.ipfs_hash).await?;
for chunk_file in &subfile.files {
let mut chunks_to_download = self.chunks_to_download.lock().await;
let chunks_set = chunks_to_download
.entry(chunk_file.hash.clone())
.or_insert_with(HashSet::new);
let chunk_file =
fetch_chunk_file_from_ipfs(&self.ipfs_client, &chunk_file.hash).await?;
let chunk_size = chunk_file.chunk_size;
for i in 0..(chunk_file.total_bytes / chunk_size + 1) {
chunks_set.insert(i);
}
}
Ok(subfile)
}

/// Read subfile manifest and download the individual chunk files
//TODO: update once there is payment
pub async fn download_subfile(&self) -> Result<(), anyhow::Error> {
@@ -174,6 +190,8 @@ impl SubfileDownloader {
let chunk_size = chunk_file.chunk_size;
let hashes = chunk_file.chunk_hashes.clone();
let mut handles = Vec::new();

//TODO: use chunks_to_download indices
for i in 0..(chunk_file.total_bytes / chunk_size + 1) {
tracing::trace!(i, "Download chunk index");
let start = i * chunk_size;
@@ -186,7 +204,7 @@
operator,
url,
chunk = i,
chunk_file = chunk_file_info.name,
chunk_file = chunk_file.file_name,
"Querying operator"
);
url
@@ -196,15 +214,17 @@
return Err(anyhow!(err_msg));
};
let url = endpoint + "/subfiles/id/" + &self.ipfs_hash;
let chunk_file_hash = chunk_file_info.hash.to_string();
let client = self.http_client.clone();
let auth_token = self.free_query_auth_token.clone();
let file_name = chunk_file_info.name.clone();
let chunk_hash = hashes[i as usize].clone();
let block_list = self.indexer_blocklist.clone();

let chunks_to_download = self.chunks_to_download.clone();
let max_retry = self.chunk_max_retry;
// Spawn a new asynchronous task for each range request
let handle = tokio::spawn(async move {
download_chunk_and_write_to_file(
match download_chunk_and_write_to_file(
&client,
&url,
auth_token,
@@ -213,21 +233,34 @@
end,
&chunk_hash,
file_clone,
max_retry,
)
.await
.map_err(|e| {
// If the download fails, add the URL to the indexer_blocklist
block_list
.lock()
.expect("Cannot access blocklist")
.insert(url);
e
})
{
Ok(r) => {
// Update downloaded status
chunks_to_download
.lock()
.await
.entry(chunk_file_hash)
.or_insert_with(HashSet::new)
.remove(&i);
Ok(r)
}
Err(e) => {
// If the download fails, add the URL to the indexer_blocklist
//TODO: with Error enum, add blocklist based on the error
block_list
.lock()
.expect("Cannot access blocklist")
.insert(url);
Err(e)
}
}
});

handles.push(handle);
}

// Wait for all tasks to complete and collect the results
let mut failed = vec![];
for handle in handles {
@@ -338,38 +371,46 @@ async fn download_chunk_and_write_to_file(
end: u64,
chunk_hash: &str,
file: Arc<Mutex<File>>,
max_retry: u64,
) -> Result<Arc<Mutex<File>>, anyhow::Error> {
// Make the range request to download the chunk
let data = request_chunk(
http_client,
query_endpoint,
auth_token,
file_name,
start,
end,
)
.await?;

// Verify the chunk by reading the chunk file and
if !verify_chunk(&data, chunk_hash) {
tracing::warn!(query_endpoint, "Failed to validate a chunk from indexer");
return Err(anyhow!("Terminate the download, blacklist the indexer"));
}

// Lock the file for writing
let mut file_lock = file.lock().await;

// Seek to the start position of this chunk
file_lock.seek(SeekFrom::Start(start)).unwrap();

// Write the chunk to the file
file_lock.write_all(&data).unwrap();
let mut attempts = 0;

loop {
// Make the range request to download the chunk
match request_chunk(
http_client,
query_endpoint,
auth_token.clone(),
file_name,
start,
end,
)
.await
{
Ok(data) => {
// Verify the chunk
if verify_chunk(&data, chunk_hash) {
// Lock the file for writing
let mut file_lock = file.lock().await;
file_lock.seek(SeekFrom::Start(start))?;
file_lock.write_all(&data)?;
drop(file_lock);
return Ok(file); // Successfully written the chunk
} else {
tracing::warn!(query_endpoint, "Failed to validate received chunk");
}
}
Err(e) => tracing::error!("Chunk download error: {:?}", e),
}

drop(file_lock);
attempts += 1;
if attempts >= max_retry {
return Err(anyhow!("Max retry attempts reached for chunk download"));
}

Ok(file)
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

/// Make range request for a file to the subfile server
async fn request_chunk(
http_client: &Client,
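
The still-unchecked checklist item ("continually requesting missing pieces") and the TODO in the download loop ("use chunks_to_download indices") point at the same follow-up: drive re-requests from the tracking map rather than from every index. The sketch below is one hedged way that could look; fetch_and_verify is a hypothetical stand-in for the crate's range request and chunk verification, and the example assumes the tokio runtime already used by the project.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-in for the crate's range request plus chunk verification.
async fn fetch_and_verify(_chunk_file_hash: &str, _index: u64) -> Result<(), String> {
    Ok(())
}

/// Sketch: re-request only the indices still present in the tracking map,
/// giving up after `max_retry` passes over the remaining set.
async fn download_missing(
    chunks_to_download: &mut HashMap<String, HashSet<u64>>,
    chunk_file_hash: &str,
    max_retry: u64,
) -> Result<(), String> {
    for _attempt in 0..max_retry {
        let missing: Vec<u64> = match chunks_to_download.get(chunk_file_hash) {
            Some(set) if !set.is_empty() => set.iter().copied().collect(),
            _ => return Ok(()), // nothing left to download for this chunk file
        };
        for index in missing {
            if fetch_and_verify(chunk_file_hash, index).await.is_ok() {
                if let Some(set) = chunks_to_download.get_mut(chunk_file_hash) {
                    set.remove(&index);
                }
            }
        }
    }
    Err("chunks still missing after max retries".to_string())
}

#[tokio::main]
async fn main() {
    let mut tracker: HashMap<String, HashSet<u64>> =
        HashMap::from([("QmExampleChunkFile".to_string(), (0..3u64).collect())]);
    println!("{:?}", download_missing(&mut tracker, "QmExampleChunkFile", 10).await);
}
```

The committed retry loop sleeps a flat second between attempts; a backoff strategy could later replace that fixed delay without changing this bookkeeping.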
