Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ name = "encrypted_upload_test"
path = "examples/encrypted_upload_test.rs"

[workspace.package]
version = "0.4.9"
version = "0.5.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/functionland/fula-api"
Expand Down
8 changes: 8 additions & 0 deletions crates/fula-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ clap = { workspace = true }
# Storage
dashmap = { workspace = true }
parking_lot = { workspace = true }
# W.9.6 — durable pin queue. Same workspace dep that backs
# fula-client's BlockCache, so we get a known-good crash-safety story
# without inventing a new persistence layer.
redb = { workspace = true }
# Encoding for pin-queue records (pin_queue.rs). Same workspace dep
# fula-crypto already uses for HAMT wire types — deterministic,
# compact, and stable across Rust versions.
postcard = { workspace = true }

# HTTP client (for balance check API)
reqwest = { workspace = true }
Expand Down
14 changes: 14 additions & 0 deletions crates/fula-cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ pub struct GatewayConfig {
/// LRU block cache capacity in MB. 0 disables the cache.
#[serde(default = "default_block_cache_mb")]
pub block_cache_mb: usize,
/// W.9.6 — durable pin queue file path. When `Some`, every PUT
/// enqueues its master-cluster + user-external pin requests to
/// this redb-backed queue and a background drainer dispatches
/// them with bounded concurrency + exp backoff retry. Survives
/// master crashes — pending pins resume on the next startup.
///
/// When `None` (the serde default when the field is omitted from
/// a config file — note the programmatic `Default` impl below
/// supplies a production path instead), the PUT handler falls
/// back to the legacy fire-and-forget pin path (no retry, no
/// crash safety). Production deploys MUST set this; the fallback
/// exists only to keep unit tests + dev deployments lightweight.
#[serde(default)]
pub pin_queue_path: Option<String>,
}

fn default_block_cache_mb() -> usize {
Expand Down Expand Up @@ -73,6 +86,7 @@ impl Default for GatewayConfig {
admin_jwt_secret: None,
admin_api_enabled: false,
block_cache_mb: default_block_cache_mb(),
pin_queue_path: Some("/var/lib/fula-gateway/pin_queue.redb".to_string()),
}
}
}
Expand Down
74 changes: 60 additions & 14 deletions crates/fula-cli/src/handlers/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,25 +1070,71 @@ async fn sweep_one_bucket(
}
};

// Pin the new root locally (best-effort, fire-and-forget — same
// pattern as the put_object handler). No user JWT needed for the
// local kubo pin; the cluster will replicate via its existing
// pin-follower discipline. The OLD root is left as-is and becomes
// unreferenced once the registry persists; cluster GC reaps it
// eventually. We deliberately do NOT actively unpin the old root
// — if the registry persist fails, we'd want the old root still
// available for recovery.
// #65 — pin the new root through the durable queue (W.9.6
// pattern). Replaces the prior fire-and-forget `tokio::spawn`,
// which silently lost pins on master crash OR on operator
// cancel/restart of a slow sweep — the cancel/restart pattern is
// the load-bearing improvement here, not just full-crash
// durability. Mirrors object.rs:421-460's bucket-root path.
//
// Why `bearer_token: None`: admin sweep doesn't carry a user JWT
// (the comment block at line ~907 documents this). The drainer's
// dispatch path (`pin_drainer.rs:372`) reads `bearer_token` as
// `unwrap_or("")` and the empty-string short-circuit in
// `IpfsPinningBlockStore::pin_cid_with_token` (ipfs_pinning.rs:264)
// falls back to local-kubo `pin_cid` — byte-equivalent to today's
// `block_store.pin(...)` call. A single `warn!("Empty token ...")`
// log fires per dispatch; bounded by the sweep's bucket count.
//
// The OLD root is still left as-is (unreferenced; cluster GC
// reaps eventually). Active unpin would conflict with the
// recovery story if registry persist fails.
{
let block_store = Arc::clone(&state.block_store);
let pin_name = format!("bucket:{}", bucket_name);
let cid = new_root_cid;
tokio::spawn(async move {
if let Err(e) = block_store.pin(&cid, Some(&pin_name)).await {
warn!(cid = %cid, error = %e, "PII sweep: failed to pin new bucket root");
if let Some(queue) = state.pin_queue.as_ref() {
if let Err(e) = queue.enqueue(crate::pin_queue::PinRequest {
cid,
target: crate::pin_queue::PinTarget::MasterCluster,
kind: crate::pin_queue::PinKind::Add,
pin_name: Some(pin_name.clone()),
bearer_token: None,
pinning_endpoint: None,
}) {
// redb commit failed — fall back to fire-and-forget
// for this single record so the sweep doesn't fail
// hard. Operator alert for persistent failures.
warn!(
cid = %cid,
error = %e,
"PII sweep: pin_queue enqueue failed; falling back to fire-and-forget for this bucket root"
);
let block_store = Arc::clone(&state.block_store);
let pin_name_clone = pin_name.clone();
tokio::spawn(async move {
if let Err(e) = block_store.pin(&cid, Some(&pin_name_clone)).await {
warn!(cid = %cid, error = %e, "PII sweep: failed to pin new bucket root (queue-fallback path)");
}
});
} else {
info!(cid = %cid, bucket = %pin_name, "PII sweep: new bucket root pinned");
info!(
cid = %cid,
bucket = %pin_name,
"PII sweep: new bucket root enqueued for durable pin (#65)"
);
}
});
} else {
// Legacy fire-and-forget — no queue configured (tests +
// minimal dev configs only; production sets `pin_queue_path`).
let block_store = Arc::clone(&state.block_store);
tokio::spawn(async move {
if let Err(e) = block_store.pin(&cid, Some(&pin_name)).await {
warn!(cid = %cid, error = %e, "PII sweep: failed to pin new bucket root");
} else {
info!(cid = %cid, bucket = %pin_name, "PII sweep: new bucket root pinned");
}
});
}
}

report.buckets_rewritten += 1;
Expand Down
7 changes: 7 additions & 0 deletions crates/fula-cli/src/handlers/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ mod tests {
multipart_manager: Arc::new(crate::multipart::MultipartManager::new(60)),
lock_store: crate::handlers::locks::LockStore::new(),
users_index_publisher,
// W.9.6 pin queue not exercised by users-index-publisher
// tests; leaving None routes pinning back through the
// legacy fire-and-forget path which is fine for these
// tests (they don't trigger PUTs / pinning).
pin_queue: None,
})
}

Expand Down Expand Up @@ -468,6 +473,7 @@ mod tests {
// Publisher disabled — we expect 503, not 401 (no token)
// and not 403 (S3 auth would trigger if middleware leaked).
users_index_publisher: None,
pin_queue: None,
});

let _ = state_path; // silence unused; only here to mirror prod path layout
Expand Down Expand Up @@ -515,6 +521,7 @@ mod tests {
multipart_manager: Arc::new(crate::multipart::MultipartManager::new(60)),
lock_store: crate::handlers::locks::LockStore::new(),
users_index_publisher: None,
pin_queue: None,
});

let app = crate::routes::create_router(Arc::clone(&state));
Expand Down
103 changes: 92 additions & 11 deletions crates/fula-cli/src/handlers/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,26 +242,107 @@ pub async fn complete_multipart_upload(
tracing::warn!(error = %e, "Failed to persist bucket registry after complete_multipart_upload");
}

// Pin the BUCKET ROOT CID to ensure tree structure survives GC.
// This recursively pins all tree nodes AND all referenced object data (including parts).
// NOTE: Pinning is async (fire-and-forget) to avoid blocking the response.
{
// W.9.6 — pin the BUCKET ROOT CID through the durable queue.
// Mirrors the put_object handler's enqueue path so multipart
// uploads get the same crash-safety + retry guarantees as
// single-PUTs. Without this, every large-file upload would
// bypass the queue and silently regress to pre-0.5 fire-and-forget
// behaviour (load-bearing W.9.6 hole).
let pin_name = format!("bucket:{}", bucket);
if let Some(queue) = state.pin_queue.as_ref() {
if let Err(e) = queue.enqueue(crate::pin_queue::PinRequest {
cid: bucket_root_cid,
target: crate::pin_queue::PinTarget::MasterCluster,
kind: crate::pin_queue::PinKind::Add,
pin_name: Some(pin_name.clone()),
bearer_token: Some(session.jwt_token.clone()),
pinning_endpoint: None,
}) {
// Mirror put_object's enqueue-failed fallback: spawn the
// pin so the user's CompleteMultipartUpload request doesn't
// fail. Operators see the warn; persistent failures are an alert.
tracing::warn!(
cid = %bucket_root_cid,
error = %e,
"pin_queue enqueue (multipart bucket-root) failed; falling back to fire-and-forget"
);
let block_store = Arc::clone(&state.block_store);
let jwt_token = session.jwt_token.clone();
let pn = pin_name.clone();
tokio::spawn(async move {
if let Err(e) = block_store
.pin_with_token(&bucket_root_cid, Some(&pn), &jwt_token)
.await
{
tracing::warn!(
cid = %bucket_root_cid,
error = %e,
"Failed to pin bucket root CID (multipart queue-fallback path)"
);
}
});
}
} else {
// Legacy fire-and-forget — no queue configured.
let block_store = Arc::clone(&state.block_store);
let pin_bucket = bucket.clone();
let jwt_token = session.jwt_token.clone();
let pn = pin_name.clone();
tokio::spawn(async move {
let pin_name = format!("bucket:{}", pin_bucket);
if let Err(e) = block_store.pin_with_token(&bucket_root_cid, Some(&pin_name), &jwt_token).await {
if let Err(e) = block_store
.pin_with_token(&bucket_root_cid, Some(&pn), &jwt_token)
.await
{
tracing::warn!(cid = %bucket_root_cid, error = %e, "Failed to pin bucket root CID");
} else {
tracing::info!(cid = %bucket_root_cid, bucket = %pin_name, "Bucket root CID pinned (recursive)");
tracing::info!(cid = %bucket_root_cid, bucket = %pn, "Bucket root CID pinned (recursive)");
}
});
}

// Also pin to user's external pinning service if credentials provided
// The session JWT is used as the default token if no X-Pinning-Token header is provided
pin_for_user(&headers, &first_part_cid, Some(&key), state.config.pinning_service_endpoint.as_deref(), Some(&session.jwt_token)).await;
// W.9.6 — user external pin via queue (or legacy fallback).
// Same routing as the put_object handler: when the queue is
// configured, durable + retry; otherwise legacy fire-and-forget
// via `pin_for_user`. Reusing a shared `pin_for_user_via_queue`
// helper would be cleaner; for now we mirror object.rs's inline
// logic to avoid making `multipart.rs` depend on `object.rs`'s
// private helper.
if let Some(queue) = state.pin_queue.as_ref() {
if !session.jwt_token.is_empty() {
let creds = match state.config.pinning_service_endpoint.as_deref() {
Some(ep) => {
crate::pinning::PinningCredentials::from_jwt(&headers, &session.jwt_token, ep)
}
None => crate::pinning::PinningCredentials::from_headers(&headers),
};
if let Some(creds) = creds {
let pin_name_user = Some(key.clone()).or_else(|| creds.name.clone());
if let Err(e) = queue.enqueue(crate::pin_queue::PinRequest {
cid: first_part_cid,
target: crate::pin_queue::PinTarget::UserExternal,
kind: crate::pin_queue::PinKind::Add,
pin_name: pin_name_user,
bearer_token: Some(creds.token.clone()),
pinning_endpoint: Some(creds.endpoint.clone()),
}) {
tracing::warn!(
cid = %first_part_cid,
error = %e,
"pin_queue enqueue (multipart user-external) failed"
);
}
}
}
} else {
// Legacy fire-and-forget for tests / minimal dev configs.
pin_for_user(
&headers,
&first_part_cid,
Some(&key),
state.config.pinning_service_endpoint.as_deref(),
Some(&session.jwt_token),
)
.await;
}

let location = format!("/{}/{}", bucket, key);
let xml_response = xml::complete_multipart_upload_result(
Expand Down
Loading
Loading