diff --git a/src/replication/bootstrap.rs b/src/replication/bootstrap.rs index b4d66f9..a4ea702 100644 --- a/src/replication/bootstrap.rs +++ b/src/replication/bootstrap.rs @@ -122,6 +122,18 @@ pub async fn check_bootstrap_drained( return false; } + // Hints capacity-rejected at the pending_verify bounds during bootstrap + // must be re-delivered by the originating source before drain can be + // claimed; otherwise we'd silently mark ourselves complete with + // outstanding work the source still owes us (codex round-2 BLOCKER). + // The set retires per-source as each source's next admission cycle + // completes with zero rejections — see `clear_capacity_rejected`. + if !state.capacity_rejected_sources.is_empty() { + let n = state.capacity_rejected_sources.len(); + debug!("Bootstrap NOT drained: {n} source(s) have outstanding capacity-rejected hints"); + return false; + } + if queues.is_bootstrap_work_empty(&state.pending_keys) { state.drained = true; info!("Bootstrap drained: all peer requests completed and work queues empty"); @@ -131,6 +143,47 @@ pub async fn check_bootstrap_drained( } } +/// Record that `source` had one or more hints capacity-rejected this cycle. +/// +/// Idempotent: tracks a set of sources, not a counter. Bootstrap cannot +/// drain while this source is in the set; cleared by +/// [`clear_capacity_rejected`] when the same source's next admission cycle +/// completes with zero rejections (i.e. the source successfully +/// re-delivered everything that previously overflowed). +pub async fn note_capacity_rejected( + bootstrap_state: &Arc>, + source: saorsa_core::identity::PeerId, +) { + let mut state = bootstrap_state.write().await; + if state.capacity_rejected_sources.insert(source) { + let n = state.capacity_rejected_sources.len(); + debug!( + "Bootstrap: source {source} now has outstanding capacity-rejected hints \ + ({n} sources outstanding)" + ); + } +} + +/// Mark `source`'s outstanding capacity rejections as cleared. 
+/// +/// Called whenever `source` completes an admission cycle with zero +/// capacity rejections: the source successfully re-delivered any hints +/// that previously overflowed, so its contribution to "bootstrap not +/// drained" is retired. No-op if the source had no outstanding rejections. +pub async fn clear_capacity_rejected( + bootstrap_state: &Arc>, + source: &saorsa_core::identity::PeerId, +) { + let mut state = bootstrap_state.write().await; + if state.capacity_rejected_sources.remove(source) { + let n = state.capacity_rejected_sources.len(); + debug!( + "Bootstrap: cleared outstanding capacity rejections for {source} \ + ({n} sources still outstanding)" + ); + } +} + /// Record a set of discovered keys into the bootstrap state for drain tracking. #[allow(clippy::implicit_hasher)] pub async fn track_discovered_keys( @@ -193,6 +246,7 @@ mod tests { drained: true, pending_peer_requests: 5, pending_keys: HashSet::new(), + capacity_rejected_sources: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -208,6 +262,7 @@ mod tests { drained: false, pending_peer_requests: 2, pending_keys: HashSet::new(), + capacity_rejected_sources: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -223,6 +278,7 @@ mod tests { drained: false, pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), + capacity_rejected_sources: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -237,6 +293,7 @@ mod tests { drained: false, pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), + capacity_rejected_sources: HashSet::new(), })); let mut queues = ReplicationQueues::new(); @@ -299,4 +356,54 @@ mod tests { "should saturate at zero" ); } + + /// Round-3 regression: a source that previously had capacity-rejected + /// hints must be retired from the "not yet drained" list when it + /// completes a later admission cycle with zero rejections, otherwise + /// `check_bootstrap_drained` 
is permanently wedged after a single + /// rejection. + #[tokio::test] + async fn capacity_rejected_clears_on_clean_cycle() { + let state = Arc::new(RwLock::new(BootstrapState::new())); + let queues = ReplicationQueues::new(); + let source = saorsa_core::identity::PeerId::from_bytes([7u8; 32]); + + // First cycle: this source overflowed, drain blocked. + note_capacity_rejected(&state, source).await; + assert!( + !check_bootstrap_drained(&state, &queues).await, + "drain must be blocked while a source has outstanding capacity rejections" + ); + + // Second cycle from the SAME source: zero rejections → clear it. + clear_capacity_rejected(&state, &source).await; + assert!( + check_bootstrap_drained(&state, &queues).await, + "drain must complete once the source's outstanding rejections are cleared" + ); + } + + /// Per-source granularity: one source's clean cycle must NOT clear a + /// different source's outstanding rejections. + #[tokio::test] + async fn capacity_rejected_is_per_source() { + let state = Arc::new(RwLock::new(BootstrapState::new())); + let queues = ReplicationQueues::new(); + let source_a = saorsa_core::identity::PeerId::from_bytes([0xAA; 32]); + let source_b = saorsa_core::identity::PeerId::from_bytes([0xBB; 32]); + + note_capacity_rejected(&state, source_a).await; + note_capacity_rejected(&state, source_b).await; + assert!(!check_bootstrap_drained(&state, &queues).await); + + // Only A clears; B still owes us re-hints. 
+ clear_capacity_rejected(&state, &source_a).await; + assert!( + !check_bootstrap_drained(&state, &queues).await, + "B's outstanding rejections must keep drain blocked" + ); + + clear_capacity_rejected(&state, &source_b).await; + assert!(check_bootstrap_drained(&state, &queues).await); + } } diff --git a/src/replication/mod.rs b/src/replication/mod.rs index c33af49..9e4e03f 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -852,7 +852,7 @@ impl ReplicationEngine { if let Some(resp) = response { if !resp.bootstrapping { // Admit hints into verification pipeline. - let admitted_keys = admit_and_queue_hints( + let outcome = admit_and_queue_hints( &self_id, peer, &resp.replica_hints, @@ -866,9 +866,22 @@ impl ReplicationEngine { .await; // Track discovered keys for drain detection. - if !admitted_keys.is_empty() { - bootstrap::track_discovered_keys(&bootstrap_state, &admitted_keys) - .await; + if !outcome.discovered.is_empty() { + bootstrap::track_discovered_keys( + &bootstrap_state, + &outcome.discovered, + ) + .await; + } + + // Record / retire capacity rejections so the + // drain check correctly reflects whether each + // source still owes us re-hinted work after + // queue overflow. + if outcome.capacity_rejected_count > 0 { + bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await; + } else { + bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await; } } } @@ -1300,7 +1313,7 @@ async fn handle_neighbor_sync_request( } // Admit inbound hints and queue for verification. - let admitted_keys = admit_and_queue_hints( + let outcome = admit_and_queue_hints( &self_id, source, &request.replica_hints, @@ -1313,10 +1326,19 @@ async fn handle_neighbor_sync_request( ) .await; - // Track discovered keys for bootstrap drain detection so that - // hints admitted via inbound sync requests are not missed. 
- if is_bootstrapping && !admitted_keys.is_empty() { - bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await; + // Track discovered keys for bootstrap drain detection so that hints + // admitted via inbound sync requests are not missed. Capacity-rejected + // hints keep this source on the "not yet drained" list until its next + // sync re-admits them; a clean cycle clears the source. + if is_bootstrapping { + if !outcome.discovered.is_empty() { + bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await; + } + if outcome.capacity_rejected_count > 0 { + bootstrap::note_capacity_rejected(bootstrap_state, *source).await; + } else { + bootstrap::clear_capacity_rejected(bootstrap_state, source).await; + } } Ok(()) @@ -1717,7 +1739,7 @@ async fn handle_sync_response( let mut state = sync_state.write().await; state.clear_active_bootstrap_claim(peer); } - let admitted_keys = admit_and_queue_hints( + let outcome = admit_and_queue_hints( self_id, peer, &resp.replica_hints, @@ -1730,10 +1752,19 @@ async fn handle_sync_response( ) .await; - // Track discovered keys for bootstrap drain detection so that - // hints admitted via regular neighbor sync are not missed. - if bootstrapping && !admitted_keys.is_empty() { - bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await; + // Track discovered keys for bootstrap drain detection so that hints + // admitted via regular neighbor sync are not missed. Capacity- + // rejected hints keep this source on the "not yet drained" list + // until its next sync replays them; a clean cycle clears it. 
+ if bootstrapping { + if !outcome.discovered.is_empty() { + bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await; + } + if outcome.capacity_rejected_count > 0 { + bootstrap::note_capacity_rejected(bootstrap_state, *peer).await; + } else { + bootstrap::clear_capacity_rejected(bootstrap_state, peer).await; + } } } } @@ -1742,6 +1773,19 @@ async fn handle_sync_response( /// /// Shared by neighbor-sync request handling, response handling, and bootstrap /// sync so that admission + queueing logic lives in one place. +#[allow(clippy::too_many_arguments)] +/// Outcome of [`admit_and_queue_hints`]. +/// +/// `capacity_rejected_count` is non-zero when one or more legitimately +/// admissible hints were dropped because `pending_verify`'s global or +/// per-source bound was hit. Callers that care about completeness +/// (bootstrap drain accounting) MUST NOT treat their work as complete while +/// this is > 0 — the source will need to re-hint after capacity frees up. +struct AdmissionOutcome { + discovered: HashSet, + capacity_rejected_count: usize, +} + #[allow(clippy::too_many_arguments)] async fn admit_and_queue_hints( self_id: &PeerId, @@ -1753,7 +1797,7 @@ async fn admit_and_queue_hints( storage: &Arc, paid_list: &Arc, queues: &Arc>, -) -> HashSet { +) -> AdmissionOutcome { let pending_keys: HashSet = { let q = queues.read().await; q.pending_keys().into_iter().collect() @@ -1772,12 +1816,13 @@ async fn admit_and_queue_hints( .await; let mut discovered = HashSet::new(); + let mut capacity_rejected_count: usize = 0; let mut q = queues.write().await; let now = Instant::now(); for key in admitted.replica_keys { if !storage.exists(&key).unwrap_or(false) { - let added = q.add_pending_verify( + let result = q.add_pending_verify( key, VerificationEntry { state: VerificationState::PendingVerify, @@ -1788,14 +1833,20 @@ async fn admit_and_queue_hints( hint_sender: *source_peer, }, ); - if added { - discovered.insert(key); + match result { + 
crate::replication::scheduling::AdmissionResult::Admitted => { + discovered.insert(key); + } + crate::replication::scheduling::AdmissionResult::AlreadyPresent => {} + crate::replication::scheduling::AdmissionResult::CapacityRejected => { + capacity_rejected_count += 1; + } } } } for key in admitted.paid_only_keys { - let added = q.add_pending_verify( + let result = q.add_pending_verify( key, VerificationEntry { state: VerificationState::PendingVerify, @@ -1806,12 +1857,28 @@ async fn admit_and_queue_hints( hint_sender: *source_peer, }, ); - if added { - discovered.insert(key); + match result { + crate::replication::scheduling::AdmissionResult::Admitted => { + discovered.insert(key); + } + crate::replication::scheduling::AdmissionResult::AlreadyPresent => {} + crate::replication::scheduling::AdmissionResult::CapacityRejected => { + capacity_rejected_count += 1; + } } } - discovered + if capacity_rejected_count > 0 { + debug!( + "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} hints \ + rejected at queue capacity; source will need to re-hint after pending_verify drains" + ); + } + + AdmissionOutcome { + discovered, + capacity_rejected_count, + } } // --------------------------------------------------------------------------- @@ -1860,9 +1927,10 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { let mut q = queues.write().await; for key in &pending_keys { if paid_list.contains(key).unwrap_or(false) { - if let Some(entry) = q.get_pending_mut(key) { - entry.state = VerificationState::PaidListVerified; - match entry.pipeline { + if let Some(pipeline) = + q.set_pending_state(key, VerificationState::PaidListVerified) + { + match pipeline { HintPipeline::PaidOnly => { // Paid-only + local paid state needs one more // responsibility check outside this lock: if we @@ -1937,8 +2005,9 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { let sources = evidence.get(&key).map_or_else(Vec::new, |ev| { 
quorum::present_sources_for_key(&key, ev, &targets) }); - q.remove_pending(&key); if sources.is_empty() { + // Terminal failure: remove pending and report. No fetch path. + q.remove_pending(&key); warn!( "Locally paid key {} has no responding holders (possible data loss)", hex::encode(key) @@ -1946,7 +2015,10 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { terminal_keys.push(key); } else { let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes()); - q.enqueue_fetch(key, distance, sources); + // Atomic remove+enqueue: if fetch_queue is at capacity, the + // pending entry is preserved and retried next cycle (no + // silent drop of verified replica-repair work). + let _ = q.promote_pending_to_fetch(key, distance, sources); } } } @@ -2025,9 +2097,12 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { if fetch_eligible && !sources.is_empty() { let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes()); - q.remove_pending(&key); - q.enqueue_fetch(key, distance, sources); - // Not terminal — key moved to fetch queue. + // Atomic remove+enqueue: on fetch_queue capacity miss + // the pending entry is preserved so this verified key + // is retried on the next cycle (no silent drop). + let _ = q.promote_pending_to_fetch(key, distance, sources); + // Not terminal — either moved to fetch queue, or + // retained as pending until queue drains. 
} else if fetch_eligible && sources.is_empty() { warn!( "Verified responsible key {} has no holders (possible data loss)", diff --git a/src/replication/scheduling.rs b/src/replication/scheduling.rs index 2240b28..ce02386 100644 --- a/src/replication/scheduling.rs +++ b/src/replication/scheduling.rs @@ -10,9 +10,100 @@ use std::time::{Duration, Instant}; use crate::logging::debug; use crate::ant_protocol::XorName; -use crate::replication::types::{FetchCandidate, VerificationEntry}; +use crate::replication::types::{ + FetchCandidate, HintPipeline, VerificationEntry, VerificationState, +}; use saorsa_core::identity::PeerId; +/// Global hard upper bound on the number of keys held in `pending_verify`. +/// +/// Without a bound, a peer in the local routing table can flood +/// `NeighborSyncRequest` messages (each capped only by +/// `MAX_REPLICATION_MESSAGE_SIZE` ≈ 10 MiB, i.e. ~320k 32-byte hints per +/// message) and grow this map without limit, exhausting node memory and +/// driving a self-amplifying storm of outbound verification requests. +/// +/// `131_072` entries is far above any legitimate aggregate need while +/// bounding worst-case memory to a few tens of MiB (each `VerificationEntry` +/// is on the order of a few hundred bytes; its sub-collections are populated +/// only from close-group-sized verification evidence, never from attacker +/// hint volume). +/// +/// This global cap alone is **not** sufficient: with blind capacity-reject a +/// single malicious routing-table peer could fill the whole map with cheap +/// admission-passing junk and starve every honest peer's hints until the +/// 30-minute `evict_stale` backstop fires (and re-fill immediately after). +/// Honest-replication fairness is therefore enforced by +/// [`MAX_PENDING_VERIFY_PER_PEER`] below; this global value is only the +/// memory backstop. 
+pub const MAX_PENDING_VERIFY: usize = 131_072; + +/// Per-source hard cap on `pending_verify` entries attributed to a single +/// `hint_sender` peer. +/// +/// This is the actual D1 defence. Each pending entry records the peer that +/// hinted it (`VerificationEntry::hint_sender`); a single source may occupy +/// at most this many slots. A flooding peer can therefore consume only its +/// own quota — it can never deny slots to honest peers, because honest +/// sources are accounted independently. Set well above any legitimate +/// per-peer hint working set (a healthy neighbour syncs at most a few +/// thousand keys to us per cycle) yet small enough that +/// `MAX_PENDING_VERIFY / MAX_PENDING_VERIFY_PER_PEER` distinct malicious +/// peers would be required to approach the global cap. +/// +/// Residual (accepted, follow-up): with the current ratio, ~16 distinct +/// `PeerId`s that are *all* simultaneously in the victim's routing table +/// (gated by `sender_in_rt`) could still collectively reach the global +/// `MAX_PENDING_VERIFY` backstop. `hint_sender` is the cryptographically +/// authenticated connection identity (not a forgeable payload field), so +/// this requires running ~16 real Kademlia-adjacent Sybil nodes — a large +/// step up from the single-peer pre-fix attack, and the worst case degrades +/// only to the bounded memory backstop, not silent permanent starvation of +/// non-Sybil peers (each keeps its independent quota). A future hardening +/// (reserved headroom for under-quota sources, or a per-source cap that +/// scales with distinct-source pressure) is tracked as a follow-up and is +/// intentionally out of scope for this `DoS` fix. +pub const MAX_PENDING_VERIFY_PER_PEER: usize = 8_192; + +/// Hard upper bound on the number of keys held in `fetch_queue`. 
+/// +/// `fetch_queue` is fed only by `enqueue_fetch`, which is reached **after** a +/// key passes quorum verification in `run_verification_cycle` — attacker junk +/// keys (no real holder) fail quorum and never reach this stage, so the +/// bounded-and-fair `pending_verify` upstream is the primary protection. This +/// global cap remains as a defence-in-depth memory backstop: once it is +/// reached, new fetch candidates are dropped (consistent with the existing +/// cross-queue-dedup no-op contract of `enqueue_fetch`). +pub const MAX_FETCH_QUEUE: usize = 131_072; + +/// Outcome of [`ReplicationQueues::add_pending_verify`]. +/// +/// Distinguishes "the key is already being handled" from "the key was +/// silently dropped due to a queue capacity bound". Bootstrap drain +/// accounting and source-side retry logic MUST treat `CapacityRejected` as +/// outstanding work; treating it like a dedup hit was the silent-drop +/// regression introduced when the queues first became bounded. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AdmissionResult { + /// New entry inserted into `pending_verify`. + Admitted, + /// Key was already in some pipeline stage; the existing entry is left + /// in place. No retry required. + AlreadyPresent, + /// Global or per-source capacity bound rejected the entry. The caller + /// MUST treat this as work still to do (not as silently completed). + CapacityRejected, +} + +impl AdmissionResult { + /// `true` only for [`AdmissionResult::Admitted`]. Preserves call sites + /// that only want to know "did the insert happen". + #[must_use] + pub fn admitted(self) -> bool { + matches!(self, Self::Admitted) + } +} + // --------------------------------------------------------------------------- // In-flight entry // --------------------------------------------------------------------------- @@ -44,17 +135,26 @@ pub struct InFlightEntry { /// 3. **`InFlightFetch`** -- keys actively being downloaded. 
pub struct ReplicationQueues { /// Keys awaiting quorum result (dedup by key). - // TODO: Add capacity bound to prevent unbounded growth under network flood. - // Consider evicting farthest-distance entries when at capacity. + /// + /// Capacity-bounded by [`MAX_PENDING_VERIFY`]: admissions are rejected + /// once full, preventing unbounded growth under a network hint flood. pending_verify: HashMap, /// Presence-quorum-passed or paid-list-authorized keys waiting for fetch. - // TODO: Add capacity bound (e.g. MAX_FETCH_QUEUE_SIZE) to prevent - // unbounded growth. Reject or evict farthest-distance candidates when full. + /// + /// Capacity-bounded by [`MAX_FETCH_QUEUE`]: enqueues are dropped once + /// full, preventing unbounded growth under a network hint flood. fetch_queue: BinaryHeap, /// Keys present in `fetch_queue` for O(1) dedup. fetch_queue_keys: HashSet, /// Active downloads keyed by `XorName`. in_flight_fetch: HashMap, + /// Number of `pending_verify` entries currently attributed to each + /// `hint_sender` peer. Maintained in lockstep with `pending_verify` + /// (insert/remove/evict) so the per-peer quota + /// ([`MAX_PENDING_VERIFY_PER_PEER`]) can be enforced in O(1). An entry is + /// removed from this map when its count reaches zero so the map itself is + /// bounded by the number of distinct currently-pending sources. + pending_per_sender: HashMap, } impl Default for ReplicationQueues { @@ -72,6 +172,7 @@ impl ReplicationQueues { fetch_queue: BinaryHeap::new(), fetch_queue_keys: HashSet::new(), in_flight_fetch: HashMap::new(), + pending_per_sender: HashMap::new(), } } @@ -81,13 +182,61 @@ impl ReplicationQueues { /// Add a key to pending verification if not already present in any queue. /// - /// Returns `true` if the key was newly added (Rule 8: cross-queue dedup). 
- pub fn add_pending_verify(&mut self, key: XorName, entry: VerificationEntry) -> bool { + /// Returns an [`AdmissionResult`] distinguishing the three outcomes: + /// * `Admitted` — newly inserted. + /// * `AlreadyPresent` — Rule 8 cross-queue dedup (the key is already in + /// `pending_verify`, `fetch_queue`, or `in_flight_fetch`); the existing + /// entry remains and there is no work to retry. + /// * `CapacityRejected` — global or per-source bound hit; the work is + /// genuinely lost and the caller (e.g. bootstrap drain accounting, + /// source-side retry) MUST treat this as still-outstanding work, not as + /// "done". Without this distinction a bootstrap snapshot whose hints + /// are capacity-rejected would silently mark itself drained. + pub fn add_pending_verify( + &mut self, + key: XorName, + entry: VerificationEntry, + ) -> AdmissionResult { if self.contains_key(&key) { - return false; + return AdmissionResult::AlreadyPresent; + } + if self.pending_verify.len() >= MAX_PENDING_VERIFY { + debug!( + "pending_verify at global capacity ({MAX_PENDING_VERIFY}); rejecting key {}", + hex::encode(key) + ); + return AdmissionResult::CapacityRejected; + } + let sender = entry.hint_sender; + let sender_count = self.pending_per_sender.get(&sender).copied().unwrap_or(0); + if sender_count >= MAX_PENDING_VERIFY_PER_PEER { + debug!( + "peer {sender} at per-source pending cap ({MAX_PENDING_VERIFY_PER_PEER}); \ + rejecting key {} (honest peers are unaffected)", + hex::encode(key) + ); + return AdmissionResult::CapacityRejected; } self.pending_verify.insert(key, entry); - true + *self.pending_per_sender.entry(sender).or_insert(0) += 1; + AdmissionResult::Admitted + } + + /// Decrement (and prune at zero) the per-sender counter for `sender`. + /// + /// Kept private so the counter can only move in lockstep with + /// `pending_verify` mutations. 
The decrement uses `saturating_sub` so a + /// hypothetical future invariant break (a release without a matching + /// admission) cannot underflow `usize`. Zero counts are pruned, so a release + /// for an untracked sender is a silent no-op that `debug_assert!` cannot see. + fn release_sender_slot(pending_per_sender: &mut HashMap, sender: &PeerId) { + if let Some(count) = pending_per_sender.get_mut(sender) { + debug_assert!(*count > 0, "per-sender counter underflow for {sender}"); + *count = count.saturating_sub(1); + if *count == 0 { + pending_per_sender.remove(sender); + } + } } /// Get a reference to a pending verification entry. @@ -96,14 +245,34 @@ impl ReplicationQueues { self.pending_verify.get(key) } - /// Get a mutable reference to a pending verification entry. - pub fn get_pending_mut(&mut self, key: &XorName) -> Option<&mut VerificationEntry> { - self.pending_verify.get_mut(key) + /// Advance a pending entry's verification `state`, returning the entry's + /// `pipeline` (so the caller can branch on it) when the key was found. + /// + /// Replaces a prior `get_pending_mut` which handed out `&mut VerificationEntry` + /// and relied on a doc-comment to keep callers from re-assigning + /// `hint_sender`. The per-source quota counter (`pending_per_sender`) is + /// keyed by `hint_sender` recorded at admission; re-attributing a live + /// entry to a different peer would orphan a count and silently desync + /// the quota — exactly the silent-starvation class this fix prevents. + /// Narrowing the mutation API to a single setter makes that mistake + /// impossible to commit by accident. + pub fn set_pending_state( + &mut self, + key: &XorName, + state: VerificationState, + ) -> Option { + let entry = self.pending_verify.get_mut(key)?; + entry.state = state; + Some(entry.pipeline) } /// Remove a key from pending verification. 
pub fn remove_pending(&mut self, key: &XorName) -> Option { - self.pending_verify.remove(key) + let removed = self.pending_verify.remove(key); + if let Some(entry) = &removed { + Self::release_sender_slot(&mut self.pending_per_sender, &entry.hint_sender); + } + removed } /// Collect all pending verification keys (for batch processing). @@ -124,14 +293,28 @@ impl ReplicationQueues { /// Enqueue a key for fetch with its distance and verified sources. /// - /// No-op if the key is already in any pipeline stage (Rule 8: cross-queue - /// dedup). - pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec) { + /// Returns `true` if the candidate was enqueued, `false` if it was + /// already present in any pipeline stage (Rule 8: cross-queue dedup) or + /// the `fetch_queue` is at [`MAX_FETCH_QUEUE`]. + /// + /// Callers that have removed the key from `pending_verify` immediately + /// before this call should prefer [`promote_pending_to_fetch`](Self::promote_pending_to_fetch), + /// which performs the move atomically and leaves the pending entry in + /// place when the fetch queue is full (so verified work is retried on + /// the next cycle instead of being silently lost). + pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec) -> bool { if self.pending_verify.contains_key(&key) || self.fetch_queue_keys.contains(&key) || self.in_flight_fetch.contains_key(&key) { - return; + return false; + } + if self.fetch_queue.len() >= MAX_FETCH_QUEUE { + debug!( + "fetch_queue at capacity ({MAX_FETCH_QUEUE}); dropping new key {}", + hex::encode(key) + ); + return false; } self.fetch_queue_keys.insert(key); self.fetch_queue.push(FetchCandidate { @@ -139,6 +322,41 @@ impl ReplicationQueues { distance, sources, }); + true + } + + /// Atomically promote a key from `pending_verify` to `fetch_queue`. + /// + /// Checks `fetch_queue` capacity FIRST, then removes the pending entry + /// and enqueues the fetch candidate. 
If `fetch_queue` is full, the + /// pending entry is **left in place** so the next verification cycle + /// can retry — preventing the silent-drop regression where a verified + /// key removed from `pending_verify` could be dropped by a full fetch + /// queue and lost from every stage. + /// + /// Returns `true` on successful promotion, `false` when the fetch queue + /// is at capacity (pending entry preserved). + pub fn promote_pending_to_fetch( + &mut self, + key: XorName, + distance: XorName, + sources: Vec, + ) -> bool { + if self.fetch_queue.len() >= MAX_FETCH_QUEUE { + debug!( + "fetch_queue at capacity ({MAX_FETCH_QUEUE}); leaving {} pending \ + for retry next cycle", + hex::encode(key) + ); + return false; + } + // Capacity confirmed; safe to release the pending slot and enqueue. + let _ = self.remove_pending(&key); + // enqueue_fetch returns false only on capacity or already-queued; the + // capacity check above and the just-removed pending state make this + // succeed. If a concurrent path put the key into fetch_queue/in_flight + // between, dropping the duplicate is fine. + self.enqueue_fetch(key, distance, sources) } /// Dequeue the nearest fetch candidate. @@ -240,13 +458,26 @@ impl ReplicationQueues { pub fn evict_stale(&mut self, max_age: Duration) { let now = Instant::now(); let before = self.pending_verify.len(); - self.pending_verify - .retain(|_, entry| now.duration_since(entry.created_at) < max_age); + let pending_per_sender = &mut self.pending_per_sender; + self.pending_verify.retain(|_, entry| { + let fresh = now.duration_since(entry.created_at) < max_age; + if !fresh { + Self::release_sender_slot(pending_per_sender, &entry.hint_sender); + } + fresh + }); let evicted = before.saturating_sub(self.pending_verify.len()); if evicted > 0 { debug!("Evicted {evicted} stale pending-verification entries"); } } + + /// Number of `pending_verify` entries currently attributed to `sender`. 
+ /// Exposed for tests and observability of the per-source fairness quota. + #[must_use] + pub fn pending_count_for_sender(&self, sender: &PeerId) -> usize { + self.pending_per_sender.get(sender).copied().unwrap_or(0) + } } // --------------------------------------------------------------------------- @@ -260,7 +491,6 @@ mod tests { use std::time::{Duration, Instant}; use super::*; - use crate::replication::types::{HintPipeline, VerificationState}; /// Build a `PeerId` from a single byte (zero-padded to 32 bytes). fn peer_id_from_byte(b: u8) -> PeerId { @@ -292,7 +522,7 @@ mod tests { fn add_pending_verify_new_key_succeeds() { let mut queues = ReplicationQueues::new(); let key = xor_name_from_byte(0x01); - assert!(queues.add_pending_verify(key, test_entry(1))); + assert!(queues.add_pending_verify(key, test_entry(1)).admitted()); assert_eq!(queues.pending_count(), 1); } @@ -300,8 +530,8 @@ mod tests { fn add_pending_verify_duplicate_rejected() { let mut queues = ReplicationQueues::new(); let key = xor_name_from_byte(0x01); - assert!(queues.add_pending_verify(key, test_entry(1))); - assert!(!queues.add_pending_verify(key, test_entry(2))); + assert!(queues.add_pending_verify(key, test_entry(1)).admitted()); + assert!(!queues.add_pending_verify(key, test_entry(2)).admitted()); assert_eq!(queues.pending_count(), 1); } @@ -313,7 +543,7 @@ mod tests { queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(1)]); assert!( - !queues.add_pending_verify(key, test_entry(1)), + !queues.add_pending_verify(key, test_entry(1)).admitted(), "should reject key already in fetch queue" ); } @@ -326,7 +556,7 @@ mod tests { queues.start_fetch(key, source, vec![source]); assert!( - !queues.add_pending_verify(key, test_entry(1)), + !queues.add_pending_verify(key, test_entry(1)).admitted(), "should reject key already in-flight" ); } @@ -481,21 +711,33 @@ mod tests { let mut queues = ReplicationQueues::new(); let key = xor_name_from_byte(0x01); - // Create entry with a backdated 
timestamp. Use a small subtraction - // to avoid `checked_sub` returning `None` on freshly-booted CI runners. + // Go through the public `add_pending_verify` so the per-sender + // counter is correctly bumped — the entry's `hint_sender` slot must + // be released by `evict_stale` and we want to exercise that path. let mut entry = test_entry(1); + let sender = entry.hint_sender; + // Backdate via the same defensive checked_sub used elsewhere so + // freshly-booted CI clocks don't trip us up. entry.created_at = Instant::now() .checked_sub(Duration::from_secs(2)) .unwrap_or_else(Instant::now); - queues.pending_verify.insert(key, entry); + assert!(queues.add_pending_verify(key, entry).admitted()); assert_eq!(queues.pending_count(), 1); + assert_eq!(queues.pending_count_for_sender(&sender), 1); + queues.evict_stale(Duration::from_secs(1)); assert_eq!( queues.pending_count(), 0, "entry older than max_age should be evicted" ); + // Per-sender counter must be released alongside the map removal. + assert_eq!( + queues.pending_count_for_sender(&sender), + 0, + "evict_stale must release the per-sender slot" + ); } #[test] @@ -546,7 +788,7 @@ mod tests { // Step 1: Add to PendingVerify. assert!( - queues.add_pending_verify(key, test_entry(1)), + queues.add_pending_verify(key, test_entry(1)).admitted(), "first add to PendingVerify should succeed" ); assert!( @@ -569,7 +811,7 @@ mod tests { // Step 4: Attempt to re-add to PendingVerify -> should fail. assert!( - !queues.add_pending_verify(key, test_entry(4)), + !queues.add_pending_verify(key, test_entry(4)).admitted(), "key in FetchQueue should be rejected from PendingVerify" ); @@ -583,7 +825,7 @@ mod tests { // Step 6: Attempt to add to PendingVerify while in-flight -> reject. 
assert!( - !queues.add_pending_verify(key, test_entry(5)), + !queues.add_pending_verify(key, test_entry(5)).admitted(), "key in-flight should be rejected from PendingVerify" ); @@ -616,7 +858,7 @@ mod tests { hint_sender: peer_id_from_byte(1), }; - assert!(queues.add_pending_verify(key, entry)); + assert!(queues.add_pending_verify(key, entry).admitted()); let pending = queues.get_pending(&key).expect("should be pending"); assert_eq!( @@ -636,7 +878,7 @@ mod tests { }; assert!( - !queues.add_pending_verify(key, paid_entry), + !queues.add_pending_verify(key, paid_entry).admitted(), "duplicate key should be rejected regardless of pipeline" ); @@ -675,7 +917,7 @@ mod tests { hint_sender, }; assert!( - queues.add_pending_verify(key, entry), + queues.add_pending_verify(key, entry).admitted(), "new key should be admitted to PendingVerify" ); assert!(queues.contains_key(&key)); diff --git a/src/replication/types.rs b/src/replication/types.rs index 1d2ff64..be842e5 100644 --- a/src/replication/types.rs +++ b/src/replication/types.rs @@ -371,6 +371,17 @@ pub struct BootstrapState { /// Keys discovered during bootstrap that are still in the verification / /// fetch pipeline. pub pending_keys: HashSet, + /// Peers whose last bootstrap admission cycle had one or more hints + /// silently dropped at the `pending_verify` capacity bounds. Each entry + /// represents "this source still owes us at least one re-hinted key + /// after the queues drain". `check_bootstrap_drained` refuses to claim + /// the node fully drained while this set is non-empty: a source's + /// presence is cleared by its next admission cycle that completes with + /// zero capacity rejections (i.e. the source successfully re-delivered + /// everything that previously overflowed). Tracking per-source instead + /// of a global counter prevents one peer's rejection from being + /// "cleared" by an unrelated peer's clean cycle. 
+ pub capacity_rejected_sources: HashSet, } impl BootstrapState { @@ -381,6 +392,7 @@ impl BootstrapState { drained: false, pending_peer_requests: 0, pending_keys: HashSet::new(), + capacity_rejected_sources: HashSet::new(), } } diff --git a/tests/poc_d1_bounded_queues.rs b/tests/poc_d1_bounded_queues.rs new file mode 100644 index 0000000..79465f0 --- /dev/null +++ b/tests/poc_d1_bounded_queues.rs @@ -0,0 +1,301 @@ +//! Proof-of-concept regression test for finding **D1** (unbounded replication +//! queues → OOM + reflective amplification, then honest-replication starvation +//! — from a single routing-table peer). +//! +//! ## The vulnerability (pre-fix) +//! +//! `ReplicationQueues::pending_verify` (`HashMap`) and `fetch_queue` +//! (`BinaryHeap`) had **no capacity bound** — the source even carried the +//! project's own `TODO`. `handle_neighbor_sync_request` documents "No +//! per-request hint count limit"; the only gate is `sender_in_rt`. A peer +//! floods `NeighborSyncRequest` messages (each capped only by +//! `MAX_REPLICATION_MESSAGE_SIZE` ≈ 10 MiB → ~320k 32-byte hints) and grows +//! these structures 1:1 → memory exhaustion + an outbound request storm. +//! +//! ## The fix (two layers) +//! +//! 1. **Global memory backstop** — `add_pending_verify` / `enqueue_fetch` +//! reject once `MAX_PENDING_VERIFY` / `MAX_FETCH_QUEUE` is reached. +//! 2. **Per-source fairness (the real D1 defence)** — each pending entry is +//! accounted to its `hint_sender`; a single peer may hold at most +//! `MAX_PENDING_VERIFY_PER_PEER` entries. A flooding peer can exhaust only +//! its own quota and can **never** deny slots to honest peers. Without +//! layer 2, a blind global cap merely converts the memory DoS into a +//! *worse* silent honest-replication starvation DoS (a single ~4 MB +//! message every <30 min permanently rejects all honest hints). +//! +//! Each test states what it would do pre-fix. The starvation test in +//! 
particular FAILS against a global-cap-only fix and only passes with the +//! per-source quota — it is the test that proves D1 is actually closed, not +//! merely reshaped. + +#![allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::missing_panics_doc, + clippy::cast_possible_truncation, + clippy::doc_markdown +)] + +use ant_node::replication::scheduling::{ + ReplicationQueues, MAX_FETCH_QUEUE, MAX_PENDING_VERIFY, MAX_PENDING_VERIFY_PER_PEER, +}; +use ant_node::replication::types::{HintPipeline, VerificationEntry, VerificationState}; +use saorsa_core::identity::PeerId; +use std::collections::HashSet; +use std::time::Instant; + +fn peer_id_from_byte(b: u8) -> PeerId { + let mut bytes = [0u8; 32]; + bytes[0] = b; + PeerId::from_bytes(bytes) +} + +/// Distinct 32-byte key per index (attacker can grind these freely). +fn unique_xorname(i: u32) -> [u8; 32] { + let mut x = [0u8; 32]; + x[..4].copy_from_slice(&i.to_le_bytes()); + x +} + +fn entry_from(sender: PeerId) -> VerificationEntry { + VerificationEntry { + state: VerificationState::PendingVerify, + pipeline: HintPipeline::Replica, + verified_sources: Vec::new(), + tried_sources: HashSet::new(), + created_at: Instant::now(), + hint_sender: sender, + } +} + +/// D1a — `pending_verify` is globally memory-bounded: a flood spread across +/// many distinct sources (so the per-peer quota never bites) still cannot +/// grow the map past `MAX_PENDING_VERIFY`. +#[test] +fn poc_d1_pending_verify_is_globally_bounded() { + let mut queues = ReplicationQueues::new(); + + // Spread the flood across enough sources that per-peer quota is not the + // limiter — isolating the global memory backstop. + let per_peer = MAX_PENDING_VERIFY_PER_PEER; + let mut i: u32 = 0; + let mut sender: u32 = 0; + let target = (MAX_PENDING_VERIFY as u32).saturating_add(20_000); + while i < target { + // PeerId space here is just sender index spread over 4 bytes. 
+ let mut pid = [0u8; 32]; + pid[..4].copy_from_slice(&sender.to_le_bytes()); + let s = PeerId::from_bytes(pid); + for _ in 0..per_peer { + if i >= target { + break; + } + queues.add_pending_verify(unique_xorname(i), entry_from(s)); + i += 1; + } + sender += 1; + } + + assert!( + queues.pending_count() <= MAX_PENDING_VERIFY, + "pending_verify must never exceed MAX_PENDING_VERIFY ({MAX_PENDING_VERIFY}); got {}", + queues.pending_count() + ); + assert_eq!( + queues.pending_count(), + MAX_PENDING_VERIFY, + "global memory backstop clamps exactly at the cap" + ); +} + +/// D1b — `fetch_queue` global memory backstop holds. +#[test] +fn poc_d1_fetch_queue_is_capacity_bounded() { + let mut queues = ReplicationQueues::new(); + let sources = vec![peer_id_from_byte(0x02)]; + + let flood: u32 = (MAX_FETCH_QUEUE as u32).saturating_add(50_000); + for i in 0..flood { + let key = unique_xorname(i); + queues.enqueue_fetch(key, key, sources.clone()); + } + + assert!( + queues.fetch_queue_count() <= MAX_FETCH_QUEUE, + "fetch_queue must never exceed MAX_FETCH_QUEUE ({MAX_FETCH_QUEUE}); got {}", + queues.fetch_queue_count() + ); + assert_eq!(queues.fetch_queue_count(), MAX_FETCH_QUEUE); +} + +/// D1c — **the critical test**: a single flooding peer CANNOT starve an +/// honest peer. Pre-fix (and against a global-cap-only fix) the attacker +/// fills the whole queue and every honest hint is rejected. With per-source +/// fairness the attacker is clamped to its own quota and the honest peer's +/// hints are still admitted. +#[test] +fn poc_d1_flooding_peer_cannot_starve_honest_peer() { + let mut queues = ReplicationQueues::new(); + + let attacker = peer_id_from_byte(0xAA); + let honest = peer_id_from_byte(0xBB); + + // Attacker floods far beyond any single-peer budget. 
+ let attacker_flood: u32 = (MAX_PENDING_VERIFY_PER_PEER as u32).saturating_add(10_000); + let mut attacker_admitted = 0usize; + for i in 0..attacker_flood { + if queues + .add_pending_verify(unique_xorname(i), entry_from(attacker)) + .admitted() + { + attacker_admitted += 1; + } + } + + // The attacker is clamped to exactly its per-source quota... + assert_eq!( + attacker_admitted, MAX_PENDING_VERIFY_PER_PEER, + "a single peer can occupy at most MAX_PENDING_VERIFY_PER_PEER slots" + ); + assert_eq!( + queues.pending_count_for_sender(&attacker), + MAX_PENDING_VERIFY_PER_PEER, + "per-source accounting matches" + ); + // ...and crucially the global map is NOT full (attacker can't monopolise). + assert!( + queues.pending_count() < MAX_PENDING_VERIFY, + "one flooding peer must not be able to fill the global queue; \ + pending_count={} cap={MAX_PENDING_VERIFY}", + queues.pending_count() + ); + + // The honest peer's hints are still admitted despite the ongoing flood. + // (Use a disjoint key range so dedup is not the reason for admission.) + let mut honest_admitted = 0usize; + for j in 0..2_000u32 { + let key = unique_xorname(10_000_000 + j); + if queues + .add_pending_verify(key, entry_from(honest)) + .admitted() + { + honest_admitted += 1; + } + } + assert_eq!( + honest_admitted, 2_000, + "every honest hint is admitted — the flooding peer cannot starve it. \ + (This assertion FAILS against a global-cap-only fix.)" + ); +} + +/// D1d — per-source counter stays consistent across remove and stale eviction +/// (so freed quota is actually reusable and there is no counter leak/desync). +#[test] +fn poc_d1_per_sender_counter_is_consistent() { + let mut queues = ReplicationQueues::new(); + let peer = peer_id_from_byte(0xCC); + + for i in 0..100u32 { + assert!(queues + .add_pending_verify(unique_xorname(i), entry_from(peer)) + .admitted()); + } + assert_eq!(queues.pending_count_for_sender(&peer), 100); + + // Removing entries frees the peer's quota. 
+ for i in 0..40u32 { + assert!(queues.remove_pending(&unique_xorname(i)).is_some()); + } + assert_eq!( + queues.pending_count_for_sender(&peer), + 60, + "remove_pending decrements the per-source counter in lockstep" + ); + + // Stale eviction also frees quota (max_age = 0 → everything is stale). + queues.evict_stale(std::time::Duration::from_secs(0)); + assert_eq!(queues.pending_count(), 0, "all entries evicted as stale"); + assert_eq!( + queues.pending_count_for_sender(&peer), + 0, + "evict_stale releases per-source slots; the freed quota is reusable \ + and the per-sender map is pruned (no leak/desync)" + ); + + // Quota fully reusable after release. + assert!(queues + .add_pending_verify(unique_xorname(999), entry_from(peer)) + .admitted()); + assert_eq!(queues.pending_count_for_sender(&peer), 1); +} + +/// D1e — the bounds do not break legitimate small working sets or dedup. +#[test] +fn poc_d1_bound_preserves_legitimate_entries() { + let mut queues = ReplicationQueues::new(); + let peer = peer_id_from_byte(0xDD); + + for i in 0..1_000u32 { + assert!( + queues + .add_pending_verify(unique_xorname(i), entry_from(peer)) + .admitted(), + "legitimate entries well under both caps are always admitted" + ); + } + assert_eq!(queues.pending_count(), 1_000); + + // Cross-queue dedup still holds (existing key not re-admitted, no + // double-count of the per-source quota). + assert!(!queues + .add_pending_verify(unique_xorname(0), entry_from(peer)) + .admitted()); + assert_eq!( + queues.pending_count(), + 1_000, + "no spurious growth from dedup" + ); + assert_eq!( + queues.pending_count_for_sender(&peer), + 1_000, + "dedup must not double-count the per-source quota" + ); +} + +/// D1f — advancing an entry's state via the narrow `set_pending_state` +/// setter (the real pipeline path) must not desync the per-source quota +/// counter. 
Guards the invariant that previously rested on a doc warning on +/// the now-removed `get_pending_mut`: no public API can re-attribute a live +/// entry to a different `hint_sender`. +#[test] +fn poc_d1_set_pending_state_keeps_counter_consistent() { + let mut queues = ReplicationQueues::new(); + let peer = peer_id_from_byte(0xEE); + let key = unique_xorname(1); + + assert!(queues.add_pending_verify(key, entry_from(peer)).admitted()); + assert_eq!(queues.pending_count_for_sender(&peer), 1); + + // Exactly what run_verification_cycle does: advance the FSM state. + let pipeline = queues + .set_pending_state(&key, VerificationState::QuorumVerified) + .expect("entry must be present"); + assert_eq!(pipeline, HintPipeline::Replica, "pipeline preserved"); + + // Counter unchanged by a state mutation (it tracks membership, not state). + assert_eq!( + queues.pending_count_for_sender(&peer), + 1, + "state change must not touch the per-source counter" + ); + + // And removal still correctly releases exactly one slot. + assert!(queues.remove_pending(&key).is_some()); + assert_eq!( + queues.pending_count_for_sender(&peer), + 0, + "removal after a state mutation releases the slot exactly once" + ); +}