From eaca2d4b1ceec11ac76190b342f71a85e2ae352f Mon Sep 17 00:00:00 2001 From: desty Date: Tue, 21 Apr 2026 08:22:52 +0200 Subject: [PATCH 1/2] refactor(dispatch): split provider_loop 808 LoC into resolver + retry + orchestration (T-SP-3 #9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit provider_loop.rs was hosting three distinct concerns in a single file: 1. Provider selection (registry + CB + HC + RE-1a endpoint health) 2. Retry + error classification + single-provider dispatch 3. Outer fallback loop orchestration Split into: - src/server/dispatch/resolver.rs (~128 LoC) — resolve_provider, try_direct_provider_lookup (backward-compat path). - src/server/dispatch/retry.rs (~482 LoC) — ProviderLoopAction, ProviderAttempt, classify_and_handle_error, emit_provider_error_metrics, handle_provider_error, try_rotate_and_retry, dispatch_streaming, dispatch_non_streaming, wrap_stream_with_middleware, build_encrypted_content. - src/server/dispatch/provider_loop.rs (~275 LoC down from 808) — dispatch_provider_loop (outer loop), log_dispatch_attempt, prepare_provider_request, emit_fallback. No behavior change: public entry point src/server/dispatch/mod.rs:349 (provider_loop::dispatch_provider_loop) keeps the same signature and callers are unaffected. Validation: - cargo build --all-targets green - cargo test --lib : 949 passed, 0 failed - cargo clippy --all-targets -- -D warnings clean - RUSTFLAGS="-D warnings" cargo check --no-default-features clean Addresses Phoenix audit 2026-04-20 item #9. --- src/server/dispatch/mod.rs | 2 + src/server/dispatch/provider_loop.rs | 603 ++------------------------- src/server/dispatch/resolver.rs | 128 ++++++ src/server/dispatch/retry.rs | 482 +++++++++++++++++++++ 4 files changed, 647 insertions(+), 568 deletions(-) create mode 100644 src/server/dispatch/resolver.rs create mode 100644 src/server/dispatch/retry.rs diff --git a/src/server/dispatch/mod.rs b/src/server/dispatch/mod.rs index ff0a3e9e..1efc529c 100644 --- a/src/server/dispatch/mod.rs +++ b/src/server/dispatch/mod.rs @@ -5,6 +5,8 @@ //! DLP scanning → cache lookup → routing → provider loop with fallback → audit → response. mod provider_loop; +mod resolver; +mod retry; mod telemetry; use crate::cli::ModelStrategy; diff --git a/src/server/dispatch/provider_loop.rs b/src/server/dispatch/provider_loop.rs index c9f28ac9..2233bf5d 100644 --- a/src/server/dispatch/provider_loop.rs +++ b/src/server/dispatch/provider_loop.rs @@ -1,88 +1,36 @@ -//! Provider fallback loop: resolve, dispatch, retry, and error handling. - -use super::{telemetry, AuditEntry, DispatchContext, DispatchResult}; -use crate::models::RouteType; -use bytes::Bytes; -use futures::stream::Stream; -use std::pin::Pin; -use std::sync::Arc; -use tracing::{info, warn}; - -use super::super::{ - check_budget, format_route_type, inject_continuation_text, is_auth_revoked_error, - is_provider_subscription, is_retryable, retry_delay, AppError, MAX_RETRIES, +//! Provider fallback loop: orchestrates the priority-list walk. +//! +//! The hot path of the dispatch pipeline. For each mapping in the sorted +//! priority list this function: +//! +//! 1. Asks [`resolver::resolve_provider`] whether the mapping is usable +//! (registry + CB + RE-1a passive CB). +//! 2. Checks the tenant / provider / model budget. +//! 3. Emits the `RequestStart` event for `grob watch`. +//! 4. Prepares the provider-specific request (model substitution, DLP, +//! continuation injection). +//! 5. 
Dispatches a single attempt through [`retry::dispatch_streaming`] or +//! [`retry::dispatch_non_streaming`] depending on the request mode. +//! 6. On `RateLimited` tries pool rotation via [`retry::try_rotate_and_retry`]. +//! 7. On `AuthRevoked` aborts the cascade (user-actionable 401). +//! 8. Otherwise moves on to the next mapping. +//! +//! After the loop exhausts the list, [`resolver::try_direct_provider_lookup`] +//! offers a backward-compat path for unmapped models. A final audit entry +//! is written before returning `AppError::ProviderError`. + +use super::{AuditEntry, DispatchContext, DispatchResult}; +use super::resolver::{resolve_provider, try_direct_provider_lookup}; +use super::retry::{ + dispatch_non_streaming, dispatch_streaming, try_rotate_and_retry, + ProviderAttempt, ProviderLoopAction, }; -use super::telemetry::{ - calculate_and_record_metrics, record_success_telemetry, store_response_cache, +use super::super::{ + check_budget, format_route_type, inject_continuation_text, is_provider_subscription, + should_inject_continuation, AppError, }; - -/// Check if a provider is available: exists in registry and circuit breaker is not open. -/// Returns `None` if the provider should be skipped. -pub(super) async fn resolve_provider( - ctx: &DispatchContext<'_>, - mapping: &crate::cli::ModelMapping, -) -> Option> { - let provider = ctx - .inner - .provider_registry - .provider(&mapping.provider) - .or_else(|| { - info!( - "Provider {} not found in registry, trying next fallback", - mapping.provider - ); - None - })?; - - // Check availability: scorer (which wraps CB) takes priority over bare CB - if let Some(ref scorer) = ctx.state.security.provider_scorer { - if !scorer.can_execute(&mapping.provider).await { - info!( - "Provider {} unavailable (scorer/CB), skipping", - mapping.provider - ); - metrics::counter!( - "grob_circuit_breaker_rejected_total", - "provider" => mapping.provider.clone() - ) - .increment(1); - return None; - } - } else if let Some(ref cb) = ctx.state.security.circuit_breakers { - if !cb.can_execute(&mapping.provider).await { - info!("Circuit breaker open for {}, skipping", mapping.provider); - metrics::counter!( - "grob_circuit_breaker_rejected_total", - "provider" => mapping.provider.clone() - ) - .increment(1); - return None; - } - } - - // Routing-layer passive CB (RE-1a, ADR-0018). Per-endpoint, orthogonal - // to the global per-provider security CB above — an endpoint can be - // down while the provider still has other endpoints up. - if !ctx - .inner - .provider_registry - .is_endpoint_healthy(&mapping.provider, &mapping.actual_model) - { - info!( - "Endpoint {}/{} tripped by passive CB, skipping", - mapping.provider, mapping.actual_model - ); - metrics::counter!( - "grob_routing_endpoint_cb_rejected_total", - "provider" => mapping.provider.clone(), - "model" => mapping.actual_model.clone(), - ) - .increment(1); - return None; - } - - Some(provider) -} +use crate::models::RouteType; +use tracing::info; /// Provider loop with fallback and per-provider retry. pub(super) async fn dispatch_provider_loop( @@ -277,7 +225,10 @@ fn log_dispatch_attempt( } /// Clone the request, substitute the actual model, run DLP, and optionally inject continuation. -fn prepare_provider_request( +/// +/// Visible to `retry.rs` because pool-rotation retries re-run the pre-flight +/// transformations (DLP, continuation) before the second attempt. 
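+///
+/// Illustrative call shape, as used by the pool-rotation retry (this simply
+/// restates the call visible in `retry::try_rotate_and_retry`; marked
+/// `ignore` so it is not run as a doctest):
+///
+/// ```ignore
+/// let (retry_request, _) =
+///     prepare_provider_request(ctx, request, attempt.mapping, &attempt.decision.route_type);
+/// ```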
+pub(super) fn prepare_provider_request( ctx: &DispatchContext<'_>, request: &crate::models::CanonicalRequest, mapping: &crate::cli::ModelMapping, @@ -303,33 +254,6 @@ fn prepare_provider_request( (provider_request, original_model) } -/// Attempts key pool rotation and retry on rate limit. -/// -/// Returns `Some(result)` if the retry succeeded, `None` if rotation was -/// unavailable or the retry also failed. -async fn try_rotate_and_retry( - ctx: &DispatchContext<'_>, - request: &mut crate::models::CanonicalRequest, - provider: &dyn crate::providers::LlmProvider, - attempt: &ProviderAttempt<'_>, -) -> Option { - if !provider.rotate_key_pool() { - return None; - } - info!( - "Provider {} rate-limited, rotated to next pooled key — retrying", - attempt.mapping.provider - ); - let (retry_request, _) = - prepare_provider_request(ctx, request, attempt.mapping, &attempt.decision.route_type); - let retry_result = if ctx.is_streaming { - dispatch_streaming(ctx, retry_request, provider, attempt.mapping).await - } else { - dispatch_non_streaming(ctx, retry_request, provider, attempt).await - }; - retry_result.ok() -} - /// Emits a fallback event for `grob watch`. fn emit_fallback( ctx: &DispatchContext<'_>, @@ -349,460 +273,3 @@ fn emit_fallback( }); } } - -/// Backward-compat fallback: try direct model->provider lookup from the registry. -async fn try_direct_provider_lookup( - ctx: &DispatchContext<'_>, - request: &crate::models::CanonicalRequest, - model_name: &str, -) -> Result, AppError> { - let Ok(provider) = ctx.inner.provider_registry.provider_for_model(model_name) else { - return Ok(None); - }; - info!( - "Using provider from registry (direct lookup): {}", - model_name - ); - let mut fallback_request = request.clone(); - let original_model = fallback_request.model.clone(); - fallback_request.model = model_name.to_string(); - - let mut response = provider - .send_message(fallback_request) - .await - .map_err(|e| AppError::ProviderError(e.to_string()))?; - response.model = original_model; - - Ok(Some(DispatchResult::Complete { - response, - provider: model_name.to_string(), - actual_model: model_name.to_string(), - provider_duration_ms: 0, - })) -} - -/// Wrap a raw provider stream with DLP sanitization and Tap recording layers. -fn wrap_stream_with_middleware( - ctx: &DispatchContext<'_>, - raw_stream: Pin< - Box> + Send>, - >, - tap_request_body: Option, -) -> Pin> + Send>> { - let stream: Pin< - Box> + Send>, - > = if let Some(ref dlp_engine) = ctx.dlp { - if dlp_engine.config.scan_output { - Box::pin(crate::features::dlp::stream::DlpStream::new( - raw_stream, - Arc::clone(dlp_engine), - )) - } else { - raw_stream - } - } else { - raw_stream - }; - - // HIT stream: intercept tool_use blocks for human authorization. 
- #[cfg(feature = "policies")] - let stream = { - let hit_policy = ctx.resolved_policy.as_ref().and_then(|p| p.hit.clone()); - if let Some(hit) = hit_policy { - Box::pin(crate::features::policies::stream::HitStream::new( - stream, - hit, - ctx.req_id.to_string(), - Some(Arc::clone(&ctx.state.hit_pending)), - Some(ctx.state.event_bus.clone()), - ctx.state.security.audit_log.clone(), - )) - as Pin< - Box< - dyn Stream> - + Send, - >, - > - } else { - stream - } - }; - - if let Some(ref tap) = ctx.state.security.tap_sender { - let tap_req_id = uuid::Uuid::new_v4().to_string(); - if let Some(body_json) = tap_request_body { - tap.try_send(crate::features::tap::TapEvent::Request { - request_id: tap_req_id.clone(), - tenant_id: ctx.tenant_id.clone(), - model: ctx.model.clone(), - body: body_json, - }); - } - Box::pin(crate::features::tap::stream::TapStream::new( - stream, - Arc::clone(tap), - tap_req_id, - )) - } else { - stream - } -} - -/// Internal signal to continue the provider loop. -enum ProviderLoopAction { - Continue, - /// Rate-limited (429): the caller should attempt key pool rotation - /// before falling through to the next provider mapping. - RateLimited, - /// Terminal auth failure (401 `authentication_error`): do NOT fall back - /// to sibling providers — surface the error directly to the client so the - /// user is prompted to run `grob connect --force-reauth`. - AuthRevoked(String), -} - -/// Handle the streaming path for a single provider attempt. -async fn dispatch_streaming( - ctx: &DispatchContext<'_>, - provider_request: crate::models::CanonicalRequest, - provider: &dyn crate::providers::LlmProvider, - mapping: &crate::cli::ModelMapping, -) -> Result { - // Capture request body for tap before ownership moves - let tap_request_body = if ctx.state.security.tap_sender.is_some() { - serde_json::to_string(&provider_request).ok() - } else { - None - }; - - match provider.send_message_stream(provider_request).await { - Ok(stream_response) => { - // Overhead = time from request receipt to first SSE byte (before provider responded). - let overhead_ms = ctx.start_time.elapsed().as_millis() as u64; - let latency_ms = overhead_ms; - ctx.record_provider_success(&mapping.provider, latency_ms) - .await; - ctx.record_endpoint_success(&mapping.provider, &mapping.actual_model); - - let stream = wrap_stream_with_middleware(ctx, stream_response.stream, tap_request_body); - - let upstream_headers: Vec<(String, String)> = - stream_response.headers.into_iter().collect(); - - Ok(DispatchResult::Streaming { - stream, - provider: mapping.provider.clone(), - actual_model: mapping.actual_model.clone(), - upstream_headers, - overhead_ms, - }) - } - Err(e) => { - ctx.record_provider_failure(&mapping.provider).await; - ctx.record_endpoint_failure(&mapping.provider, &mapping.actual_model); - if let Some(ref trace_id) = ctx.trace_id { - ctx.state - .observability - .message_tracer - .trace_error(trace_id, &e.to_string()); - } - handle_provider_error(mapping, &e); - if is_auth_revoked_error(&e) { - return Err(ProviderLoopAction::AuthRevoked(e.to_string())); - } - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit { - Err(ProviderLoopAction::RateLimited) - } else { - Err(ProviderLoopAction::Continue) - } - } - } -} - -/// Emit shared provider-error metrics (rate-limit counter + error counter). 
-fn emit_provider_error_metrics( - mapping: &crate::cli::ModelMapping, - e: &crate::providers::error::ProviderError, -) { - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit { - warn!("Provider {} rate limited", mapping.provider); - metrics::counter!( - "grob_ratelimit_hits_total", - "provider" => mapping.provider.clone() - ) - .increment(1); - } - metrics::counter!( - "grob_provider_errors_total", - "provider" => mapping.provider.clone() - ) - .increment(1); -} - -/// Classify a provider error, emit metrics, and decide whether to retry or break. -/// Returns `true` if the retry loop should continue (retryable + attempts remaining). -fn classify_and_handle_error( - ctx: &DispatchContext<'_>, - mapping: &crate::cli::ModelMapping, - e: &crate::providers::error::ProviderError, - attempt: u32, -) -> bool { - if let Some(ref trace_id) = ctx.trace_id { - ctx.state - .observability - .message_tracer - .trace_error(trace_id, &e.to_string()); - } - emit_provider_error_metrics(mapping, e); - is_retryable(e) && attempt < MAX_RETRIES -} - -/// Per-provider dispatch parameters (non-streaming path). -struct ProviderAttempt<'a> { - mapping: &'a crate::cli::ModelMapping, - decision: &'a crate::models::RouteDecision, - cache_key: &'a Option, - original_model: &'a str, - is_subscription: bool, -} - -/// Handle the non-streaming path with retry for a single provider. -async fn dispatch_non_streaming( - ctx: &DispatchContext<'_>, - provider_request: crate::models::CanonicalRequest, - provider: &dyn crate::providers::LlmProvider, - attempt: &ProviderAttempt<'_>, -) -> Result { - // Wrap in Option so we can move (not clone) on the final attempt. - let mut owned_request = Some(provider_request); - for retry in 0..=MAX_RETRIES { - if retry > 0 { - let delay = retry_delay(retry - 1); - warn!( - "Retrying provider {} (attempt {}/{}), backoff {}ms", - attempt.mapping.provider, - retry + 1, - MAX_RETRIES + 1, - delay.as_millis() - ); - tokio::time::sleep(delay).await; - } - - // Clone for earlier attempts; move on the last to avoid an extra allocation. - let req = if retry < MAX_RETRIES { - owned_request.as_ref().expect("set before loop").clone() - } else { - owned_request.take().expect("set before loop") - }; - - let provider_start = std::time::Instant::now(); - match provider.send_message(req).await { - Ok(mut response) => { - let provider_duration_ms = provider_start.elapsed().as_millis() as u64; - let latency_ms = ctx.start_time.elapsed().as_millis() as u64; - ctx.record_provider_success(&attempt.mapping.provider, latency_ms) - .await; - ctx.record_endpoint_success( - &attempt.mapping.provider, - &attempt.mapping.actual_model, - ); - ctx.sanitize_output(&mut response); - response.model = attempt.original_model.to_string(); - - let latency_ms = ctx.start_time.elapsed().as_millis() as u64; - let outcome = telemetry::DispatchOutcome { - mapping: attempt.mapping, - decision: attempt.decision, - response: &response, - latency_ms, - }; - let cost_usd = - calculate_and_record_metrics(ctx, &outcome, attempt.is_subscription).await; - record_success_telemetry(ctx, &outcome, cost_usd).await; - let cached_bytes = - store_response_cache(ctx, attempt.mapping, attempt.cache_key, &response).await; - - // Emit RequestEnd event for `grob watch`. 
- ctx.state - .event_bus - .emit(crate::features::watch::events::WatchEvent::RequestEnd { - request_id: ctx.req_id.to_string(), - model: attempt.mapping.actual_model.clone(), - provider: attempt.mapping.provider.clone(), - output_tokens: response.usage.output_tokens, - latency_ms, - cost_usd, - timestamp: chrono::Utc::now(), - }); - - // Emit to external log sinks. - if let Some(ref exporter) = ctx.state.log_exporter { - let (encrypted_content, content_recipients) = - build_encrypted_content(ctx, &cached_bytes); - - exporter.emit(&crate::features::log_export::LogEntry { - request_id: ctx.req_id.to_string(), - timestamp: chrono::Utc::now().to_rfc3339(), - model: attempt.mapping.actual_model.clone(), - provider: attempt.mapping.provider.clone(), - input_tokens: response.usage.input_tokens, - output_tokens: response.usage.output_tokens, - latency_ms, - cost_usd, - status: "success".to_string(), - dlp_actions: vec![], - tenant_id: ctx.tenant_id.clone(), - encrypted_content, - content_recipients, - }); - } - - return Ok(DispatchResult::Complete { - response, - provider: attempt.mapping.provider.clone(), - actual_model: attempt.mapping.actual_model.clone(), - provider_duration_ms, - }); - } - Err(e) => { - // 401 authentication_error is terminal — abort the cascade. - if is_auth_revoked_error(&e) { - ctx.record_provider_failure(&attempt.mapping.provider).await; - return Err(ProviderLoopAction::AuthRevoked(e.to_string())); - } - - if classify_and_handle_error(ctx, attempt.mapping, &e, retry) { - // On 429, try rotating to next pooled key before retrying. - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit && provider.rotate_key_pool() { - info!( - "Provider {} rate-limited, rotated to next pooled key", - attempt.mapping.provider - ); - } - warn!( - "Provider {} failed (retryable): {}", - attempt.mapping.provider, e - ); - continue; - } - - // Before giving up on this provider, try key rotation for 429. - let is_rate_limit = matches!( - e, - crate::providers::error::ProviderError::ApiError { status: 429, .. } - ); - if is_rate_limit && provider.rotate_key_pool() { - info!( - "Provider {} exhausted retries but rotated to next pooled key", - attempt.mapping.provider - ); - // Reset owned_request for another attempt cycle. - continue; - } - - ctx.record_provider_failure(&attempt.mapping.provider).await; - ctx.record_endpoint_failure( - &attempt.mapping.provider, - &attempt.mapping.actual_model, - ); - info!( - "Provider {} failed: {}, trying next fallback", - attempt.mapping.provider, e - ); - break; - } - } - } - Err(ProviderLoopAction::Continue) -} - -/// Log provider error metrics for the streaming path. -fn handle_provider_error( - mapping: &crate::cli::ModelMapping, - e: &crate::providers::error::ProviderError, -) { - emit_provider_error_metrics(mapping, e); - info!( - "Provider {} streaming failed: {}, trying next fallback", - mapping.provider, e - ); -} - -use super::super::should_inject_continuation; - -/// Builds encrypted content for log export when policy requires it. -/// -/// Returns `(None, None)` when encryption is not configured or no policy matches. -#[allow(unused_variables)] -fn build_encrypted_content( - ctx: &DispatchContext<'_>, - response_bytes: &Option>, -) -> (Option, Option>) { - #[cfg(feature = "policies")] - { - use crate::features::log_export::ContentMode; - - // Check if log export is configured for encryption. 
- if ctx.state.log_exporter.is_none() { - return (None, None); - } - let log_config = &ctx.inner.config.log_export; - if log_config.content != ContentMode::Encrypted { - return (None, None); - } - - // Resolve recipients from access policies. - let access_ctx = crate::features::log_export::access_policy::AccessContext { - tenant: ctx.tenant_id.clone(), - compliance: vec![], - dlp_triggered: false, - }; - let recipient_keys = crate::features::log_export::access_policy::resolve_recipients( - &log_config.access_policies, - &log_config.auditors, - &access_ctx, - ); - if recipient_keys.is_empty() { - return (None, None); - } - - // Build content string from response. - let content = response_bytes - .as_ref() - .map(|b| String::from_utf8_lossy(b).to_string()) - .unwrap_or_default(); - - // Encrypt. - let recipient_names: Vec = log_config - .access_policies - .iter() - .flat_map(|p| p.recipients.clone()) - .collect(); - - match crate::features::log_export::encryption::encrypt_for_recipients( - &content, - &recipient_keys, - ) { - Ok(encrypted) => (Some(encrypted), Some(recipient_names)), - Err(e) => { - tracing::warn!("Failed to encrypt log content: {}", e); - (None, None) - } - } - } - #[cfg(not(feature = "policies"))] - { - (None, None) - } -} diff --git a/src/server/dispatch/resolver.rs b/src/server/dispatch/resolver.rs new file mode 100644 index 00000000..0899b4cb --- /dev/null +++ b/src/server/dispatch/resolver.rs @@ -0,0 +1,128 @@ +//! Provider selection: pick the next usable mapping from the priority list. +//! +//! Extracted from `provider_loop.rs` to keep the fallback orchestration focused +//! on the loop shape. The two concerns live next to each other but read +//! independently: [`resolve_provider`] answers "is this provider available?" +//! and [`try_direct_provider_lookup`] is the backward-compat fallback when +//! no tier/mapping matched. + +use std::sync::Arc; + +use super::{DispatchContext, DispatchResult}; +use super::super::AppError; +use tracing::info; + +/// Returns the provider for a mapping, or `None` if it must be skipped. +/// +/// Checks, in order: +/// +/// 1. Provider exists in the registry. +/// 2. `SecurityConfig::provider_scorer` (wraps `CircuitBreakerRegistry`) — +/// if a scorer is enabled, it owns availability decisions. +/// 3. Bare `CircuitBreakerRegistry` — used when no scorer is configured. +/// 4. Routing-layer passive CB (RE-1a, ADR-0018) — per-endpoint, orthogonal +/// to the global per-provider CB above. +/// +/// Each rejection path emits a `grob_circuit_breaker_rejected_total` (or +/// `grob_routing_endpoint_cb_rejected_total`) counter for observability. 
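+///
+/// Loop-side usage is roughly the following sketch. The body of
+/// `dispatch_provider_loop` is not part of this diff, so the surrounding
+/// names (`priority_list` in particular) are assumptions, not the real code
+/// (`ignore`d, not a doctest):
+///
+/// ```ignore
+/// for mapping in &priority_list {
+///     // Skip mappings whose provider is missing, tripped, or unhealthy.
+///     let Some(provider) = resolve_provider(ctx, mapping).await else {
+///         continue;
+///     };
+///     // ... budget check, request prep, single-mapping dispatch ...
+/// }
+/// ```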
+pub(super) async fn resolve_provider( + ctx: &DispatchContext<'_>, + mapping: &crate::cli::ModelMapping, +) -> Option> { + let provider = ctx + .inner + .provider_registry + .provider(&mapping.provider) + .or_else(|| { + info!( + "Provider {} not found in registry, trying next fallback", + mapping.provider + ); + None + })?; + + // Check availability: scorer (which wraps CB) takes priority over bare CB + if let Some(ref scorer) = ctx.state.security.provider_scorer { + if !scorer.can_execute(&mapping.provider).await { + info!( + "Provider {} unavailable (scorer/CB), skipping", + mapping.provider + ); + metrics::counter!( + "grob_circuit_breaker_rejected_total", + "provider" => mapping.provider.clone() + ) + .increment(1); + return None; + } + } else if let Some(ref cb) = ctx.state.security.circuit_breakers { + if !cb.can_execute(&mapping.provider).await { + info!("Circuit breaker open for {}, skipping", mapping.provider); + metrics::counter!( + "grob_circuit_breaker_rejected_total", + "provider" => mapping.provider.clone() + ) + .increment(1); + return None; + } + } + + // Routing-layer passive CB (RE-1a, ADR-0018). Per-endpoint, orthogonal + // to the global per-provider security CB above — an endpoint can be + // down while the provider still has other endpoints up. + if !ctx + .inner + .provider_registry + .is_endpoint_healthy(&mapping.provider, &mapping.actual_model) + { + info!( + "Endpoint {}/{} tripped by passive CB, skipping", + mapping.provider, mapping.actual_model + ); + metrics::counter!( + "grob_routing_endpoint_cb_rejected_total", + "provider" => mapping.provider.clone(), + "model" => mapping.actual_model.clone(), + ) + .increment(1); + return None; + } + + Some(provider) +} + +/// Backward-compat fallback: try direct model -> provider lookup from the registry. +/// +/// Invoked by the provider loop after every mapping in the priority list has +/// been exhausted. Lets a request for `claude-opus-4-7` succeed even when no +/// `[[models.mappings]]` targets it explicitly, as long as the registry can +/// resolve the bare model name. +pub(super) async fn try_direct_provider_lookup( + ctx: &DispatchContext<'_>, + request: &crate::models::CanonicalRequest, + model_name: &str, +) -> Result, AppError> { + let Ok(provider) = ctx.inner.provider_registry.provider_for_model(model_name) else { + return Ok(None); + }; + info!( + "Using provider from registry (direct lookup): {}", + model_name + ); + let mut fallback_request = request.clone(); + let original_model = fallback_request.model.clone(); + fallback_request.model = model_name.to_string(); + + let mut response = provider + .send_message(fallback_request) + .await + .map_err(|e| AppError::ProviderError(e.to_string()))?; + response.model = original_model; + + Ok(Some(DispatchResult::Complete { + response, + provider: model_name.to_string(), + actual_model: model_name.to_string(), + provider_duration_ms: 0, + })) +} diff --git a/src/server/dispatch/retry.rs b/src/server/dispatch/retry.rs new file mode 100644 index 00000000..39999b18 --- /dev/null +++ b/src/server/dispatch/retry.rs @@ -0,0 +1,482 @@ +//! Single-provider dispatch with retry + error classification. +//! +//! Owns the inner loop of the provider fallback cascade: sending the request, +//! classifying failures, deciding whether to retry with backoff, rotating the +//! key pool on rate-limit, and wiring the output stream through the DLP, HIT, +//! and Tap middleware layers. +//! +//! The caller ([`super::provider_loop::dispatch_provider_loop`]) decides which +//! 
mapping to try next; this module decides what happens *within* a single +//! mapping attempt. + +use bytes::Bytes; +use futures::stream::Stream; +use std::pin::Pin; +use std::sync::Arc; +use tracing::{info, warn}; + +use super::{telemetry, DispatchContext, DispatchResult}; +use super::super::{ + is_auth_revoked_error, is_retryable, retry_delay, MAX_RETRIES, +}; +use super::telemetry::{ + calculate_and_record_metrics, record_success_telemetry, store_response_cache, +}; + +/// Internal signal from the per-attempt dispatch back to the outer provider loop. +pub(super) enum ProviderLoopAction { + /// Non-terminal failure: move to the next mapping in the priority list. + Continue, + /// Rate-limited (429): the caller should attempt key pool rotation + /// before falling through to the next provider mapping. + RateLimited, + /// Terminal auth failure (401 `authentication_error`): do NOT fall back + /// to sibling providers — surface the error directly to the client so the + /// user is prompted to run `grob connect --force-reauth`. + AuthRevoked(String), +} + +/// Per-provider dispatch parameters (non-streaming path). +pub(super) struct ProviderAttempt<'a> { + pub mapping: &'a crate::cli::ModelMapping, + pub decision: &'a crate::models::RouteDecision, + pub cache_key: &'a Option, + pub original_model: &'a str, + pub is_subscription: bool, +} + +/// Emit shared provider-error metrics (rate-limit counter + error counter). +fn emit_provider_error_metrics( + mapping: &crate::cli::ModelMapping, + e: &crate::providers::error::ProviderError, +) { + let is_rate_limit = matches!( + e, + crate::providers::error::ProviderError::ApiError { status: 429, .. } + ); + if is_rate_limit { + warn!("Provider {} rate limited", mapping.provider); + metrics::counter!( + "grob_ratelimit_hits_total", + "provider" => mapping.provider.clone() + ) + .increment(1); + } + metrics::counter!( + "grob_provider_errors_total", + "provider" => mapping.provider.clone() + ) + .increment(1); +} + +/// Classify a provider error, emit metrics, and decide whether to retry or break. +/// Returns `true` if the retry loop should continue (retryable + attempts remaining). +fn classify_and_handle_error( + ctx: &DispatchContext<'_>, + mapping: &crate::cli::ModelMapping, + e: &crate::providers::error::ProviderError, + attempt: u32, +) -> bool { + if let Some(ref trace_id) = ctx.trace_id { + ctx.state + .observability + .message_tracer + .trace_error(trace_id, &e.to_string()); + } + emit_provider_error_metrics(mapping, e); + is_retryable(e) && attempt < MAX_RETRIES +} + +/// Log provider error metrics for the streaming path. +fn handle_provider_error( + mapping: &crate::cli::ModelMapping, + e: &crate::providers::error::ProviderError, +) { + emit_provider_error_metrics(mapping, e); + info!( + "Provider {} streaming failed: {}, trying next fallback", + mapping.provider, e + ); +} + +/// Attempts key pool rotation and retry on rate limit. +/// +/// Returns `Some(result)` if the retry succeeded, `None` if rotation was +/// unavailable or the retry also failed. 
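+///
+/// Intended caller shape in the outer loop (assumed — the loop body is not
+/// shown in this diff; `ignore`d, not a doctest):
+///
+/// ```ignore
+/// // A streaming attempt came back `RateLimited`: give pooled-key rotation
+/// // one shot before falling through to the next mapping.
+/// if let Some(result) = try_rotate_and_retry(ctx, &mut request, provider, &attempt).await {
+///     return Ok(result);
+/// }
+/// ```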
+pub(super) async fn try_rotate_and_retry( + ctx: &DispatchContext<'_>, + request: &mut crate::models::CanonicalRequest, + provider: &dyn crate::providers::LlmProvider, + attempt: &ProviderAttempt<'_>, +) -> Option { + if !provider.rotate_key_pool() { + return None; + } + info!( + "Provider {} rate-limited, rotated to next pooled key — retrying", + attempt.mapping.provider + ); + let (retry_request, _) = super::provider_loop::prepare_provider_request( + ctx, + request, + attempt.mapping, + &attempt.decision.route_type, + ); + let retry_result = if ctx.is_streaming { + dispatch_streaming(ctx, retry_request, provider, attempt.mapping).await + } else { + dispatch_non_streaming(ctx, retry_request, provider, attempt).await + }; + retry_result.ok() +} + +/// Handle the streaming path for a single provider attempt. +pub(super) async fn dispatch_streaming( + ctx: &DispatchContext<'_>, + provider_request: crate::models::CanonicalRequest, + provider: &dyn crate::providers::LlmProvider, + mapping: &crate::cli::ModelMapping, +) -> Result { + // Capture request body for tap before ownership moves + let tap_request_body = if ctx.state.security.tap_sender.is_some() { + serde_json::to_string(&provider_request).ok() + } else { + None + }; + + match provider.send_message_stream(provider_request).await { + Ok(stream_response) => { + // Overhead = time from request receipt to first SSE byte (before provider responded). + let overhead_ms = ctx.start_time.elapsed().as_millis() as u64; + let latency_ms = overhead_ms; + ctx.record_provider_success(&mapping.provider, latency_ms) + .await; + ctx.record_endpoint_success(&mapping.provider, &mapping.actual_model); + + let stream = + wrap_stream_with_middleware(ctx, stream_response.stream, tap_request_body); + + let upstream_headers: Vec<(String, String)> = + stream_response.headers.into_iter().collect(); + + Ok(DispatchResult::Streaming { + stream, + provider: mapping.provider.clone(), + actual_model: mapping.actual_model.clone(), + upstream_headers, + overhead_ms, + }) + } + Err(e) => { + ctx.record_provider_failure(&mapping.provider).await; + ctx.record_endpoint_failure(&mapping.provider, &mapping.actual_model); + if let Some(ref trace_id) = ctx.trace_id { + ctx.state + .observability + .message_tracer + .trace_error(trace_id, &e.to_string()); + } + handle_provider_error(mapping, &e); + if is_auth_revoked_error(&e) { + return Err(ProviderLoopAction::AuthRevoked(e.to_string())); + } + let is_rate_limit = matches!( + e, + crate::providers::error::ProviderError::ApiError { status: 429, .. } + ); + if is_rate_limit { + Err(ProviderLoopAction::RateLimited) + } else { + Err(ProviderLoopAction::Continue) + } + } + } +} + +/// Handle the non-streaming path with retry for a single provider. +pub(super) async fn dispatch_non_streaming( + ctx: &DispatchContext<'_>, + provider_request: crate::models::CanonicalRequest, + provider: &dyn crate::providers::LlmProvider, + attempt: &ProviderAttempt<'_>, +) -> Result { + // Wrap in Option so we can move (not clone) on the final attempt. + let mut owned_request = Some(provider_request); + for retry in 0..=MAX_RETRIES { + if retry > 0 { + let delay = retry_delay(retry - 1); + warn!( + "Retrying provider {} (attempt {}/{}), backoff {}ms", + attempt.mapping.provider, + retry + 1, + MAX_RETRIES + 1, + delay.as_millis() + ); + tokio::time::sleep(delay).await; + } + + // Clone for earlier attempts; move on the last to avoid an extra allocation. 
+ let req = if retry < MAX_RETRIES { + owned_request.as_ref().expect("set before loop").clone() + } else { + owned_request.take().expect("set before loop") + }; + + let provider_start = std::time::Instant::now(); + match provider.send_message(req).await { + Ok(mut response) => { + let provider_duration_ms = provider_start.elapsed().as_millis() as u64; + let latency_ms = ctx.start_time.elapsed().as_millis() as u64; + ctx.record_provider_success(&attempt.mapping.provider, latency_ms) + .await; + ctx.record_endpoint_success( + &attempt.mapping.provider, + &attempt.mapping.actual_model, + ); + ctx.sanitize_output(&mut response); + response.model = attempt.original_model.to_string(); + + let latency_ms = ctx.start_time.elapsed().as_millis() as u64; + let outcome = telemetry::DispatchOutcome { + mapping: attempt.mapping, + decision: attempt.decision, + response: &response, + latency_ms, + }; + let cost_usd = + calculate_and_record_metrics(ctx, &outcome, attempt.is_subscription).await; + record_success_telemetry(ctx, &outcome, cost_usd).await; + let cached_bytes = + store_response_cache(ctx, attempt.mapping, attempt.cache_key, &response) + .await; + + // Emit RequestEnd event for `grob watch`. + ctx.state + .event_bus + .emit(crate::features::watch::events::WatchEvent::RequestEnd { + request_id: ctx.req_id.to_string(), + model: attempt.mapping.actual_model.clone(), + provider: attempt.mapping.provider.clone(), + output_tokens: response.usage.output_tokens, + latency_ms, + cost_usd, + timestamp: chrono::Utc::now(), + }); + + // Emit to external log sinks. + if let Some(ref exporter) = ctx.state.log_exporter { + let (encrypted_content, content_recipients) = + build_encrypted_content(ctx, &cached_bytes); + + exporter.emit(&crate::features::log_export::LogEntry { + request_id: ctx.req_id.to_string(), + timestamp: chrono::Utc::now().to_rfc3339(), + model: attempt.mapping.actual_model.clone(), + provider: attempt.mapping.provider.clone(), + input_tokens: response.usage.input_tokens, + output_tokens: response.usage.output_tokens, + latency_ms, + cost_usd, + status: "success".to_string(), + dlp_actions: vec![], + tenant_id: ctx.tenant_id.clone(), + encrypted_content, + content_recipients, + }); + } + + return Ok(DispatchResult::Complete { + response, + provider: attempt.mapping.provider.clone(), + actual_model: attempt.mapping.actual_model.clone(), + provider_duration_ms, + }); + } + Err(e) => { + // 401 authentication_error is terminal — abort the cascade. + if is_auth_revoked_error(&e) { + ctx.record_provider_failure(&attempt.mapping.provider).await; + return Err(ProviderLoopAction::AuthRevoked(e.to_string())); + } + + if classify_and_handle_error(ctx, attempt.mapping, &e, retry) { + // On 429, try rotating to next pooled key before retrying. + let is_rate_limit = matches!( + e, + crate::providers::error::ProviderError::ApiError { status: 429, .. } + ); + if is_rate_limit && provider.rotate_key_pool() { + info!( + "Provider {} rate-limited, rotated to next pooled key", + attempt.mapping.provider + ); + } + warn!( + "Provider {} failed (retryable): {}", + attempt.mapping.provider, e + ); + continue; + } + + // Before giving up on this provider, try key rotation for 429. + let is_rate_limit = matches!( + e, + crate::providers::error::ProviderError::ApiError { status: 429, .. } + ); + if is_rate_limit && provider.rotate_key_pool() { + info!( + "Provider {} exhausted retries but rotated to next pooled key", + attempt.mapping.provider + ); + // Reset owned_request for another attempt cycle. 
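+                    // NOTE: if this branch is reached on the final iteration,
+                    // the request has already been moved out of `owned_request`,
+                    // so the `continue` below ends the loop instead of starting
+                    // another attempt cycle with the rotated key.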
+ continue; + } + + ctx.record_provider_failure(&attempt.mapping.provider).await; + ctx.record_endpoint_failure( + &attempt.mapping.provider, + &attempt.mapping.actual_model, + ); + info!( + "Provider {} failed: {}, trying next fallback", + attempt.mapping.provider, e + ); + break; + } + } + } + Err(ProviderLoopAction::Continue) +} + +/// Wrap a raw provider stream with DLP sanitization, HIT authorization, and Tap recording layers. +fn wrap_stream_with_middleware( + ctx: &DispatchContext<'_>, + raw_stream: Pin< + Box> + Send>, + >, + tap_request_body: Option, +) -> Pin> + Send>> { + let stream: Pin< + Box> + Send>, + > = if let Some(ref dlp_engine) = ctx.dlp { + if dlp_engine.config.scan_output { + Box::pin(crate::features::dlp::stream::DlpStream::new( + raw_stream, + Arc::clone(dlp_engine), + )) + } else { + raw_stream + } + } else { + raw_stream + }; + + // HIT stream: intercept tool_use blocks for human authorization. + #[cfg(feature = "policies")] + let stream = { + let hit_policy = ctx.resolved_policy.as_ref().and_then(|p| p.hit.clone()); + if let Some(hit) = hit_policy { + Box::pin(crate::features::policies::stream::HitStream::new( + stream, + hit, + ctx.req_id.to_string(), + Some(Arc::clone(&ctx.state.hit_pending)), + Some(ctx.state.event_bus.clone()), + ctx.state.security.audit_log.clone(), + )) + as Pin< + Box< + dyn Stream> + + Send, + >, + > + } else { + stream + } + }; + + if let Some(ref tap) = ctx.state.security.tap_sender { + let tap_req_id = uuid::Uuid::new_v4().to_string(); + if let Some(body_json) = tap_request_body { + tap.try_send(crate::features::tap::TapEvent::Request { + request_id: tap_req_id.clone(), + tenant_id: ctx.tenant_id.clone(), + model: ctx.model.clone(), + body: body_json, + }); + } + Box::pin(crate::features::tap::stream::TapStream::new( + stream, + Arc::clone(tap), + tap_req_id, + )) + } else { + stream + } +} + +/// Builds encrypted content for log export when policy requires it. +/// +/// Returns `(None, None)` when encryption is not configured or no policy matches. +#[allow(unused_variables)] +fn build_encrypted_content( + ctx: &DispatchContext<'_>, + response_bytes: &Option>, +) -> (Option, Option>) { + #[cfg(feature = "policies")] + { + use crate::features::log_export::ContentMode; + + // Check if log export is configured for encryption. + if ctx.state.log_exporter.is_none() { + return (None, None); + } + let log_config = &ctx.inner.config.log_export; + if log_config.content != ContentMode::Encrypted { + return (None, None); + } + + // Resolve recipients from access policies. + let access_ctx = crate::features::log_export::access_policy::AccessContext { + tenant: ctx.tenant_id.clone(), + compliance: vec![], + dlp_triggered: false, + }; + let recipient_keys = crate::features::log_export::access_policy::resolve_recipients( + &log_config.access_policies, + &log_config.auditors, + &access_ctx, + ); + if recipient_keys.is_empty() { + return (None, None); + } + + // Build content string from response. + let content = response_bytes + .as_ref() + .map(|b| String::from_utf8_lossy(b).to_string()) + .unwrap_or_default(); + + // Encrypt. 
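+        // `recipient_names` below is the human-readable recipient list stored
+        // alongside the payload; the actual encryption targets the resolved
+        // `recipient_keys` from the access policies.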
+ let recipient_names: Vec = log_config + .access_policies + .iter() + .flat_map(|p| p.recipients.clone()) + .collect(); + + match crate::features::log_export::encryption::encrypt_for_recipients( + &content, + &recipient_keys, + ) { + Ok(encrypted) => (Some(encrypted), Some(recipient_names)), + Err(e) => { + tracing::warn!("Failed to encrypt log content: {}", e); + (None, None) + } + } + } + #[cfg(not(feature = "policies"))] + { + (None, None) + } +} From 1005ced27f63055c678cbcc0e5ca14f0efc1ab85 Mon Sep 17 00:00:00 2001 From: commis-ci-fix Date: Tue, 21 Apr 2026 08:40:21 +0200 Subject: [PATCH 2/2] style(dispatch): cargo fmt on resolver/retry/provider_loop imports Co-Authored-By: Claude Opus 4.7 (1M context) --- src/server/dispatch/provider_loop.rs | 12 ++++++------ src/server/dispatch/resolver.rs | 2 +- src/server/dispatch/retry.rs | 12 ++++-------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/server/dispatch/provider_loop.rs b/src/server/dispatch/provider_loop.rs index 2233bf5d..9b441d19 100644 --- a/src/server/dispatch/provider_loop.rs +++ b/src/server/dispatch/provider_loop.rs @@ -19,16 +19,16 @@ //! offers a backward-compat path for unmapped models. A final audit entry //! is written before returning `AppError::ProviderError`. -use super::{AuditEntry, DispatchContext, DispatchResult}; -use super::resolver::{resolve_provider, try_direct_provider_lookup}; -use super::retry::{ - dispatch_non_streaming, dispatch_streaming, try_rotate_and_retry, - ProviderAttempt, ProviderLoopAction, -}; use super::super::{ check_budget, format_route_type, inject_continuation_text, is_provider_subscription, should_inject_continuation, AppError, }; +use super::resolver::{resolve_provider, try_direct_provider_lookup}; +use super::retry::{ + dispatch_non_streaming, dispatch_streaming, try_rotate_and_retry, ProviderAttempt, + ProviderLoopAction, +}; +use super::{AuditEntry, DispatchContext, DispatchResult}; use crate::models::RouteType; use tracing::info; diff --git a/src/server/dispatch/resolver.rs b/src/server/dispatch/resolver.rs index 0899b4cb..95e0cf54 100644 --- a/src/server/dispatch/resolver.rs +++ b/src/server/dispatch/resolver.rs @@ -8,8 +8,8 @@ use std::sync::Arc; -use super::{DispatchContext, DispatchResult}; use super::super::AppError; +use super::{DispatchContext, DispatchResult}; use tracing::info; /// Returns the provider for a mapping, or `None` if it must be skipped. diff --git a/src/server/dispatch/retry.rs b/src/server/dispatch/retry.rs index 39999b18..aa7fd690 100644 --- a/src/server/dispatch/retry.rs +++ b/src/server/dispatch/retry.rs @@ -15,13 +15,11 @@ use std::pin::Pin; use std::sync::Arc; use tracing::{info, warn}; -use super::{telemetry, DispatchContext, DispatchResult}; -use super::super::{ - is_auth_revoked_error, is_retryable, retry_delay, MAX_RETRIES, -}; +use super::super::{is_auth_revoked_error, is_retryable, retry_delay, MAX_RETRIES}; use super::telemetry::{ calculate_and_record_metrics, record_success_telemetry, store_response_cache, }; +use super::{telemetry, DispatchContext, DispatchResult}; /// Internal signal from the per-attempt dispatch back to the outer provider loop. 
pub(super) enum ProviderLoopAction { @@ -153,8 +151,7 @@ pub(super) async fn dispatch_streaming( .await; ctx.record_endpoint_success(&mapping.provider, &mapping.actual_model); - let stream = - wrap_stream_with_middleware(ctx, stream_response.stream, tap_request_body); + let stream = wrap_stream_with_middleware(ctx, stream_response.stream, tap_request_body); let upstream_headers: Vec<(String, String)> = stream_response.headers.into_iter().collect(); @@ -247,8 +244,7 @@ pub(super) async fn dispatch_non_streaming( calculate_and_record_metrics(ctx, &outcome, attempt.is_subscription).await; record_success_telemetry(ctx, &outcome, cost_usd).await; let cached_bytes = - store_response_cache(ctx, attempt.mapping, attempt.cache_key, &response) - .await; + store_response_cache(ctx, attempt.mapping, attempt.cache_key, &response).await; // Emit RequestEnd event for `grob watch`. ctx.state