Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: download ipfs files in trustless mode #3244

Merged
merged 11 commits into from Jul 20, 2023
60 changes: 42 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -135,6 +135,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
] } # use rustls instead of native (openSSL) tls to drop the number of build dependencies
rs-car-ipfs = "0.3"
rustyline = "12"
semver = "1.0"
serde = { version = "1.0", default-features = false, features = ["derive"] }
Expand Down
43 changes: 40 additions & 3 deletions src/utils/net.rs
Expand Up @@ -3,11 +3,15 @@

use crate::utils::io::WithProgress;
use async_compression::tokio::bufread::ZstdDecoder;
use futures::TryStreamExt;
use std::io::ErrorKind;
use cid::Cid;
use futures::{AsyncWriteExt, TryStreamExt};
use std::{io::ErrorKind, path::Path};
use tap::Pipe;
use tokio::io::{AsyncBufReadExt, AsyncRead};
use tokio_util::either::Either::{Left, Right};
use tokio_util::{
compat::TokioAsyncReadCompatExt,
either::Either::{Left, Right},
};
use tracing::info;
use url::Url;

Expand All @@ -18,6 +22,39 @@ pub fn global_http_client() -> reqwest::Client {
CLIENT.clone()
}

/// Download a file via IPFS HTTP gateway in trustless mode.
/// See <https://github.com/ipfs/specs/blob/main/http-gateways/TRUSTLESS_GATEWAY.md>
pub async fn download_ipfs_file_trustlessly(
cid: &Cid,
gateway: Option<&str>,
destination: &Path,
) -> anyhow::Result<()> {
let url = {
// https://docs.ipfs.tech/concepts/ipfs-gateway/
const DEFAULT_IPFS_GATEWAY: &str = "https://ipfs.io/ipfs/";
let mut url =
Url::parse(gateway.unwrap_or(DEFAULT_IPFS_GATEWAY))?.join(&format!("{cid}"))?;
url.set_query(Some("format=car"));
Ok::<_, anyhow::Error>(url)
}?;

let tmp =
tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))?
.into_temp_path();
{
let mut reader = reader(url.as_str()).await?.compat();
let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?);
rs_car_ipfs::single_file::read_single_file_seek(&mut reader, &mut writer, Some(cid))
.await?;
writer.flush().await?;
writer.close().await?;
}

tmp.persist(destination)?;

Ok(())
}

/// `location` may be:
/// - a path to a local file
/// - a URL to a web resource
Expand Down
53 changes: 18 additions & 35 deletions src/utils/proofs_api/paramfetch.rs
Expand Up @@ -5,18 +5,17 @@ use std::{
fs::File as SyncFile,
io::{self, copy as sync_copy, BufReader as SyncBufReader, ErrorKind},
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};

use crate::shim::sector::SectorSize;
use crate::utils::net::global_http_client;
use crate::{shim::sector::SectorSize, utils::net::download_ipfs_file_trustlessly};
use ahash::HashMap;
use backoff::{future::retry, ExponentialBackoff};
use blake2b_simd::{Hash, State as Blake2b};
use futures::TryStreamExt;
use cid::Cid;
use serde::{Deserialize, Serialize};
use tap::Pipe as _;
use tokio::fs::{self, File};
use tokio::fs::{self};
use tracing::{debug, error, info, warn};

const GATEWAY: &str = "https://proofs.filecoin.io/ipfs/";
Expand Down Expand Up @@ -158,9 +157,8 @@ async fn fetch_verify_params(
info: Arc<ParameterData>,
) -> Result<(), anyhow::Error> {
let path: PathBuf = param_dir(data_dir).join(name);
let path: Arc<Path> = Arc::from(path.as_path());

match check_file(path.clone(), info.clone()).await {
match check_file(&path, &info).await {
Ok(()) => return Ok(()),
Err(e) => {
if e.kind() != ErrorKind::NotFound {
Expand All @@ -171,53 +169,38 @@ async fn fetch_verify_params(

fetch_params(&path, &info).await?;

check_file(path, info).await.map_err(|e| {
check_file(&path, &info).await.map_err(|e| {
// TODO remove invalid file
e.into()
})
}

async fn fetch_params(path: &Path, info: &ParameterData) -> Result<(), anyhow::Error> {
async fn fetch_params(path: &Path, info: &ParameterData) -> anyhow::Result<()> {
let cid = Cid::from_str(&info.cid)?;
let gw = std::env::var(GATEWAY_ENV).unwrap_or_else(|_| GATEWAY.to_owned());
info!("Fetching param file {:?} from {}", path, gw);
let url = format!("{}{}", gw, info.cid);
let result = retry(ExponentialBackoff::default(), || async {
Ok(fetch_params_inner(&url, path).await?)
Ok(download_ipfs_file_trustlessly(&cid, Some(GATEWAY), path).await?)
})
.await;
debug!("Done fetching param file {:?} from {}", path, gw);
result
}

async fn fetch_params_inner(url: impl AsRef<str>, path: &Path) -> Result<(), anyhow::Error> {
let mut dst = File::create(path).await?;

let mut src = global_http_client()
.get(url.as_ref())
.send()
.await?
.error_for_status()?
.bytes_stream()
.map_err(|reqwest_error| std::io::Error::new(ErrorKind::Other, reqwest_error))
.pipe(tokio_util::io::StreamReader::new);

tokio::io::copy(&mut src, &mut dst).await?;
Ok(())
}

async fn check_file(path: Arc<Path>, info: Arc<ParameterData>) -> Result<(), io::Error> {
async fn check_file(path: &Path, info: &ParameterData) -> Result<(), io::Error> {
if std::env::var(TRUST_PARAMS_ENV) == Ok("1".to_owned()) {
warn!("Assuming parameter files are okay. Do not use in production!");
return Ok(());
}

let cloned_path = path.clone();
let hash = tokio::task::spawn_blocking(move || -> Result<Hash, io::Error> {
let file = SyncFile::open(cloned_path.as_ref())?;
let mut reader = SyncBufReader::new(file);
let mut hasher = Blake2b::new();
sync_copy(&mut reader, &mut hasher)?;
Ok(hasher.finalize())
let hash = tokio::task::spawn_blocking({
let file = SyncFile::open(path)?;
move || -> Result<Hash, io::Error> {
let mut reader = SyncBufReader::new(file);
let mut hasher = Blake2b::new();
sync_copy(&mut reader, &mut hasher)?;
Ok(hasher.finalize())
}
})
.await??;

Expand Down