From 9ab769d956248642a0025831192798d0d326795d Mon Sep 17 00:00:00 2001 From: grumbach Date: Tue, 19 May 2026 15:43:33 +0900 Subject: [PATCH 1/4] fix(replication): bound pending_verify/fetch_queue with per-source fairness (D1) ReplicationQueues::pending_verify (HashMap) and fetch_queue (BinaryHeap) had no capacity bound (the source carried the project's own TODO). handle_neighbor_sync_request documents "No per-request hint count limit" and the only gate is sender_in_rt, so a routing-table peer could flood NeighborSyncRequest hints (each message capped only by MAX_REPLICATION_MESSAGE_SIZE, ~320k 32-byte hints) and grow these structures unboundedly: memory exhaustion plus a self-amplifying outbound verification/fetch request storm. Fix, two layers: - Global memory backstop: MAX_PENDING_VERIFY / MAX_FETCH_QUEUE (131_072 each). add_pending_verify / enqueue_fetch reject once full (callers already treat non-admission as "not added"). - Per-source fairness (the actual D1 defence): each pending entry is accounted to its hint_sender via a pending_per_sender counter kept in lockstep with pending_verify (insert/remove/evict_stale). A single source may hold at most MAX_PENDING_VERIFY_PER_PEER (8_192) entries, so a flooding peer exhausts only its own quota and can never starve honest peers, whose quotas are independent. hint_sender is the cryptographically authenticated connection identity, not a forgeable payload field, so the accounting cannot be evaded. A global cap alone would have converted the memory DoS into a worse silent honest-replication starvation DoS; the per-source quota is what actually closes D1. Counter decrement uses saturating_sub with a debug_assert; get_pending_mut documents the no-reattribution invariant. A residual (~16 in-RT Sybil PeerIds to reach the global backstop) is documented as an accepted, tracked follow-up. tests/poc_d1_bounded_queues.rs proves the attack is defeated and that the critical starvation test fails against a global-cap-only fix. 
460 lib tests pass; cfd clean. --- src/replication/scheduling.rs | 164 ++++++++++++++++++- tests/poc_d1_bounded_queues.rs | 283 +++++++++++++++++++++++++++++++++ 2 files changed, 439 insertions(+), 8 deletions(-) create mode 100644 tests/poc_d1_bounded_queues.rs diff --git a/src/replication/scheduling.rs b/src/replication/scheduling.rs index 2240b286..0ad4b773 100644 --- a/src/replication/scheduling.rs +++ b/src/replication/scheduling.rs @@ -13,6 +13,67 @@ use crate::ant_protocol::XorName; use crate::replication::types::{FetchCandidate, VerificationEntry}; use saorsa_core::identity::PeerId; +/// Global hard upper bound on the number of keys held in `pending_verify`. +/// +/// Without a bound, a peer in the local routing table can flood +/// `NeighborSyncRequest` messages (each capped only by +/// `MAX_REPLICATION_MESSAGE_SIZE` ≈ 10 MiB, i.e. ~320k 32-byte hints per +/// message) and grow this map without limit, exhausting node memory and +/// driving a self-amplifying storm of outbound verification requests. +/// +/// `131_072` entries is far above any legitimate aggregate need while +/// bounding worst-case memory to a few tens of MiB (each `VerificationEntry` +/// is on the order of a few hundred bytes; its sub-collections are populated +/// only from close-group-sized verification evidence, never from attacker +/// hint volume). +/// +/// This global cap alone is **not** sufficient: with blind capacity-reject a +/// single malicious routing-table peer could fill the whole map with cheap +/// admission-passing junk and starve every honest peer's hints until the +/// 30-minute `evict_stale` backstop fires (and re-fill immediately after). +/// Honest-replication fairness is therefore enforced by +/// [`MAX_PENDING_VERIFY_PER_PEER`] below; this global value is only the +/// memory backstop. +pub const MAX_PENDING_VERIFY: usize = 131_072; + +/// Per-source hard cap on `pending_verify` entries attributed to a single +/// `hint_sender` peer. 
+/// +/// This is the actual D1 defence. Each pending entry records the peer that +/// hinted it (`VerificationEntry::hint_sender`); a single source may occupy +/// at most this many slots. A flooding peer can therefore consume only its +/// own quota — it can never deny slots to honest peers, because honest +/// sources are accounted independently. Set well above any legitimate +/// per-peer hint working set (a healthy neighbour syncs at most a few +/// thousand keys to us per cycle) yet small enough that +/// `MAX_PENDING_VERIFY / MAX_PENDING_VERIFY_PER_PEER` distinct malicious +/// peers would be required to approach the global cap. +/// +/// Residual (accepted, follow-up): with the current ratio, ~16 distinct +/// `PeerId`s that are *all* simultaneously in the victim's routing table +/// (gated by `sender_in_rt`) could still collectively reach the global +/// `MAX_PENDING_VERIFY` backstop. `hint_sender` is the cryptographically +/// authenticated connection identity (not a forgeable payload field), so +/// this requires running ~16 real Kademlia-adjacent Sybil nodes — a large +/// step up from the single-peer pre-fix attack, and the worst case degrades +/// only to the bounded memory backstop, not silent permanent starvation of +/// non-Sybil peers (each keeps its independent quota). A future hardening +/// (reserved headroom for under-quota sources, or a per-source cap that +/// scales with distinct-source pressure) is tracked as a follow-up and is +/// intentionally out of scope for this `DoS` fix. +pub const MAX_PENDING_VERIFY_PER_PEER: usize = 8_192; + +/// Hard upper bound on the number of keys held in `fetch_queue`. +/// +/// `fetch_queue` is fed only by `enqueue_fetch`, which is reached **after** a +/// key passes quorum verification in `run_verification_cycle` — attacker junk +/// keys (no real holder) fail quorum and never reach this stage, so the +/// bounded-and-fair `pending_verify` upstream is the primary protection. 
This +/// global cap remains as a defence-in-depth memory backstop; new keys are dropped +/// (consistent with the existing cross-queue-dedup no-op contract of +/// `enqueue_fetch`) when the queue is full. +pub const MAX_FETCH_QUEUE: usize = 131_072; + // --------------------------------------------------------------------------- // In-flight entry // --------------------------------------------------------------------------- @@ -44,17 +105,26 @@ pub struct InFlightEntry { /// 3. **`InFlightFetch`** -- keys actively being downloaded. pub struct ReplicationQueues { /// Keys awaiting quorum result (dedup by key). - // TODO: Add capacity bound to prevent unbounded growth under network flood. - // Consider evicting farthest-distance entries when at capacity. + /// + /// Capacity-bounded by [`MAX_PENDING_VERIFY`]: admissions are rejected + /// once full, preventing unbounded growth under a network hint flood. pending_verify: HashMap<XorName, VerificationEntry>, /// Presence-quorum-passed or paid-list-authorized keys waiting for fetch. - // TODO: Add capacity bound (e.g. MAX_FETCH_QUEUE_SIZE) to prevent - // unbounded growth. Reject or evict farthest-distance candidates when full. + /// + /// Capacity-bounded by [`MAX_FETCH_QUEUE`]: enqueues are dropped once + /// full, preventing unbounded growth under a network hint flood. fetch_queue: BinaryHeap<FetchCandidate>, /// Keys present in `fetch_queue` for O(1) dedup. fetch_queue_keys: HashSet<XorName>, /// Active downloads keyed by `XorName`. in_flight_fetch: HashMap<XorName, InFlightEntry>, + /// Number of `pending_verify` entries currently attributed to each + /// `hint_sender` peer. Maintained in lockstep with `pending_verify` + /// (insert/remove/evict) so the per-peer quota + /// ([`MAX_PENDING_VERIFY_PER_PEER`]) can be enforced in O(1). An entry is + /// removed from this map when its count reaches zero so the map itself is + /// bounded by the number of distinct currently-pending sources. 
+ pending_per_sender: HashMap<PeerId, usize>, } impl Default for ReplicationQueues { @@ -72,6 +142,7 @@ impl ReplicationQueues { fetch_queue: BinaryHeap::new(), fetch_queue_keys: HashSet::new(), in_flight_fetch: HashMap::new(), + pending_per_sender: HashMap::new(), } } @@ -82,14 +153,57 @@ impl ReplicationQueues { /// Add a key to pending verification if not already present in any queue. /// /// Returns `true` if the key was newly added (Rule 8: cross-queue dedup). + /// + /// Returns `false` — without inserting — when either: + /// * the global [`MAX_PENDING_VERIFY`] memory backstop is reached, or + /// * the entry's `hint_sender` already holds [`MAX_PENDING_VERIFY_PER_PEER`] + /// pending entries (per-source fairness — a flooding peer can only + /// exhaust its own quota and can never starve honest peers). + /// + /// Callers already treat a `false` result as "not admitted". pub fn add_pending_verify(&mut self, key: XorName, entry: VerificationEntry) -> bool { if self.contains_key(&key) { return false; } + if self.pending_verify.len() >= MAX_PENDING_VERIFY { + debug!( + "pending_verify at global capacity ({MAX_PENDING_VERIFY}); rejecting key {}", + hex::encode(key) + ); + return false; + } + let sender = entry.hint_sender; + let sender_count = self.pending_per_sender.get(&sender).copied().unwrap_or(0); + if sender_count >= MAX_PENDING_VERIFY_PER_PEER { + debug!( + "peer {sender} at per-source pending cap ({MAX_PENDING_VERIFY_PER_PEER}); \ + rejecting key {} (honest peers are unaffected)", + hex::encode(key) + ); + return false; + } self.pending_verify.insert(key, entry); + *self.pending_per_sender.entry(sender).or_insert(0) += 1; true } + /// Decrement (and prune at zero) the per-sender counter for `sender`. + /// + /// Kept private so the counter can only move in lockstep with + /// `pending_verify` mutations. 
The decrement uses `saturating_sub` so a + /// hypothetical future invariant break (a release without a matching + /// admission) self-heals to zero instead of panicking on `usize` + /// underflow; `debug_assert!` still surfaces such a break in test builds. + fn release_sender_slot(pending_per_sender: &mut HashMap<PeerId, usize>, sender: &PeerId) { + if let Some(count) = pending_per_sender.get_mut(sender) { + debug_assert!(*count > 0, "per-sender counter underflow for {sender}"); + *count = count.saturating_sub(1); + if *count == 0 { + pending_per_sender.remove(sender); + } + } + } + /// Get a reference to a pending verification entry. #[must_use] pub fn get_pending(&self, key: &XorName) -> Option<&VerificationEntry> { @@ -97,13 +211,25 @@ impl ReplicationQueues { } /// Get a mutable reference to a pending verification entry. + /// + /// INVARIANT: callers MUST NOT reassign `entry.hint_sender` through the + /// returned reference. The per-source quota counter + /// (`pending_per_sender`) is keyed by the `hint_sender` recorded at + /// admission; re-attributing a live entry to a different peer here would + /// orphan a count (decremented against the wrong peer on removal/eviction) + /// and silently desync the quota. Mutate only verification-progress + /// fields (e.g. `state`, `verified_sources`, `tried_sources`). pub fn get_pending_mut(&mut self, key: &XorName) -> Option<&mut VerificationEntry> { self.pending_verify.get_mut(key) } /// Remove a key from pending verification. pub fn remove_pending(&mut self, key: &XorName) -> Option<VerificationEntry> { - self.pending_verify.remove(key) + let removed = self.pending_verify.remove(key); + if let Some(entry) = &removed { + Self::release_sender_slot(&mut self.pending_per_sender, &entry.hint_sender); + } + removed } /// Collect all pending verification keys (for batch processing). @@ -125,7 +251,9 @@ impl ReplicationQueues { /// Enqueue a key for fetch with its distance and verified sources. 
/// /// No-op if the key is already in any pipeline stage (Rule 8: cross-queue - /// dedup). + /// dedup), or if `fetch_queue` is already at [`MAX_FETCH_QUEUE`]. The + /// capacity drop bounds memory (and the outbound `FetchRequest` storm) + /// under a network hint flood. pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) { if self.pending_verify.contains_key(&key) || self.fetch_queue_keys.contains(&key) @@ -133,6 +261,13 @@ impl ReplicationQueues { { return; } + if self.fetch_queue.len() >= MAX_FETCH_QUEUE { + debug!( + "fetch_queue at capacity ({MAX_FETCH_QUEUE}); dropping new key {}", + hex::encode(key) + ); + return; + } self.fetch_queue_keys.insert(key); self.fetch_queue.push(FetchCandidate { key, @@ -240,13 +375,26 @@ impl ReplicationQueues { pub fn evict_stale(&mut self, max_age: Duration) { let now = Instant::now(); let before = self.pending_verify.len(); - self.pending_verify - .retain(|_, entry| now.duration_since(entry.created_at) < max_age); + let pending_per_sender = &mut self.pending_per_sender; + self.pending_verify.retain(|_, entry| { + let fresh = now.duration_since(entry.created_at) < max_age; + if !fresh { + Self::release_sender_slot(pending_per_sender, &entry.hint_sender); + } + fresh + }); let evicted = before.saturating_sub(self.pending_verify.len()); if evicted > 0 { debug!("Evicted {evicted} stale pending-verification entries"); } } + + /// Number of `pending_verify` entries currently attributed to `sender`. + /// Exposed for tests and observability of the per-source fairness quota. + #[must_use] + pub fn pending_count_for_sender(&self, sender: &PeerId) -> usize { + self.pending_per_sender.get(sender).copied().unwrap_or(0) + } } // --------------------------------------------------------------------------- diff --git a/tests/poc_d1_bounded_queues.rs b/tests/poc_d1_bounded_queues.rs new file mode 100644 index 00000000..27fff25c --- /dev/null +++ b/tests/poc_d1_bounded_queues.rs @@ -0,0 +1,283 @@ +//! 
Proof-of-concept regression test for finding **D1** (unbounded replication +//! queues → OOM + reflective amplification, then honest-replication starvation +//! — from a single routing-table peer). +//! +//! ## The vulnerability (pre-fix) +//! +//! `ReplicationQueues::pending_verify` (`HashMap`) and `fetch_queue` +//! (`BinaryHeap`) had **no capacity bound** — the source even carried the +//! project's own `TODO`. `handle_neighbor_sync_request` documents "No +//! per-request hint count limit"; the only gate is `sender_in_rt`. A peer +//! floods `NeighborSyncRequest` messages (each capped only by +//! `MAX_REPLICATION_MESSAGE_SIZE` ≈ 10 MiB → ~320k 32-byte hints) and grows +//! these structures 1:1 → memory exhaustion + an outbound request storm. +//! +//! ## The fix (two layers) +//! +//! 1. **Global memory backstop** — `add_pending_verify` / `enqueue_fetch` +//! reject once `MAX_PENDING_VERIFY` / `MAX_FETCH_QUEUE` is reached. +//! 2. **Per-source fairness (the real D1 defence)** — each pending entry is +//! accounted to its `hint_sender`; a single peer may hold at most +//! `MAX_PENDING_VERIFY_PER_PEER` entries. A flooding peer can exhaust only +//! its own quota and can **never** deny slots to honest peers. Without +//! layer 2, a blind global cap merely converts the memory DoS into a +//! *worse* silent honest-replication starvation DoS (a single ~4 MB +//! message every <30 min permanently rejects all honest hints). +//! +//! Each test states what it would do pre-fix. The starvation test in +//! particular FAILS against a global-cap-only fix and only passes with the +//! per-source quota — it is the test that proves D1 is actually closed, not +//! merely reshaped. 
+ +#![allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::missing_panics_doc, + clippy::cast_possible_truncation, + clippy::doc_markdown +)] + +use ant_node::replication::scheduling::{ + ReplicationQueues, MAX_FETCH_QUEUE, MAX_PENDING_VERIFY, MAX_PENDING_VERIFY_PER_PEER, +}; +use ant_node::replication::types::{HintPipeline, VerificationEntry, VerificationState}; +use saorsa_core::identity::PeerId; +use std::collections::HashSet; +use std::time::Instant; + +fn peer_id_from_byte(b: u8) -> PeerId { + let mut bytes = [0u8; 32]; + bytes[0] = b; + PeerId::from_bytes(bytes) +} + +/// Distinct 32-byte key per index (attacker can grind these freely). +fn unique_xorname(i: u32) -> [u8; 32] { + let mut x = [0u8; 32]; + x[..4].copy_from_slice(&i.to_le_bytes()); + x +} + +fn entry_from(sender: PeerId) -> VerificationEntry { + VerificationEntry { + state: VerificationState::PendingVerify, + pipeline: HintPipeline::Replica, + verified_sources: Vec::new(), + tried_sources: HashSet::new(), + created_at: Instant::now(), + hint_sender: sender, + } +} + +/// D1a — `pending_verify` is globally memory-bounded: a flood spread across +/// many distinct sources (so the per-peer quota never bites) still cannot +/// grow the map past `MAX_PENDING_VERIFY`. +#[test] +fn poc_d1_pending_verify_is_globally_bounded() { + let mut queues = ReplicationQueues::new(); + + // Spread the flood across enough sources that per-peer quota is not the + // limiter — isolating the global memory backstop. + let per_peer = MAX_PENDING_VERIFY_PER_PEER; + let mut i: u32 = 0; + let mut sender: u32 = 0; + let target = (MAX_PENDING_VERIFY as u32).saturating_add(20_000); + while i < target { + // PeerId space here is just sender index spread over 4 bytes. 
+ let mut pid = [0u8; 32]; + pid[..4].copy_from_slice(&sender.to_le_bytes()); + let s = PeerId::from_bytes(pid); + for _ in 0..per_peer { + if i >= target { + break; + } + queues.add_pending_verify(unique_xorname(i), entry_from(s)); + i += 1; + } + sender += 1; + } + + assert!( + queues.pending_count() <= MAX_PENDING_VERIFY, + "pending_verify must never exceed MAX_PENDING_VERIFY ({MAX_PENDING_VERIFY}); got {}", + queues.pending_count() + ); + assert_eq!( + queues.pending_count(), + MAX_PENDING_VERIFY, + "global memory backstop clamps exactly at the cap" + ); +} + +/// D1b — `fetch_queue` global memory backstop holds. +#[test] +fn poc_d1_fetch_queue_is_capacity_bounded() { + let mut queues = ReplicationQueues::new(); + let sources = vec![peer_id_from_byte(0x02)]; + + let flood: u32 = (MAX_FETCH_QUEUE as u32).saturating_add(50_000); + for i in 0..flood { + let key = unique_xorname(i); + queues.enqueue_fetch(key, key, sources.clone()); + } + + assert!( + queues.fetch_queue_count() <= MAX_FETCH_QUEUE, + "fetch_queue must never exceed MAX_FETCH_QUEUE ({MAX_FETCH_QUEUE}); got {}", + queues.fetch_queue_count() + ); + assert_eq!(queues.fetch_queue_count(), MAX_FETCH_QUEUE); +} + +/// D1c — **the critical test**: a single flooding peer CANNOT starve an +/// honest peer. Pre-fix (and against a global-cap-only fix) the attacker +/// fills the whole queue and every honest hint is rejected. With per-source +/// fairness the attacker is clamped to its own quota and the honest peer's +/// hints are still admitted. +#[test] +fn poc_d1_flooding_peer_cannot_starve_honest_peer() { + let mut queues = ReplicationQueues::new(); + + let attacker = peer_id_from_byte(0xAA); + let honest = peer_id_from_byte(0xBB); + + // Attacker floods far beyond any single-peer budget. 
+ let attacker_flood: u32 = (MAX_PENDING_VERIFY_PER_PEER as u32).saturating_add(500_000); + let mut attacker_admitted = 0usize; + for i in 0..attacker_flood { + if queues.add_pending_verify(unique_xorname(i), entry_from(attacker)) { + attacker_admitted += 1; + } + } + + // The attacker is clamped to exactly its per-source quota... + assert_eq!( + attacker_admitted, MAX_PENDING_VERIFY_PER_PEER, + "a single peer can occupy at most MAX_PENDING_VERIFY_PER_PEER slots" + ); + assert_eq!( + queues.pending_count_for_sender(&attacker), + MAX_PENDING_VERIFY_PER_PEER, + "per-source accounting matches" + ); + // ...and crucially the global map is NOT full (attacker can't monopolise). + assert!( + queues.pending_count() < MAX_PENDING_VERIFY, + "one flooding peer must not be able to fill the global queue; \ + pending_count={} cap={MAX_PENDING_VERIFY}", + queues.pending_count() + ); + + // The honest peer's hints are still admitted despite the ongoing flood. + // (Use a disjoint key range so dedup is not the reason for admission.) + let mut honest_admitted = 0usize; + for j in 0..2_000u32 { + let key = unique_xorname(10_000_000 + j); + if queues.add_pending_verify(key, entry_from(honest)) { + honest_admitted += 1; + } + } + assert_eq!( + honest_admitted, 2_000, + "every honest hint is admitted — the flooding peer cannot starve it. \ + (This assertion FAILS against a global-cap-only fix.)" + ); +} + +/// D1d — per-source counter stays consistent across remove and stale eviction +/// (so freed quota is actually reusable and there is no counter leak/desync). +#[test] +fn poc_d1_per_sender_counter_is_consistent() { + let mut queues = ReplicationQueues::new(); + let peer = peer_id_from_byte(0xCC); + + for i in 0..100u32 { + assert!(queues.add_pending_verify(unique_xorname(i), entry_from(peer))); + } + assert_eq!(queues.pending_count_for_sender(&peer), 100); + + // Removing entries frees the peer's quota. 
+ for i in 0..40u32 { + assert!(queues.remove_pending(&unique_xorname(i)).is_some()); + } + assert_eq!( + queues.pending_count_for_sender(&peer), + 60, + "remove_pending decrements the per-source counter in lockstep" + ); + + // Stale eviction also frees quota (max_age = 0 → everything is stale). + queues.evict_stale(std::time::Duration::from_secs(0)); + assert_eq!(queues.pending_count(), 0, "all entries evicted as stale"); + assert_eq!( + queues.pending_count_for_sender(&peer), + 0, + "evict_stale releases per-source slots; the freed quota is reusable \ + and the per-sender map is pruned (no leak/desync)" + ); + + // Quota fully reusable after release. + assert!(queues.add_pending_verify(unique_xorname(999), entry_from(peer))); + assert_eq!(queues.pending_count_for_sender(&peer), 1); +} + +/// D1e — the bounds do not break legitimate small working sets or dedup. +#[test] +fn poc_d1_bound_preserves_legitimate_entries() { + let mut queues = ReplicationQueues::new(); + let peer = peer_id_from_byte(0xDD); + + for i in 0..1_000u32 { + assert!( + queues.add_pending_verify(unique_xorname(i), entry_from(peer)), + "legitimate entries well under both caps are always admitted" + ); + } + assert_eq!(queues.pending_count(), 1_000); + + // Cross-queue dedup still holds (existing key not re-admitted, no + // double-count of the per-source quota). + assert!(!queues.add_pending_verify(unique_xorname(0), entry_from(peer))); + assert_eq!( + queues.pending_count(), + 1_000, + "no spurious growth from dedup" + ); + assert_eq!( + queues.pending_count_for_sender(&peer), + 1_000, + "dedup must not double-count the per-source quota" + ); +} + +/// D1f — mutating an entry via `get_pending_mut` (the real pipeline path: +/// advancing `state`) must not desync the per-source quota counter. Guards +/// the documented `get_pending_mut` invariant. 
+#[test] +fn poc_d1_get_pending_mut_state_change_keeps_counter_consistent() { + let mut queues = ReplicationQueues::new(); + let peer = peer_id_from_byte(0xEE); + let key = unique_xorname(1); + + assert!(queues.add_pending_verify(key, entry_from(peer))); + assert_eq!(queues.pending_count_for_sender(&peer), 1); + + // Exactly what run_verification_cycle does: advance the FSM state. + let entry = queues.get_pending_mut(&key).expect("entry must be present"); + entry.state = VerificationState::QuorumVerified; + + // Counter unchanged by a state mutation (it tracks membership, not state). + assert_eq!( + queues.pending_count_for_sender(&peer), + 1, + "get_pending_mut state change must not touch the per-source counter" + ); + + // And removal still correctly releases exactly one slot. + assert!(queues.remove_pending(&key).is_some()); + assert_eq!( + queues.pending_count_for_sender(&peer), + 0, + "removal after a state mutation releases the slot exactly once" + ); +} From f3fd84f21415d93c04676a2b5d11f40cdc326d91 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 20 May 2026 13:49:59 +0900 Subject: [PATCH 2/4] followup(D1): replace get_pending_mut with narrow set_pending_state setter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review-wave findings on PR #99: Replace the `get_pending_mut(&XorName) -> Option<&mut VerificationEntry>` helper with `set_pending_state(&XorName, VerificationState) -> Option<HintPipeline>`. The old API exposed `&mut hint_sender` to callers and relied on a doc comment to keep them from re-assigning it; a re-attribution would have orphaned a per-source quota count and silently desynced the fairness counter — the same silent-starvation class this PR was written to prevent. The new setter writes only `state` and returns `pipeline`, making the desync impossible to commit by accident. 
Single production caller (run_verification_cycle in replication/mod.rs) migrated; D1f test renamed/updated to exercise the new setter; PoC flood shrunk from +500_000 to +10_000 over the cap (same security signal, faster in debug builds). --- src/replication/mod.rs | 7 ++++--- src/replication/scheduling.rs | 33 +++++++++++++++++++++------------ tests/poc_d1_bounded_queues.rs | 20 ++++++++++++-------- 3 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index c33af49c..afbddea9 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -1860,9 +1860,10 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { let mut q = queues.write().await; for key in &pending_keys { if paid_list.contains(key).unwrap_or(false) { - if let Some(entry) = q.get_pending_mut(key) { - entry.state = VerificationState::PaidListVerified; - match entry.pipeline { + if let Some(pipeline) = + q.set_pending_state(key, VerificationState::PaidListVerified) + { + match pipeline { HintPipeline::PaidOnly => { // Paid-only + local paid state needs one more // responsibility check outside this lock: if we diff --git a/src/replication/scheduling.rs b/src/replication/scheduling.rs index 0ad4b773..d7ec944d 100644 --- a/src/replication/scheduling.rs +++ b/src/replication/scheduling.rs @@ -10,7 +10,9 @@ use std::time::{Duration, Instant}; use crate::logging::debug; use crate::ant_protocol::XorName; -use crate::replication::types::{FetchCandidate, VerificationEntry}; +use crate::replication::types::{ + FetchCandidate, HintPipeline, VerificationEntry, VerificationState, +}; use saorsa_core::identity::PeerId; /// Global hard upper bound on the number of keys held in `pending_verify`. @@ -210,17 +212,25 @@ impl ReplicationQueues { self.pending_verify.get(key) } - /// Get a mutable reference to a pending verification entry. 
+ /// Advance a pending entry's verification `state`, returning the entry's + /// `pipeline` (so the caller can branch on it) when the key was found. /// - /// INVARIANT: callers MUST NOT reassign `entry.hint_sender` through the - /// returned reference. The per-source quota counter - /// (`pending_per_sender`) is keyed by the `hint_sender` recorded at - /// admission; re-attributing a live entry to a different peer here would - /// orphan a count (decremented against the wrong peer on removal/eviction) - /// and silently desync the quota. Mutate only verification-progress - /// fields (e.g. `state`, `verified_sources`, `tried_sources`). - pub fn get_pending_mut(&mut self, key: &XorName) -> Option<&mut VerificationEntry> { - self.pending_verify.get_mut(key) + /// Replaces a prior `get_pending_mut` which handed out `&mut VerificationEntry` + /// and relied on a doc-comment to keep callers from re-assigning + /// `hint_sender`. The per-source quota counter (`pending_per_sender`) is + /// keyed by `hint_sender` recorded at admission; re-attributing a live + /// entry to a different peer would orphan a count and silently desync + /// the quota — exactly the silent-starvation class this fix prevents. + /// Narrowing the mutation API to a single setter makes that mistake + /// impossible to commit by accident. + pub fn set_pending_state( + &mut self, + key: &XorName, + state: VerificationState, + ) -> Option<HintPipeline> { + let entry = self.pending_verify.get_mut(key)?; + entry.state = state; + Some(entry.pipeline) } /// Remove a key from pending verification. @@ -408,7 +418,6 @@ mod tests { use std::time::{Duration, Instant}; use super::*; - use crate::replication::types::{HintPipeline, VerificationState}; /// Build a `PeerId` from a single byte (zero-padded to 32 bytes). 
fn peer_id_from_byte(b: u8) -> PeerId { diff --git a/tests/poc_d1_bounded_queues.rs b/tests/poc_d1_bounded_queues.rs index 27fff25c..9406785e 100644 --- a/tests/poc_d1_bounded_queues.rs +++ b/tests/poc_d1_bounded_queues.rs @@ -142,7 +142,7 @@ fn poc_d1_flooding_peer_cannot_starve_honest_peer() { let honest = peer_id_from_byte(0xBB); // Attacker floods far beyond any single-peer budget. - let attacker_flood: u32 = (MAX_PENDING_VERIFY_PER_PEER as u32).saturating_add(500_000); + let attacker_flood: u32 = (MAX_PENDING_VERIFY_PER_PEER as u32).saturating_add(10_000); let mut attacker_admitted = 0usize; for i in 0..attacker_flood { if queues.add_pending_verify(unique_xorname(i), entry_from(attacker)) { @@ -250,11 +250,13 @@ fn poc_d1_bound_preserves_legitimate_entries() { ); } -/// D1f — mutating an entry via `get_pending_mut` (the real pipeline path: -/// advancing `state`) must not desync the per-source quota counter. Guards -/// the documented `get_pending_mut` invariant. +/// D1f — advancing an entry's state via the narrow `set_pending_state` +/// setter (the real pipeline path) must not desync the per-source quota +/// counter. Guards the invariant that previously rested on a doc warning on +/// the now-removed `get_pending_mut`: no public API can re-attribute a live +/// entry to a different `hint_sender`. #[test] -fn poc_d1_get_pending_mut_state_change_keeps_counter_consistent() { +fn poc_d1_set_pending_state_keeps_counter_consistent() { let mut queues = ReplicationQueues::new(); let peer = peer_id_from_byte(0xEE); let key = unique_xorname(1); @@ -263,14 +265,16 @@ fn poc_d1_get_pending_mut_state_change_keeps_counter_consistent() { assert_eq!(queues.pending_count_for_sender(&peer), 1); // Exactly what run_verification_cycle does: advance the FSM state. 
- let entry = queues.get_pending_mut(&key).expect("entry must be present"); - entry.state = VerificationState::QuorumVerified; + let pipeline = queues + .set_pending_state(&key, VerificationState::QuorumVerified) + .expect("entry must be present"); + assert_eq!(pipeline, HintPipeline::Replica, "pipeline preserved"); // Counter unchanged by a state mutation (it tracks membership, not state). assert_eq!( queues.pending_count_for_sender(&peer), 1, - "get_pending_mut state change must not touch the per-source counter" + "state change must not touch the per-source counter" ); // And removal still correctly releases exactly one slot. From ecd4b2c3bb9dcdceec791ddee304ce1f1989e88d Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 20 May 2026 14:47:19 +0900 Subject: [PATCH 3/4] followup(D1): atomic promote_pending_to_fetch + bootstrap drain tracks capacity rejections Per codex round-2 review on PR #99 (two BLOCKERs + one NIT): 1. [BLOCKER] Silent verified-key drop on full fetch_queue. Both verification paths removed the key from pending_verify BEFORE calling enqueue_fetch. When MAX_FETCH_QUEUE was full, enqueue_fetch silently dropped the key, so a verified key disappeared from every queue with no retry path. New `ReplicationQueues::promote_pending_to_fetch` checks capacity FIRST, then removes-from-pending-and-enqueues atomically; on capacity miss the pending entry is left in place so the next verification cycle retries. Both production call sites in `run_verification_cycle` migrated. enqueue_fetch now also returns bool (true = enqueued, false = dedup-or-full) so the few raw callers can observe outcomes. 2. [BLOCKER] False bootstrap completion under capacity-rejected hints. `add_pending_verify` now returns `AdmissionResult` (Admitted / AlreadyPresent / CapacityRejected) instead of bool, so callers can tell "no work needed" from "work was silently dropped". 
`admit_and_queue_hints` now reports a `capacity_rejected_count` alongside `discovered`; bootstrap paths (initial sync, neighbour sync, recurring sync) feed that count into a new `BootstrapState.capacity_rejected_outstanding` counter via `bootstrap::note_capacity_rejected`. `check_bootstrap_drained` refuses to mark drain complete while that counter is non-zero, so a bootstrap snapshot whose hints overflowed the queue caps stays pending until the source re-delivers them post-drain. 3. [NIT] `evict_stale_removes_old_entries` test used to bypass `add_pending_verify` via direct field access, so it no longer exercised the per-sender counter maintenance the followup added to `evict_stale`. Routed through the public API and added a per-sender counter assertion. A new `AdmissionResult::admitted()` helper preserves backward-compat for the dozen-plus call sites that just want the boolean. The few production sites that distinguish capacity vs dedup branch on the enum. 479 lib tests pass; cfd clean. --- src/replication/bootstrap.rs | 44 ++++++++++ src/replication/mod.rs | 130 +++++++++++++++++++++------ src/replication/scheduling.rs | 155 +++++++++++++++++++++++++-------- src/replication/types.rs | 11 +++ tests/poc_d1_bounded_queues.rs | 28 ++++-- 5 files changed, 299 insertions(+), 69 deletions(-) diff --git a/src/replication/bootstrap.rs b/src/replication/bootstrap.rs index b4d66f9d..07af86a0 100644 --- a/src/replication/bootstrap.rs +++ b/src/replication/bootstrap.rs @@ -122,6 +122,18 @@ pub async fn check_bootstrap_drained( return false; } + // Hints capacity-rejected at the pending_verify bounds during bootstrap + // must be re-delivered by the source before drain can be claimed; + // otherwise we'd silently mark ourselves complete with outstanding work + // the source still owes us (codex round-2 BLOCKER). 
+ if state.capacity_rejected_outstanding > 0 { + debug!( + "Bootstrap NOT drained: {} hints capacity-rejected and awaiting re-admission", + state.capacity_rejected_outstanding + ); + return false; + } + if queues.is_bootstrap_work_empty(&state.pending_keys) { state.drained = true; info!("Bootstrap drained: all peer requests completed and work queues empty"); @@ -131,6 +143,34 @@ pub async fn check_bootstrap_drained( } } +/// Record `n` capacity-rejected bootstrap hints. +/// +/// Hints rejected at the `pending_verify` capacity bounds during bootstrap. +/// Bootstrap cannot drain while this counter is non-zero; the source's next +/// periodic hint will replay them once the pending queue has space. +pub async fn note_capacity_rejected(bootstrap_state: &Arc<RwLock<BootstrapState>>, n: usize) { + let mut state = bootstrap_state.write().await; + state.capacity_rejected_outstanding = state.capacity_rejected_outstanding.saturating_add(n); + debug!( + "Bootstrap: +{n} capacity-rejected hints (now {})", + state.capacity_rejected_outstanding + ); +} + +/// Clear the capacity-rejected counter. Called by an admission cycle that +/// observed zero rejections (the source successfully re-delivered all +/// previously-overflowed hints). +pub async fn clear_capacity_rejected(bootstrap_state: &Arc<RwLock<BootstrapState>>) { + let mut state = bootstrap_state.write().await; + if state.capacity_rejected_outstanding > 0 { + debug!( + "Bootstrap: clearing {} previously-outstanding capacity-rejected hints", + state.capacity_rejected_outstanding + ); + state.capacity_rejected_outstanding = 0; + } +} + /// Record a set of discovered keys into the bootstrap state for drain tracking. 
#[allow(clippy::implicit_hasher)] pub async fn track_discovered_keys( @@ -193,6 +233,7 @@ mod tests { drained: true, pending_peer_requests: 5, pending_keys: HashSet::new(), + capacity_rejected_outstanding: 0, })); let queues = ReplicationQueues::new(); @@ -208,6 +249,7 @@ mod tests { drained: false, pending_peer_requests: 2, pending_keys: HashSet::new(), + capacity_rejected_outstanding: 0, })); let queues = ReplicationQueues::new(); @@ -223,6 +265,7 @@ mod tests { drained: false, pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), + capacity_rejected_outstanding: 0, })); let queues = ReplicationQueues::new(); @@ -237,6 +280,7 @@ mod tests { drained: false, pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), + capacity_rejected_outstanding: 0, })); let mut queues = ReplicationQueues::new(); diff --git a/src/replication/mod.rs b/src/replication/mod.rs index afbddea9..1cacbf6d 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -852,7 +852,7 @@ impl ReplicationEngine { if let Some(resp) = response { if !resp.bootstrapping { // Admit hints into verification pipeline. - let admitted_keys = admit_and_queue_hints( + let outcome = admit_and_queue_hints( &self_id, peer, &resp.replica_hints, @@ -866,9 +866,26 @@ impl ReplicationEngine { .await; // Track discovered keys for drain detection. - if !admitted_keys.is_empty() { - bootstrap::track_discovered_keys(&bootstrap_state, &admitted_keys) - .await; + if !outcome.discovered.is_empty() { + bootstrap::track_discovered_keys( + &bootstrap_state, + &outcome.discovered, + ) + .await; + } + + // Record capacity rejections so the drain check + // does NOT mark bootstrap complete while work + // was silently dropped at the queue caps. The + // source's next periodic hint will replay the + // rejected keys once `pending_verify` has + // drained. 
+ if outcome.capacity_rejected_count > 0 { + bootstrap::note_capacity_rejected( + &bootstrap_state, + outcome.capacity_rejected_count, + ) + .await; } } } @@ -1300,7 +1317,7 @@ async fn handle_neighbor_sync_request( } // Admit inbound hints and queue for verification. - let admitted_keys = admit_and_queue_hints( + let outcome = admit_and_queue_hints( &self_id, source, &request.replica_hints, @@ -1313,10 +1330,18 @@ async fn handle_neighbor_sync_request( ) .await; - // Track discovered keys for bootstrap drain detection so that - // hints admitted via inbound sync requests are not missed. - if is_bootstrapping && !admitted_keys.is_empty() { - bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await; + // Track discovered keys for bootstrap drain detection so that hints + // admitted via inbound sync requests are not missed. If any hints were + // capacity-rejected, mark the bootstrap "not yet drained" so the next + // sync from this source can re-admit them once queues free up. + if is_bootstrapping { + if !outcome.discovered.is_empty() { + bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await; + } + if outcome.capacity_rejected_count > 0 { + bootstrap::note_capacity_rejected(bootstrap_state, outcome.capacity_rejected_count) + .await; + } } Ok(()) @@ -1717,7 +1742,7 @@ async fn handle_sync_response( let mut state = sync_state.write().await; state.clear_active_bootstrap_claim(peer); } - let admitted_keys = admit_and_queue_hints( + let outcome = admit_and_queue_hints( self_id, peer, &resp.replica_hints, @@ -1730,10 +1755,18 @@ async fn handle_sync_response( ) .await; - // Track discovered keys for bootstrap drain detection so that - // hints admitted via regular neighbor sync are not missed. 
- if bootstrapping && !admitted_keys.is_empty() { - bootstrap::track_discovered_keys(bootstrap_state, &admitted_keys).await; + // Track discovered keys for bootstrap drain detection so that hints + // admitted via regular neighbor sync are not missed. Capacity- + // rejected keys keep bootstrap "not yet drained" until the next + // sync replays them post-drain. + if bootstrapping { + if !outcome.discovered.is_empty() { + bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await; + } + if outcome.capacity_rejected_count > 0 { + bootstrap::note_capacity_rejected(bootstrap_state, outcome.capacity_rejected_count) + .await; + } } } } @@ -1742,6 +1775,19 @@ async fn handle_sync_response( /// /// Shared by neighbor-sync request handling, response handling, and bootstrap /// sync so that admission + queueing logic lives in one place. +#[allow(clippy::too_many_arguments)] +/// Outcome of [`admit_and_queue_hints`]. +/// +/// `capacity_rejected_count` is non-zero when one or more legitimately +/// admissible hints were dropped because `pending_verify`'s global or +/// per-source bound was hit. Callers that care about completeness +/// (bootstrap drain accounting) MUST NOT treat their work as complete while +/// this is > 0 — the source will need to re-hint after capacity frees up. 
+struct AdmissionOutcome { + discovered: HashSet, + capacity_rejected_count: usize, +} + #[allow(clippy::too_many_arguments)] async fn admit_and_queue_hints( self_id: &PeerId, @@ -1753,7 +1799,7 @@ async fn admit_and_queue_hints( storage: &Arc, paid_list: &Arc, queues: &Arc>, -) -> HashSet { +) -> AdmissionOutcome { let pending_keys: HashSet = { let q = queues.read().await; q.pending_keys().into_iter().collect() @@ -1772,12 +1818,13 @@ async fn admit_and_queue_hints( .await; let mut discovered = HashSet::new(); + let mut capacity_rejected_count: usize = 0; let mut q = queues.write().await; let now = Instant::now(); for key in admitted.replica_keys { if !storage.exists(&key).unwrap_or(false) { - let added = q.add_pending_verify( + let result = q.add_pending_verify( key, VerificationEntry { state: VerificationState::PendingVerify, @@ -1788,14 +1835,20 @@ async fn admit_and_queue_hints( hint_sender: *source_peer, }, ); - if added { - discovered.insert(key); + match result { + crate::replication::scheduling::AdmissionResult::Admitted => { + discovered.insert(key); + } + crate::replication::scheduling::AdmissionResult::AlreadyPresent => {} + crate::replication::scheduling::AdmissionResult::CapacityRejected => { + capacity_rejected_count += 1; + } } } } for key in admitted.paid_only_keys { - let added = q.add_pending_verify( + let result = q.add_pending_verify( key, VerificationEntry { state: VerificationState::PendingVerify, @@ -1806,12 +1859,28 @@ async fn admit_and_queue_hints( hint_sender: *source_peer, }, ); - if added { - discovered.insert(key); + match result { + crate::replication::scheduling::AdmissionResult::Admitted => { + discovered.insert(key); + } + crate::replication::scheduling::AdmissionResult::AlreadyPresent => {} + crate::replication::scheduling::AdmissionResult::CapacityRejected => { + capacity_rejected_count += 1; + } } } - discovered + if capacity_rejected_count > 0 { + debug!( + "admit_and_queue_hints from {source_peer}: {capacity_rejected_count} 
hints \ + rejected at queue capacity; source will need to re-hint after pending_verify drains" + ); + } + + AdmissionOutcome { + discovered, + capacity_rejected_count, + } } // --------------------------------------------------------------------------- @@ -1938,8 +2007,9 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { let sources = evidence.get(&key).map_or_else(Vec::new, |ev| { quorum::present_sources_for_key(&key, ev, &targets) }); - q.remove_pending(&key); if sources.is_empty() { + // Terminal failure: remove pending and report. No fetch path. + q.remove_pending(&key); warn!( "Locally paid key {} has no responding holders (possible data loss)", hex::encode(key) @@ -1947,7 +2017,10 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { terminal_keys.push(key); } else { let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes()); - q.enqueue_fetch(key, distance, sources); + // Atomic remove+enqueue: if fetch_queue is at capacity, the + // pending entry is preserved and retried next cycle (no + // silent drop of verified replica-repair work). + let _ = q.promote_pending_to_fetch(key, distance, sources); } } } @@ -2026,9 +2099,12 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { if fetch_eligible && !sources.is_empty() { let distance = crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes()); - q.remove_pending(&key); - q.enqueue_fetch(key, distance, sources); - // Not terminal — key moved to fetch queue. + // Atomic remove+enqueue: on fetch_queue capacity miss + // the pending entry is preserved so this verified key + // is retried on the next cycle (no silent drop). + let _ = q.promote_pending_to_fetch(key, distance, sources); + // Not terminal — either moved to fetch queue, or + // retained as pending until queue drains. 
} else if fetch_eligible && sources.is_empty() { warn!( "Verified responsible key {} has no holders (possible data loss)", diff --git a/src/replication/scheduling.rs b/src/replication/scheduling.rs index d7ec944d..ce02386c 100644 --- a/src/replication/scheduling.rs +++ b/src/replication/scheduling.rs @@ -76,6 +76,34 @@ pub const MAX_PENDING_VERIFY_PER_PEER: usize = 8_192; /// `enqueue_fetch`) when full. pub const MAX_FETCH_QUEUE: usize = 131_072; +/// Outcome of [`ReplicationQueues::add_pending_verify`]. +/// +/// Distinguishes "the key is already being handled" from "the key was +/// silently dropped due to a queue capacity bound". Bootstrap drain +/// accounting and source-side retry logic MUST treat `CapacityRejected` as +/// outstanding work; treating it like a dedup hit was the silent-drop +/// regression introduced when the queues first became bounded. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AdmissionResult { + /// New entry inserted into `pending_verify`. + Admitted, + /// Key was already in some pipeline stage; the existing entry is left + /// in place. No retry required. + AlreadyPresent, + /// Global or per-source capacity bound rejected the entry. The caller + /// MUST treat this as work still to do (not as silently completed). + CapacityRejected, +} + +impl AdmissionResult { + /// `true` only for [`AdmissionResult::Admitted`]. Preserves call sites + /// that only want to know "did the insert happen". + #[must_use] + pub fn admitted(self) -> bool { + matches!(self, Self::Admitted) + } +} + // --------------------------------------------------------------------------- // In-flight entry // --------------------------------------------------------------------------- @@ -154,25 +182,30 @@ impl ReplicationQueues { /// Add a key to pending verification if not already present in any queue. /// - /// Returns `true` if the key was newly added (Rule 8: cross-queue dedup). 
- /// - /// Returns `false` — without inserting — when either: - /// * the global [`MAX_PENDING_VERIFY`] memory backstop is reached, or - /// * the entry's `hint_sender` already holds [`MAX_PENDING_VERIFY_PER_PEER`] - /// pending entries (per-source fairness — a flooding peer can only - /// exhaust its own quota and can never starve honest peers). - /// - /// Callers already treat a `false` result as "not admitted". - pub fn add_pending_verify(&mut self, key: XorName, entry: VerificationEntry) -> bool { + /// Returns an [`AdmissionResult`] distinguishing the three outcomes: + /// * `Admitted` — newly inserted. + /// * `AlreadyPresent` — Rule 8 cross-queue dedup (the key is already in + /// `pending_verify`, `fetch_queue`, or `in_flight_fetch`); the existing + /// entry remains and there is no work to retry. + /// * `CapacityRejected` — global or per-source bound hit; the work is + /// genuinely lost and the caller (e.g. bootstrap drain accounting, + /// source-side retry) MUST treat this as still-outstanding work, not as + /// "done". Without this distinction a bootstrap snapshot whose hints + /// are capacity-rejected would silently mark itself drained. 
+ pub fn add_pending_verify( + &mut self, + key: XorName, + entry: VerificationEntry, + ) -> AdmissionResult { if self.contains_key(&key) { - return false; + return AdmissionResult::AlreadyPresent; } if self.pending_verify.len() >= MAX_PENDING_VERIFY { debug!( "pending_verify at global capacity ({MAX_PENDING_VERIFY}); rejecting key {}", hex::encode(key) ); - return false; + return AdmissionResult::CapacityRejected; } let sender = entry.hint_sender; let sender_count = self.pending_per_sender.get(&sender).copied().unwrap_or(0); @@ -182,11 +215,11 @@ impl ReplicationQueues { rejecting key {} (honest peers are unaffected)", hex::encode(key) ); - return false; + return AdmissionResult::CapacityRejected; } self.pending_verify.insert(key, entry); *self.pending_per_sender.entry(sender).or_insert(0) += 1; - true + AdmissionResult::Admitted } /// Decrement (and prune at zero) the per-sender counter for `sender`. @@ -260,23 +293,28 @@ impl ReplicationQueues { /// Enqueue a key for fetch with its distance and verified sources. /// - /// No-op if the key is already in any pipeline stage (Rule 8: cross-queue - /// dedup), or if `fetch_queue` is already at [`MAX_FETCH_QUEUE`]. The - /// capacity drop bounds memory (and the outbound `FetchRequest` storm) - /// under a network hint flood. - pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) { + /// Returns `true` if the candidate was enqueued, `false` if it was + /// already present in any pipeline stage (Rule 8: cross-queue dedup) or + /// the `fetch_queue` is at [`MAX_FETCH_QUEUE`]. + /// + /// Callers that have removed the key from `pending_verify` immediately + /// before this call should prefer [`promote_pending_to_fetch`](Self::promote_pending_to_fetch), + /// which performs the move atomically and leaves the pending entry in + /// place when the fetch queue is full (so verified work is retried on + /// the next cycle instead of being silently lost). 
+ pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) -> bool { if self.pending_verify.contains_key(&key) || self.fetch_queue_keys.contains(&key) || self.in_flight_fetch.contains_key(&key) { - return; + return false; } if self.fetch_queue.len() >= MAX_FETCH_QUEUE { debug!( "fetch_queue at capacity ({MAX_FETCH_QUEUE}); dropping new key {}", hex::encode(key) ); - return; + return false; } self.fetch_queue_keys.insert(key); self.fetch_queue.push(FetchCandidate { @@ -284,6 +322,41 @@ impl ReplicationQueues { distance, sources, }); + true + } + + /// Atomically promote a key from `pending_verify` to `fetch_queue`. + /// + /// Checks `fetch_queue` capacity FIRST, then removes the pending entry + /// and enqueues the fetch candidate. If `fetch_queue` is full, the + /// pending entry is **left in place** so the next verification cycle + /// can retry — preventing the silent-drop regression where a verified + /// key removed from `pending_verify` could be dropped by a full fetch + /// queue and lost from every stage. + /// + /// Returns `true` on successful promotion, `false` when the fetch queue + /// is at capacity (pending entry preserved). + pub fn promote_pending_to_fetch( + &mut self, + key: XorName, + distance: XorName, + sources: Vec<PeerId>, + ) -> bool { + if self.fetch_queue.len() >= MAX_FETCH_QUEUE { + debug!( + "fetch_queue at capacity ({MAX_FETCH_QUEUE}); leaving {} pending \ + for retry next cycle", + hex::encode(key) + ); + return false; + } + // Capacity confirmed; safe to release the pending slot and enqueue. + let _ = self.remove_pending(&key); + // enqueue_fetch returns false only on capacity or already-queued; the + // capacity check above and the just-removed pending state make this + // succeed. If a concurrent path put the key into fetch_queue/in_flight + // between, dropping the duplicate is fine. + self.enqueue_fetch(key, distance, sources) } /// Dequeue the nearest fetch candidate. 
@@ -449,7 +522,7 @@ mod tests { fn add_pending_verify_new_key_succeeds() { let mut queues = ReplicationQueues::new(); let key = xor_name_from_byte(0x01); - assert!(queues.add_pending_verify(key, test_entry(1))); + assert!(queues.add_pending_verify(key, test_entry(1)).admitted()); assert_eq!(queues.pending_count(), 1); } @@ -457,8 +530,8 @@ mod tests { fn add_pending_verify_duplicate_rejected() { let mut queues = ReplicationQueues::new(); let key = xor_name_from_byte(0x01); - assert!(queues.add_pending_verify(key, test_entry(1))); - assert!(!queues.add_pending_verify(key, test_entry(2))); + assert!(queues.add_pending_verify(key, test_entry(1)).admitted()); + assert!(!queues.add_pending_verify(key, test_entry(2)).admitted()); assert_eq!(queues.pending_count(), 1); } @@ -470,7 +543,7 @@ mod tests { queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(1)]); assert!( - !queues.add_pending_verify(key, test_entry(1)), + !queues.add_pending_verify(key, test_entry(1)).admitted(), "should reject key already in fetch queue" ); } @@ -483,7 +556,7 @@ mod tests { queues.start_fetch(key, source, vec![source]); assert!( - !queues.add_pending_verify(key, test_entry(1)), + !queues.add_pending_verify(key, test_entry(1)).admitted(), "should reject key already in-flight" ); } @@ -638,21 +711,33 @@ mod tests { let mut queues = ReplicationQueues::new(); let key = xor_name_from_byte(0x01); - // Create entry with a backdated timestamp. Use a small subtraction - // to avoid `checked_sub` returning `None` on freshly-booted CI runners. + // Go through the public `add_pending_verify` so the per-sender + // counter is correctly bumped — the entry's `hint_sender` slot must + // be released by `evict_stale` and we want to exercise that path. let mut entry = test_entry(1); + let sender = entry.hint_sender; + // Backdate via the same defensive checked_sub used elsewhere so + // freshly-booted CI clocks don't trip us up. 
entry.created_at = Instant::now() .checked_sub(Duration::from_secs(2)) .unwrap_or_else(Instant::now); - queues.pending_verify.insert(key, entry); + assert!(queues.add_pending_verify(key, entry).admitted()); assert_eq!(queues.pending_count(), 1); + assert_eq!(queues.pending_count_for_sender(&sender), 1); + queues.evict_stale(Duration::from_secs(1)); assert_eq!( queues.pending_count(), 0, "entry older than max_age should be evicted" ); + // Per-sender counter must be released alongside the map removal. + assert_eq!( + queues.pending_count_for_sender(&sender), + 0, + "evict_stale must release the per-sender slot" + ); } #[test] @@ -703,7 +788,7 @@ mod tests { // Step 1: Add to PendingVerify. assert!( - queues.add_pending_verify(key, test_entry(1)), + queues.add_pending_verify(key, test_entry(1)).admitted(), "first add to PendingVerify should succeed" ); assert!( @@ -726,7 +811,7 @@ mod tests { // Step 4: Attempt to re-add to PendingVerify -> should fail. assert!( - !queues.add_pending_verify(key, test_entry(4)), + !queues.add_pending_verify(key, test_entry(4)).admitted(), "key in FetchQueue should be rejected from PendingVerify" ); @@ -740,7 +825,7 @@ mod tests { // Step 6: Attempt to add to PendingVerify while in-flight -> reject. 
assert!( - !queues.add_pending_verify(key, test_entry(5)), + !queues.add_pending_verify(key, test_entry(5)).admitted(), "key in-flight should be rejected from PendingVerify" ); @@ -773,7 +858,7 @@ mod tests { hint_sender: peer_id_from_byte(1), }; - assert!(queues.add_pending_verify(key, entry)); + assert!(queues.add_pending_verify(key, entry).admitted()); let pending = queues.get_pending(&key).expect("should be pending"); assert_eq!( @@ -793,7 +878,7 @@ mod tests { }; assert!( - !queues.add_pending_verify(key, paid_entry), + !queues.add_pending_verify(key, paid_entry).admitted(), "duplicate key should be rejected regardless of pipeline" ); @@ -832,7 +917,7 @@ mod tests { hint_sender, }; assert!( - queues.add_pending_verify(key, entry), + queues.add_pending_verify(key, entry).admitted(), "new key should be admitted to PendingVerify" ); assert!(queues.contains_key(&key)); diff --git a/src/replication/types.rs b/src/replication/types.rs index 1d2ff646..75e24d08 100644 --- a/src/replication/types.rs +++ b/src/replication/types.rs @@ -371,6 +371,16 @@ pub struct BootstrapState { /// Keys discovered during bootstrap that are still in the verification / /// fetch pipeline. pub pending_keys: HashSet, + /// Hints that were silently dropped at the `pending_verify` capacity + /// bounds during bootstrap, awaiting re-admission once queues drain. + /// While this is non-zero `check_bootstrap_drained` refuses to mark the + /// node fully drained: the source has not yet successfully delivered + /// every key, even though the explicitly-admitted ones may have + /// completed. Decremented when the next admission cycle sees that + /// `pending_verify` has space again (logically the source can now + /// re-deliver) — or, conservatively, when explicit drain replay logic + /// observes "no rejections this cycle". 
+ pub capacity_rejected_outstanding: usize, } impl BootstrapState { @@ -381,6 +391,7 @@ impl BootstrapState { drained: false, pending_peer_requests: 0, pending_keys: HashSet::new(), + capacity_rejected_outstanding: 0, } } diff --git a/tests/poc_d1_bounded_queues.rs b/tests/poc_d1_bounded_queues.rs index 9406785e..79465f08 100644 --- a/tests/poc_d1_bounded_queues.rs +++ b/tests/poc_d1_bounded_queues.rs @@ -145,7 +145,10 @@ fn poc_d1_flooding_peer_cannot_starve_honest_peer() { let attacker_flood: u32 = (MAX_PENDING_VERIFY_PER_PEER as u32).saturating_add(10_000); let mut attacker_admitted = 0usize; for i in 0..attacker_flood { - if queues.add_pending_verify(unique_xorname(i), entry_from(attacker)) { + if queues + .add_pending_verify(unique_xorname(i), entry_from(attacker)) + .admitted() + { attacker_admitted += 1; } } @@ -173,7 +176,10 @@ fn poc_d1_flooding_peer_cannot_starve_honest_peer() { let mut honest_admitted = 0usize; for j in 0..2_000u32 { let key = unique_xorname(10_000_000 + j); - if queues.add_pending_verify(key, entry_from(honest)) { + if queues + .add_pending_verify(key, entry_from(honest)) + .admitted() + { honest_admitted += 1; } } @@ -192,7 +198,9 @@ fn poc_d1_per_sender_counter_is_consistent() { let peer = peer_id_from_byte(0xCC); for i in 0..100u32 { - assert!(queues.add_pending_verify(unique_xorname(i), entry_from(peer))); + assert!(queues + .add_pending_verify(unique_xorname(i), entry_from(peer)) + .admitted()); } assert_eq!(queues.pending_count_for_sender(&peer), 100); @@ -217,7 +225,9 @@ fn poc_d1_per_sender_counter_is_consistent() { ); // Quota fully reusable after release. 
- assert!(queues.add_pending_verify(unique_xorname(999), entry_from(peer))); + assert!(queues + .add_pending_verify(unique_xorname(999), entry_from(peer)) + .admitted()); assert_eq!(queues.pending_count_for_sender(&peer), 1); } @@ -229,7 +239,9 @@ fn poc_d1_bound_preserves_legitimate_entries() { for i in 0..1_000u32 { assert!( - queues.add_pending_verify(unique_xorname(i), entry_from(peer)), + queues + .add_pending_verify(unique_xorname(i), entry_from(peer)) + .admitted(), "legitimate entries well under both caps are always admitted" ); } @@ -237,7 +249,9 @@ fn poc_d1_bound_preserves_legitimate_entries() { // Cross-queue dedup still holds (existing key not re-admitted, no // double-count of the per-source quota). - assert!(!queues.add_pending_verify(unique_xorname(0), entry_from(peer))); + assert!(!queues + .add_pending_verify(unique_xorname(0), entry_from(peer)) + .admitted()); assert_eq!( queues.pending_count(), 1_000, @@ -261,7 +275,7 @@ fn poc_d1_set_pending_state_keeps_counter_consistent() { let peer = peer_id_from_byte(0xEE); let key = unique_xorname(1); - assert!(queues.add_pending_verify(key, entry_from(peer))); + assert!(queues.add_pending_verify(key, entry_from(peer)).admitted()); assert_eq!(queues.pending_count_for_sender(&peer), 1); // Exactly what run_verification_cycle does: advance the FSM state. From 237367f972e72e7ce94e2cf434903e5d27415b21 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 20 May 2026 15:11:08 +0900 Subject: [PATCH 4/4] followup(D1): wire clear_capacity_rejected per-source so bootstrap can drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per round-3 review on PR #99 (both Claude + codex flagged the same BLOCKER): The round-2 commit introduced `note_capacity_rejected` / `clear_capacity_rejected` for bootstrap drain accounting but only wired the first one. `clear_capacity_rejected` was defined as the retirement path in the doc comment yet had zero callers. 
The counter was monotonically increasing, so after the first `pending_verify` capacity rejection during bootstrap the drain check returned `false` forever — the node never transitioned out of `is_bootstrapping`. Fix: track outstanding rejections per `PeerId` (not as a single counter) so each source's contribution to "not yet drained" is retired independently. `note_capacity_rejected` adds the source to a `HashSet`; `clear_capacity_rejected(source)` removes it. The three production `admit_and_queue_hints` call sites now branch explicitly: a cycle with `capacity_rejected_count > 0` notes the source, while a clean cycle from the same source clears it. This matches the per-source granularity codex recommended (treating one source's clean cycle as evidence for THAT source's hints, not others'), so a node bootstrapping from multiple peers can't have one peer's clean cycle wrongly retire another peer's outstanding work. Two new tests in `bootstrap::tests` pin the behaviour: capacity_rejected_clears_on_clean_cycle — round-3 regression guard capacity_rejected_is_per_source — per-source granularity 481 lib tests pass; 211 replication tests; 6 D1 PoC; cfd clean. --- src/replication/bootstrap.rs | 123 ++++++++++++++++++++++++++--------- src/replication/mod.rs | 38 +++++------ src/replication/types.rs | 23 +++---- 3 files changed, 123 insertions(+), 61 deletions(-) diff --git a/src/replication/bootstrap.rs b/src/replication/bootstrap.rs index 07af86a0..a4ea7026 100644 --- a/src/replication/bootstrap.rs +++ b/src/replication/bootstrap.rs @@ -123,14 +123,14 @@ pub async fn check_bootstrap_drained( } // Hints capacity-rejected at the pending_verify bounds during bootstrap - // must be re-delivered by the source before drain can be claimed; - // otherwise we'd silently mark ourselves complete with outstanding work - // the source still owes us (codex round-2 BLOCKER). 
- if state.capacity_rejected_outstanding > 0 { - debug!( - "Bootstrap NOT drained: {} hints capacity-rejected and awaiting re-admission", - state.capacity_rejected_outstanding - ); + // must be re-delivered by the originating source before drain can be + // claimed; otherwise we'd silently mark ourselves complete with + // outstanding work the source still owes us (codex round-2 BLOCKER). + // The set retires per-source as each source's next admission cycle + // completes with zero rejections — see `clear_capacity_rejected`. + if !state.capacity_rejected_sources.is_empty() { + let n = state.capacity_rejected_sources.len(); + debug!("Bootstrap NOT drained: {n} source(s) have outstanding capacity-rejected hints"); return false; } @@ -143,31 +143,44 @@ pub async fn check_bootstrap_drained( } } -/// Record `n` capacity-rejected bootstrap hints. +/// Record that `source` had one or more hints capacity-rejected this cycle. /// -/// Hints rejected at the `pending_verify` capacity bounds during bootstrap. -/// Bootstrap cannot drain while this counter is non-zero; the source's next -/// periodic hint will replay them once the pending queue has space. -pub async fn note_capacity_rejected(bootstrap_state: &Arc<RwLock<BootstrapState>>, n: usize) { +/// Idempotent: tracks a set of sources, not a counter. Bootstrap cannot +/// drain while this source is in the set; cleared by +/// [`clear_capacity_rejected`] when the same source's next admission cycle +/// completes with zero rejections (i.e. the source successfully +/// re-delivered everything that previously overflowed). 
+pub async fn note_capacity_rejected( + bootstrap_state: &Arc<RwLock<BootstrapState>>, + source: saorsa_core::identity::PeerId, +) { let mut state = bootstrap_state.write().await; - state.capacity_rejected_outstanding = state.capacity_rejected_outstanding.saturating_add(n); - debug!( - "Bootstrap: +{n} capacity-rejected hints (now {})", - state.capacity_rejected_outstanding - ); + if state.capacity_rejected_sources.insert(source) { + let n = state.capacity_rejected_sources.len(); + debug!( + "Bootstrap: source {source} now has outstanding capacity-rejected hints \ + ({n} sources outstanding)" + ); + } } -/// Clear the capacity-rejected counter. Called by an admission cycle that -/// observed zero rejections (the source successfully re-delivered all -/// previously-overflowed hints). -pub async fn clear_capacity_rejected(bootstrap_state: &Arc<RwLock<BootstrapState>>) { +/// Mark `source`'s outstanding capacity rejections as cleared. +/// +/// Called whenever `source` completes an admission cycle with zero +/// capacity rejections: the source successfully re-delivered any hints +/// that previously overflowed, so its contribution to "bootstrap not +/// drained" is retired. No-op if the source had no outstanding rejections. 
+pub async fn clear_capacity_rejected( + bootstrap_state: &Arc<RwLock<BootstrapState>>, + source: &saorsa_core::identity::PeerId, +) { let mut state = bootstrap_state.write().await; - if state.capacity_rejected_outstanding > 0 { + if state.capacity_rejected_sources.remove(source) { + let n = state.capacity_rejected_sources.len(); debug!( - "Bootstrap: clearing {} previously-outstanding capacity-rejected hints", - state.capacity_rejected_outstanding + "Bootstrap: cleared outstanding capacity rejections for {source} \ + ({n} sources still outstanding)" ); - state.capacity_rejected_outstanding = 0; } } @@ -233,7 +246,7 @@ mod tests { drained: true, pending_peer_requests: 5, pending_keys: HashSet::new(), - capacity_rejected_outstanding: 0, + capacity_rejected_sources: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -249,7 +262,7 @@ mod tests { drained: false, pending_peer_requests: 2, pending_keys: HashSet::new(), - capacity_rejected_outstanding: 0, + capacity_rejected_sources: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -265,7 +278,7 @@ mod tests { drained: false, pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), - capacity_rejected_outstanding: 0, + capacity_rejected_sources: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -280,7 +293,7 @@ mod tests { drained: false, pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), - capacity_rejected_outstanding: 0, + capacity_rejected_sources: HashSet::new(), })); let mut queues = ReplicationQueues::new(); @@ -343,4 +356,54 @@ mod tests { "should saturate at zero" ); } + + /// Round-3 regression: a source that previously had capacity-rejected + /// hints must be retired from the "not yet drained" list when it + /// completes a later admission cycle with zero rejections, otherwise + /// `check_bootstrap_drained` is permanently wedged after a single + /// rejection. 
+ #[tokio::test] + async fn capacity_rejected_clears_on_clean_cycle() { + let state = Arc::new(RwLock::new(BootstrapState::new())); + let queues = ReplicationQueues::new(); + let source = saorsa_core::identity::PeerId::from_bytes([7u8; 32]); + + // First cycle: this source overflowed, drain blocked. + note_capacity_rejected(&state, source).await; + assert!( + !check_bootstrap_drained(&state, &queues).await, + "drain must be blocked while a source has outstanding capacity rejections" + ); + + // Second cycle from the SAME source: zero rejections → clear it. + clear_capacity_rejected(&state, &source).await; + assert!( + check_bootstrap_drained(&state, &queues).await, + "drain must complete once the source's outstanding rejections are cleared" + ); + } + + /// Per-source granularity: one source's clean cycle must NOT clear a + /// different source's outstanding rejections. + #[tokio::test] + async fn capacity_rejected_is_per_source() { + let state = Arc::new(RwLock::new(BootstrapState::new())); + let queues = ReplicationQueues::new(); + let source_a = saorsa_core::identity::PeerId::from_bytes([0xAA; 32]); + let source_b = saorsa_core::identity::PeerId::from_bytes([0xBB; 32]); + + note_capacity_rejected(&state, source_a).await; + note_capacity_rejected(&state, source_b).await; + assert!(!check_bootstrap_drained(&state, &queues).await); + + // Only A clears; B still owes us re-hints. 
+ clear_capacity_rejected(&state, &source_a).await; + assert!( + !check_bootstrap_drained(&state, &queues).await, + "B's outstanding rejections must keep drain blocked" + ); + + clear_capacity_rejected(&state, &source_b).await; + assert!(check_bootstrap_drained(&state, &queues).await); + } } diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 1cacbf6d..9e4e03f0 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -874,18 +874,14 @@ impl ReplicationEngine { .await; } - // Record capacity rejections so the drain check - // does NOT mark bootstrap complete while work - // was silently dropped at the queue caps. The - // source's next periodic hint will replay the - // rejected keys once `pending_verify` has - // drained. + // Record / retire capacity rejections so the + // drain check correctly reflects whether each + // source still owes us re-hinted work after + // queue overflow. if outcome.capacity_rejected_count > 0 { - bootstrap::note_capacity_rejected( - &bootstrap_state, - outcome.capacity_rejected_count, - ) - .await; + bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await; + } else { + bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await; } } } @@ -1331,16 +1327,17 @@ async fn handle_neighbor_sync_request( .await; // Track discovered keys for bootstrap drain detection so that hints - // admitted via inbound sync requests are not missed. If any hints were - // capacity-rejected, mark the bootstrap "not yet drained" so the next - // sync from this source can re-admit them once queues free up. + // admitted via inbound sync requests are not missed. Capacity-rejected + // hints keep this source on the "not yet drained" list until its next + // sync re-admits them; a clean cycle clears the source. 
if is_bootstrapping { if !outcome.discovered.is_empty() { bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await; } if outcome.capacity_rejected_count > 0 { - bootstrap::note_capacity_rejected(bootstrap_state, outcome.capacity_rejected_count) - .await; + bootstrap::note_capacity_rejected(bootstrap_state, *source).await; + } else { + bootstrap::clear_capacity_rejected(bootstrap_state, source).await; } } @@ -1757,15 +1754,16 @@ async fn handle_sync_response( // Track discovered keys for bootstrap drain detection so that hints // admitted via regular neighbor sync are not missed. Capacity- - // rejected keys keep bootstrap "not yet drained" until the next - // sync replays them post-drain. + // rejected hints keep this source on the "not yet drained" list + // until its next sync replays them; a clean cycle clears it. if bootstrapping { if !outcome.discovered.is_empty() { bootstrap::track_discovered_keys(bootstrap_state, &outcome.discovered).await; } if outcome.capacity_rejected_count > 0 { - bootstrap::note_capacity_rejected(bootstrap_state, outcome.capacity_rejected_count) - .await; + bootstrap::note_capacity_rejected(bootstrap_state, *peer).await; + } else { + bootstrap::clear_capacity_rejected(bootstrap_state, peer).await; } } } diff --git a/src/replication/types.rs b/src/replication/types.rs index 75e24d08..be842e5f 100644 --- a/src/replication/types.rs +++ b/src/replication/types.rs @@ -371,16 +371,17 @@ pub struct BootstrapState { /// Keys discovered during bootstrap that are still in the verification / /// fetch pipeline. pub pending_keys: HashSet, - /// Hints that were silently dropped at the `pending_verify` capacity - /// bounds during bootstrap, awaiting re-admission once queues drain. - /// While this is non-zero `check_bootstrap_drained` refuses to mark the - /// node fully drained: the source has not yet successfully delivered - /// every key, even though the explicitly-admitted ones may have - /// completed. 
Decremented when the next admission cycle sees that - /// `pending_verify` has space again (logically the source can now - /// re-deliver) — or, conservatively, when explicit drain replay logic - /// observes "no rejections this cycle". - pub capacity_rejected_outstanding: usize, + /// Peers whose last bootstrap admission cycle had one or more hints + /// silently dropped at the `pending_verify` capacity bounds. Each entry + /// represents "this source still owes us at least one re-hinted key + /// after the queues drain". `check_bootstrap_drained` refuses to claim + /// the node fully drained while this set is non-empty: a source's + /// presence is cleared by its next admission cycle that completes with + /// zero capacity rejections (i.e. the source successfully re-delivered + /// everything that previously overflowed). Tracking per-source instead + /// of a global counter prevents one peer's rejection from being + /// "cleared" by an unrelated peer's clean cycle. + pub capacity_rejected_sources: HashSet<PeerId>, } impl BootstrapState { @@ -391,7 +392,7 @@ impl BootstrapState { drained: false, pending_peer_requests: 0, pending_keys: HashSet::new(), - capacity_rejected_outstanding: 0, + capacity_rejected_sources: HashSet::new(), } }