Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ name = "encrypted_upload_test"
path = "examples/encrypted_upload_test.rs"

[workspace.package]
version = "0.5.0"
version = "0.5.1"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/functionland/fula-api"
Expand Down
10 changes: 9 additions & 1 deletion crates/fula-client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,15 @@ impl std::fmt::Debug for Config {
.field("users_index_chain_rpc_url", &self.users_index_chain_rpc_url)
.field("users_index_anchor_address", &self.users_index_anchor_address)
.field("users_index_ipns_name", &self.users_index_ipns_name)
.field("users_index_user_key", &self.users_index_user_key)
// Per-user routing key (`BLAKE3("fula:user_id:" || sha256(email))[..16]`).
// Stable per-account, used to route the cold-start resolver to a
// specific user's bucketsIndex CBOR. Not a secret, but a persistent
// user-identity correlator — redacted to match the `access_token`
// pattern above and avoid linking log lines to a specific user.
.field(
"users_index_user_key",
&self.users_index_user_key.as_ref().map(|_| "<redacted>"),
)
.field("users_index_ipns_gateway_urls", &self.users_index_ipns_gateway_urls)
.field("users_index_ipfs_gateway_urls", &self.users_index_ipfs_gateway_urls)
.field("walkable_v8_writer_enabled", &self.walkable_v8_writer_enabled)
Expand Down
387 changes: 307 additions & 80 deletions crates/fula-client/src/encryption.rs

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions crates/fula-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,27 @@ pub enum ClientError {
context: String,
postcard_error: String,
},

/// **D6 audit fix** — a multipart upload would require more than
/// the S3 hard limit of 10,000 parts at the configured
/// `multipart_chunk_size`. Surfaced as a typed pre-condition error
/// before any HTTP traffic, so callers see a clear actionable
/// signal ("increase chunk size to N bytes") instead of an opaque
/// S3 error at part #10001.
///
/// `computed_parts` is what the upload would need at the current
/// chunk size; `max` is the S3-enforced ceiling (10,000); the
/// `suggested_chunk_size` is the smallest chunk size that fits the
/// file under the cap.
#[error(
"multipart upload requires {computed_parts} parts which exceeds the S3 limit \
of {max}; increase multipart_chunk_size to at least {suggested_chunk_size} bytes"
)]
PartCountExceeded {
computed_parts: u64,
max: u64,
suggested_chunk_size: u64,
},
}

/// **#81** — custom `From<CryptoError>` (replaces the prior `#[from]`
Expand Down
17 changes: 17 additions & 0 deletions crates/fula-client/src/gateway_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,25 @@ pub(crate) async fn fetch_one(
timeout: Duration,
) -> Result<Bytes, FetchError> {
let url = gateway.url_for(cid);
// IPIP-412: request raw IPLD bytes. Path-style gateways like
// dweb.link return HTML directory listings without this header,
// which then fail `verify_cid_against_bytes` on the caller side.
// Subdomain-style gateways (`<cid>.ipfs.<host>`) usually serve raw
// bytes by default but accept the header harmlessly. Sending
// unconditionally is the safest, most portable choice.
// Multi-value Accept: this fetcher serves both raw blocks
// (`bafkr4i...` codec 0x55) AND dag-cbor blocks (`bafyrei...` codec
// 0x71) — Phase 2.4 cold-walk uses the same gateway race for shard
// manifest pages (dag-cbor) and HAMT internal nodes (raw). A single
// `Accept: application/vnd.ipld.raw` gets 406 from gateways that
// serve dag-cbor as typed content. Including both typed forms lets
// the gateway pick the codec matching the CID.
let resp = http
.get(&url)
.header(
reqwest::header::ACCEPT,
"application/vnd.ipld.raw, application/vnd.ipld.dag-cbor, application/cbor, */*;q=0.1",
)
.timeout(timeout)
.send()
.await
Expand Down
129 changes: 121 additions & 8 deletions crates/fula-client/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ pub async fn upload_large_file(
) -> Result<String> {
let chunk_size = client.config().multipart_chunk_size as usize;
let total_size = data.len() as u64;
let total_parts = ((data.len() + chunk_size - 1) / chunk_size) as u32;

// **D6 audit fix — S3 10,000-part precondition.**
let total_parts = check_part_count_within_s3_limit(total_size, chunk_size as u64)?;

let upload = MultipartUpload::start(Arc::clone(&client), bucket, key).await?;

Expand Down Expand Up @@ -594,17 +596,37 @@ async fn complete_upload(
let status = response.status().as_u16();
let text = response.text().await.unwrap_or_default();
let err = ClientError::from_s3_xml(&text, status);
// Treat NoSuchUpload as success-equivalent: a previous call likely
// completed the upload and the response was lost; replaying here
// yields NoSuchUpload once the server has already dropped the
// in-progress upload state. Caller can proceed.
// **D2 audit fix.** Pre-fix the SDK turned `NoSuchUpload` into
// `Ok(String::new())`, treating a server rejection as a successful
// upload with an empty etag. Callers (notably the encrypted-SDK's
// forest writer) would then store the empty etag in a
// `ForestFileEntry`, treating the file as uploaded. Subsequent
// GETs return 404 because the object never actually exists on
// master — silent data loss on every multipart upload that
// happens to hit a NoSuchUpload completion. The "prior success"
// assumption was unsound: NoSuchUpload from S3 means *the
// upload state was dropped*, not *the upload completed*.
//
// Surface the failure as a typed error so the caller can
// re-initiate the multipart upload from scratch (which is the
// correct recovery; CompleteMultipartUpload is idempotent only
// on the server's own retries, not after server-side state
// eviction). The wrapping `ClientError::UploadFailed` carries
// the upload_id so operators can correlate logs.
if let ClientError::S3Error { code, .. } = &err {
if code == "NoSuchUpload" {
tracing::info!(
tracing::warn!(
%bucket, %key, %upload_id,
"multipart: CompleteMultipartUpload returned NoSuchUpload; treating as prior success"
"multipart: CompleteMultipartUpload returned NoSuchUpload — \
upload state dropped on master; surfacing as UploadFailed \
so caller re-initiates instead of recording an empty etag"
);
return Ok(String::new());
return Err(ClientError::UploadFailed(format!(
"multipart upload {} not found on master (NoSuchUpload) — \
upload state was dropped or never completed; restart upload \
from scratch instead of treating empty etag as success",
upload_id,
)));
}
}
return Err(err);
Expand Down Expand Up @@ -664,6 +686,39 @@ async fn abort_upload(
.await
}

/// **D6 audit fix — S3 10,000-part precondition check.**
///
/// S3 enforces a hard limit of 10,000 parts per multipart upload.
/// Pre-fix the SDK silently uploaded as many parts as it computed and
/// then failed at part #10001 with an opaque S3 error. For a 1 TB file
/// at the default 256 KB chunk size, the upload would need ~4 million
/// parts — way over the limit. This helper surfaces the failure as a
/// typed `PartCountExceeded` error before any HTTP traffic, with a
/// suggested chunk size that would fit under the cap.
///
/// Returns the part count as `u32` on success (always ≤ 10,000 so the
/// downcast is safe).
/// **D6 audit fix — S3 10,000-part precondition check.**
///
/// S3 enforces a hard limit of 10,000 parts per multipart upload.
/// Pre-fix the SDK silently uploaded as many parts as it computed and
/// then failed at part #10001 with an opaque S3 error. For a 1 TB file
/// at the default 256 KB chunk size, the upload would need ~4 million
/// parts — way over the limit. This helper surfaces the failure as a
/// typed `PartCountExceeded` error before any HTTP traffic, with a
/// suggested chunk size that would fit under the cap.
///
/// Returns the part count as `u32` on success (always ≤ 10,000 so the
/// downcast is safe).
///
/// # Errors
///
/// * `ClientError::Config` if `chunk_size == 0` (would divide by zero).
/// * `ClientError::PartCountExceeded` if more than 10,000 parts would
///   be required at the given chunk size.
fn check_part_count_within_s3_limit(total_size: u64, chunk_size: u64) -> Result<u32> {
    const S3_MAX_PARTS: u64 = 10_000;
    if chunk_size == 0 {
        return Err(ClientError::Config(
            "multipart_chunk_size must be > 0".into(),
        ));
    }
    // Overflow-safe ceiling division. The naive `(a + b - 1) / b` form
    // wraps for `total_size` near `u64::MAX` (panic in debug builds,
    // silently wrong part count in release); `a / b + (a % b != 0)`
    // never overflows and computes the same value for all inputs.
    let computed_parts = total_size / chunk_size + u64::from(total_size % chunk_size != 0);
    if computed_parts > S3_MAX_PARTS {
        // Smallest chunk size that fits the file into ≤ 10,000 parts
        // (same overflow-safe ceiling-division form as above).
        let suggested_chunk_size =
            total_size / S3_MAX_PARTS + u64::from(total_size % S3_MAX_PARTS != 0);
        return Err(ClientError::PartCountExceeded {
            computed_parts,
            max: S3_MAX_PARTS,
            suggested_chunk_size,
        });
    }
    // Safe narrowing: computed_parts ≤ 10_000 < u32::MAX.
    Ok(computed_parts as u32)
}

fn extract_xml_value(xml: &str, element: &str) -> Option<String> {
let start_tag = format!("<{}>", element);
let end_tag = format!("</{}>", element);
Expand All @@ -682,6 +737,64 @@ fn extract_xml_value(xml: &str, element: &str) -> Option<String> {
mod tests {
use super::*;

// ─────────────────────────────────────────────────────────────────
// D6 audit fix: S3 10,000-part precondition.
// ─────────────────────────────────────────────────────────────────

#[test]
fn d6_part_count_check_under_limit_returns_count() {
    // A 1 GiB upload at 256 KiB chunks needs exactly 4096 parts —
    // comfortably below the 10,000-part S3 ceiling — so the check must
    // succeed and report the precise part count.
    let chunk_size: u64 = 256 * 1024;
    let total_size: u64 = 1 << 30; // 1 GiB
    let parts = check_part_count_within_s3_limit(total_size, chunk_size).expect("under limit");
    assert_eq!(parts, 4096);
}

#[test]
fn d6_part_count_check_over_limit_errors_with_suggestion() {
    // A 1 TiB upload at 256 KiB chunks would need ~4 million parts,
    // far beyond the 10,000-part ceiling — it must be rejected before
    // any HTTP traffic, with a usable chunk-size suggestion attached.
    let total: u64 = 1 << 40; // 1 TiB
    let chunk: u64 = 256 * 1024;
    let err = check_part_count_within_s3_limit(total, chunk)
        .expect_err("must reject 4M-part request");
    let (computed_parts, max, suggested_chunk_size) = match err {
        ClientError::PartCountExceeded {
            computed_parts,
            max,
            suggested_chunk_size,
        } => (computed_parts, max, suggested_chunk_size),
        other => panic!("expected PartCountExceeded, got: {:?}", other),
    };
    assert_eq!(computed_parts, 4_194_304);
    assert_eq!(max, 10_000);
    // Whatever chunk size was suggested must actually fit the whole
    // file within the 10,000-part cap.
    let parts_at_suggestion = (total + suggested_chunk_size - 1) / suggested_chunk_size;
    assert!(
        parts_at_suggestion <= 10_000,
        "suggested chunk size {} doesn't fit {} bytes in 10000 parts",
        suggested_chunk_size, total
    );
}

#[test]
fn d6_part_count_check_exactly_at_limit_succeeds() {
    let chunk_size: u64 = 1024;
    let at_limit: u64 = chunk_size * 10_000;

    // Exactly 10,000 parts sits on the boundary and must be accepted.
    assert_eq!(
        check_part_count_within_s3_limit(at_limit, chunk_size).expect("exactly 10000 OK"),
        10_000
    );

    // One extra byte spills into part #10,001 and must be rejected.
    let over = check_part_count_within_s3_limit(at_limit + 1, chunk_size).expect_err("over by 1");
    assert!(matches!(over, ClientError::PartCountExceeded { .. }));
}

#[test]
fn d6_part_count_check_zero_chunk_size_errors() {
    // chunk_size == 0 would divide by zero; the check must reject it
    // up front as a configuration error rather than panic.
    let result = check_part_count_within_s3_limit(1024, 0);
    let err = result.expect_err("chunk_size 0 invalid");
    assert!(matches!(err, ClientError::Config(_)));
}

#[test]
fn complete_xml_sorts_parts_by_part_number() {
// Parts recorded out of order (e.g. parallel upload_part calls
Expand Down
Loading
Loading