Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/replication/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ pub async fn check_bootstrap_drained(
return false;
}

// Hints capacity-rejected at the pending_verify bounds during bootstrap
// must be re-delivered by the originating source before drain can be
// claimed; otherwise we'd silently mark ourselves complete with
// outstanding work the source still owes us (codex round-2 BLOCKER).
// The set retires per-source as each source's next admission cycle
// completes with zero rejections — see `clear_capacity_rejected`.
if !state.capacity_rejected_sources.is_empty() {
let n = state.capacity_rejected_sources.len();
debug!("Bootstrap NOT drained: {n} source(s) have outstanding capacity-rejected hints");
return false;
}

if queues.is_bootstrap_work_empty(&state.pending_keys) {
state.drained = true;
info!("Bootstrap drained: all peer requests completed and work queues empty");
Expand All @@ -131,6 +143,47 @@ pub async fn check_bootstrap_drained(
}
}

/// Record that `source` had one or more hints capacity-rejected this cycle.
///
/// Idempotent: tracks a set of sources, not a counter. Bootstrap cannot
/// drain while this source is in the set; cleared by
/// [`clear_capacity_rejected`] when the same source's next admission cycle
/// completes with zero rejections (i.e. the source successfully
/// re-delivered everything that previously overflowed).
pub async fn note_capacity_rejected(
bootstrap_state: &Arc<RwLock<BootstrapState>>,
source: saorsa_core::identity::PeerId,
) {
let mut state = bootstrap_state.write().await;
if state.capacity_rejected_sources.insert(source) {
let n = state.capacity_rejected_sources.len();
debug!(
"Bootstrap: source {source} now has outstanding capacity-rejected hints \
({n} sources outstanding)"
);
}
}

/// Mark `source`'s outstanding capacity rejections as cleared.
///
/// Called whenever `source` completes an admission cycle with zero
/// capacity rejections: the source successfully re-delivered any hints
/// that previously overflowed, so its contribution to "bootstrap not
/// drained" is retired. No-op if the source had no outstanding rejections.
pub async fn clear_capacity_rejected(
bootstrap_state: &Arc<RwLock<BootstrapState>>,
source: &saorsa_core::identity::PeerId,
) {
let mut state = bootstrap_state.write().await;
if state.capacity_rejected_sources.remove(source) {
let n = state.capacity_rejected_sources.len();
debug!(
"Bootstrap: cleared outstanding capacity rejections for {source} \
({n} sources still outstanding)"
);
}
}

/// Record a set of discovered keys into the bootstrap state for drain tracking.
#[allow(clippy::implicit_hasher)]
pub async fn track_discovered_keys(
Expand Down Expand Up @@ -193,6 +246,7 @@ mod tests {
drained: true,
pending_peer_requests: 5,
pending_keys: HashSet::new(),
capacity_rejected_sources: HashSet::new(),
}));
let queues = ReplicationQueues::new();

Expand All @@ -208,6 +262,7 @@ mod tests {
drained: false,
pending_peer_requests: 2,
pending_keys: HashSet::new(),
capacity_rejected_sources: HashSet::new(),
}));
let queues = ReplicationQueues::new();

Expand All @@ -223,6 +278,7 @@ mod tests {
drained: false,
pending_peer_requests: 0,
pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(),
capacity_rejected_sources: HashSet::new(),
}));
let queues = ReplicationQueues::new();

Expand All @@ -237,6 +293,7 @@ mod tests {
drained: false,
pending_peer_requests: 0,
pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(),
capacity_rejected_sources: HashSet::new(),
}));
let mut queues = ReplicationQueues::new();

Expand Down Expand Up @@ -299,4 +356,54 @@ mod tests {
"should saturate at zero"
);
}

/// Round-3 regression: a source that previously had capacity-rejected
/// hints must be retired from the "not yet drained" list when it
/// completes a later admission cycle with zero rejections, otherwise
/// `check_bootstrap_drained` is permanently wedged after a single
/// rejection.
#[tokio::test]
async fn capacity_rejected_clears_on_clean_cycle() {
let state = Arc::new(RwLock::new(BootstrapState::new()));
let queues = ReplicationQueues::new();
let source = saorsa_core::identity::PeerId::from_bytes([7u8; 32]);

// First cycle: this source overflowed, drain blocked.
note_capacity_rejected(&state, source).await;
assert!(
!check_bootstrap_drained(&state, &queues).await,
"drain must be blocked while a source has outstanding capacity rejections"
);

// Second cycle from the SAME source: zero rejections → clear it.
clear_capacity_rejected(&state, &source).await;
assert!(
check_bootstrap_drained(&state, &queues).await,
"drain must complete once the source's outstanding rejections are cleared"
);
}

/// Per-source granularity: one source's clean cycle must NOT clear a
/// different source's outstanding rejections.
#[tokio::test]
async fn capacity_rejected_is_per_source() {
let state = Arc::new(RwLock::new(BootstrapState::new()));
let queues = ReplicationQueues::new();
let source_a = saorsa_core::identity::PeerId::from_bytes([0xAA; 32]);
let source_b = saorsa_core::identity::PeerId::from_bytes([0xBB; 32]);

note_capacity_rejected(&state, source_a).await;
note_capacity_rejected(&state, source_b).await;
assert!(!check_bootstrap_drained(&state, &queues).await);

// Only A clears; B still owes us re-hints.
clear_capacity_rejected(&state, &source_a).await;
assert!(
!check_bootstrap_drained(&state, &queues).await,
"B's outstanding rejections must keep drain blocked"
);

clear_capacity_rejected(&state, &source_b).await;
assert!(check_bootstrap_drained(&state, &queues).await);
}
}
Loading
Loading