Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 10 additions & 37 deletions src/crates/core/src/agentic/deep_review/budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,32 +556,14 @@ impl DeepReviewBudgetTracker {
parent_dialog_turn_id: &str,
max_active_reviewers: usize,
launch_batch: u64,
packet_id: Option<&str>,
_packet_id: Option<&str>,
) -> Result<Option<DeepReviewActiveReviewerGuard<'a>>, DeepReviewPolicyViolation> {
let now = Instant::now();
let mut budget = self
.turns
.entry(parent_dialog_turn_id.to_string())
.or_insert_with(|| DeepReviewTurnBudget::new(now));

if let Some((&earliest_active_batch, _)) =
budget.active_reviewer_launch_batches.iter().next()
{
if earliest_active_batch < launch_batch {
let packet_label = packet_id
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("unknown");
return Err(DeepReviewPolicyViolation::new(
"deep_review_launch_batch_blocked",
format!(
"Reviewer packet '{}' is in launch_batch {}, but launch_batch {} still has active reviewer(s). Wait for earlier-batch reviewers to finish, timeout, or be cancelled before launching this packet. If the queue remains blocked, pause or cancel the queued reviewers from the Review Team action bar and retry with a lower max parallel reviewer setting.",
packet_label, launch_batch, earliest_active_batch
),
));
}
}

if budget.active_reviewers >= max_active_reviewers {
return Ok(None);
}
Expand Down Expand Up @@ -822,31 +804,22 @@ mod tests {
use super::*;

#[test]
fn launch_batch_admission_blocks_later_batch_while_earlier_batch_is_active() {
fn launch_batch_admission_allows_later_batch_when_reviewer_capacity_is_free() {
let tracker = DeepReviewBudgetTracker::default();
let turn_id = "turn-launch-batch-blocked";
let turn_id = "turn-launch-batch-fill-free-slot";
let _first_batch = tracker
.try_begin_active_reviewer_for_launch_batch(turn_id, 2, 1, Some("packet-a"))
.expect("batch admission should not fail")
.expect("first reviewer should start");

let violation = match tracker.try_begin_active_reviewer_for_launch_batch(
turn_id,
2,
2,
Some("packet-b"),
) {
Err(violation) => violation,
Ok(_) => panic!("later launch batch should wait while earlier batch is active"),
};
let second_batch = tracker
.try_begin_active_reviewer_for_launch_batch(turn_id, 2, 2, Some("packet-b"))
.expect("later batch admission should not fail when reviewer capacity is free");

assert_eq!(violation.code, "deep_review_launch_batch_blocked");
assert!(violation.message.contains("packet-b"));
assert!(violation.message.contains("launch_batch 2"));
assert!(violation.message.contains("launch_batch 1"));
assert!(violation
.message
.contains("Wait for earlier-batch reviewers"));
assert!(
second_batch.is_some(),
"later batch should fill a freed reviewer slot instead of waiting for the earlier batch to drain"
);
}

#[test]
Expand Down
80 changes: 75 additions & 5 deletions src/crates/core/src/agentic/deep_review/task_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ use tokio::time::sleep;
const DEEP_REVIEW_QUEUE_POLL_INTERVAL: Duration = Duration::from_millis(10);
#[cfg(not(test))]
const DEEP_REVIEW_QUEUE_POLL_INTERVAL: Duration = Duration::from_secs(1);
pub(crate) const DEEP_REVIEW_PROVIDER_CAPACITY_MAX_RETRY_ATTEMPTS: usize = 3;
const DEEP_REVIEW_PROVIDER_CAPACITY_BACKOFF_MULTIPLIER: u64 = 3;
const DEEP_REVIEW_PROVIDER_CAPACITY_MAX_BACKOFF_SECONDS: u64 = 600;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DeepReviewQueueWaitSkipReason {
Expand All @@ -59,6 +62,7 @@ pub(crate) enum DeepReviewQueueWaitOutcome {
pub(crate) enum DeepReviewProviderQueueWaitOutcome {
ReadyToRetry {
queue_elapsed_ms: u64,
early_capacity_probe: bool,
},
Skipped {
queue_elapsed_ms: u64,
Expand Down Expand Up @@ -514,6 +518,39 @@ pub(crate) fn provider_capacity_queue_wait_seconds(
.filter(|seconds| *seconds > 0)
}

pub(crate) fn provider_capacity_queue_wait_seconds_for_attempt(
decision: &DeepReviewCapacityQueueDecision,
conc_policy: &DeepReviewConcurrencyPolicy,
retry_attempt_index: usize,
) -> Option<u64> {
let base_wait_seconds = provider_capacity_queue_wait_seconds(decision, conc_policy)?;
if decision.retry_after_seconds.is_some() {
return Some(base_wait_seconds);
}

let multiplier = DEEP_REVIEW_PROVIDER_CAPACITY_BACKOFF_MULTIPLIER.saturating_pow(
u32::try_from(retry_attempt_index)
.unwrap_or(u32::MAX)
.min(8),
);
Some(
base_wait_seconds
.saturating_mul(multiplier)
.min(DEEP_REVIEW_PROVIDER_CAPACITY_MAX_BACKOFF_SECONDS),
)
.filter(|seconds| *seconds > 0)
}

fn provider_capacity_wait_can_wake_on_active_reviewer_release(
reason: DeepReviewCapacityQueueReason,
) -> bool {
matches!(
reason,
DeepReviewCapacityQueueReason::ProviderConcurrencyLimit
| DeepReviewCapacityQueueReason::TemporaryOverload
)
}

pub(crate) fn capacity_skip_result_for_provider_reason(
reason: DeepReviewCapacityQueueReason,
dialog_turn_id: &str,
Expand Down Expand Up @@ -707,6 +744,9 @@ pub(crate) async fn wait_for_provider_capacity_retry(
let mut queue_timer = QueueWaitTimer::start(Instant::now());
let max_wait = Duration::from_secs(max_wait_seconds);
let optional_reviewer_count = is_optional_reviewer.then_some(1);
let initial_active_reviewers = deep_review_active_reviewer_count(dialog_turn_id);
let can_wake_on_active_reviewer_release =
provider_capacity_wait_can_wake_on_active_reviewer_release(reason);

record_deep_review_runtime_provider_capacity_queue(dialog_turn_id, reason);

Expand Down Expand Up @@ -737,7 +777,7 @@ pub(crate) async fn wait_for_provider_capacity_retry(
optional_reviewer_count,
Some(effective_parallel_instances),
queue_elapsed_ms,
conc_policy.max_queue_wait_seconds,
max_wait_seconds,
)
.await;
return DeepReviewProviderQueueWaitOutcome::Skipped {
Expand All @@ -764,7 +804,7 @@ pub(crate) async fn wait_for_provider_capacity_retry(
optional_reviewer_count,
Some(effective_parallel_instances),
queue_elapsed_ms,
conc_policy.max_queue_wait_seconds,
max_wait_seconds,
)
.await;
sleep(DEEP_REVIEW_QUEUE_POLL_INTERVAL).await;
Expand All @@ -788,10 +828,40 @@ pub(crate) async fn wait_for_provider_capacity_retry(
optional_reviewer_count,
Some(effective_parallel_instances),
queue_elapsed_ms,
conc_policy.max_queue_wait_seconds,
max_wait_seconds,
)
.await;
return DeepReviewProviderQueueWaitOutcome::ReadyToRetry { queue_elapsed_ms };
return DeepReviewProviderQueueWaitOutcome::ReadyToRetry {
queue_elapsed_ms,
early_capacity_probe: false,
};
}

if can_wake_on_active_reviewer_release
&& initial_active_reviewers > 0
&& active_reviewers < initial_active_reviewers
{
record_deep_review_runtime_queue_wait(dialog_turn_id, queue_elapsed_ms);
clear_deep_review_queue_control_for_tool(dialog_turn_id, tool_id);
emit_queue_state(
session_id,
dialog_turn_id,
tool_id,
subagent_type,
DeepReviewQueueStatus::Running,
Some(reason),
0,
active_reviewers,
optional_reviewer_count,
Some(effective_parallel_instances),
queue_elapsed_ms,
max_wait_seconds,
)
.await;
return DeepReviewProviderQueueWaitOutcome::ReadyToRetry {
queue_elapsed_ms,
early_capacity_probe: true,
};
}

emit_queue_state(
Expand All @@ -806,7 +876,7 @@ pub(crate) async fn wait_for_provider_capacity_retry(
optional_reviewer_count,
Some(effective_parallel_instances),
queue_elapsed_ms,
conc_policy.max_queue_wait_seconds,
max_wait_seconds,
)
.await;

Expand Down
Loading
Loading