Merged
38 changes: 33 additions & 5 deletions architecture/inference-routing.md
@@ -25,9 +25,14 @@ sequenceDiagram
Router->>Router: Select route by protocol
Router->>Router: Rewrite auth + model
Router->>Backend: HTTPS request
Backend->>Router: Response
Router->>Proxy: ProxyResponse
Proxy->>Agent: HTTP response over TLS tunnel
Backend->>Router: Response headers + body stream
Router->>Proxy: StreamingProxyResponse (headers first)
Proxy->>Agent: HTTP/1.1 headers (chunked TE)
loop Each body chunk
Router->>Proxy: chunk via next_chunk()
Proxy->>Agent: Chunked-encoded frame
end
Proxy->>Agent: Chunk terminator (0\r\n\r\n)
```

## Provider Profiles
@@ -149,8 +154,8 @@ If no pattern matches, the proxy returns `403 Forbidden` with `{"error": "connec

Files:

- `crates/navigator-router/src/lib.rs` -- `Router`, `proxy_with_candidates()`
- `crates/navigator-router/src/backend.rs` -- `proxy_to_backend()`, URL construction
- `crates/navigator-router/src/lib.rs` -- `Router`, `proxy_with_candidates()`, `proxy_with_candidates_streaming()`
- `crates/navigator-router/src/backend.rs` -- `proxy_to_backend()`, `proxy_to_backend_streaming()`, URL construction
- `crates/navigator-router/src/config.rs` -- `RouteConfig`, `ResolvedRoute`, YAML loading

### Route selection
@@ -174,6 +179,29 @@ Before forwarding inference requests, the proxy strips sensitive and hop-by-hop
- **Request**: `authorization`, `x-api-key`, `host`, `content-length`, and hop-by-hop headers (`connection`, `keep-alive`, `proxy-authenticate`, `proxy-authorization`, `proxy-connection`, `te`, `trailer`, `transfer-encoding`, `upgrade`).
- **Response**: `content-length` and hop-by-hop headers.
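
The request-side stripping amounts to a case-insensitive filter over header names. A minimal sketch, assuming a hypothetical `sanitize_request_headers` helper (the actual crate code may differ):

```rust
// Hypothetical sketch of the request-header sanitization described above.
// Header names are matched case-insensitively, as HTTP requires.
const STRIPPED_REQUEST_HEADERS: &[&str] = &[
    "authorization", "x-api-key", "host", "content-length",
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "proxy-connection", "te", "trailer", "transfer-encoding", "upgrade",
];

fn sanitize_request_headers(headers: Vec<(String, String)>) -> Vec<(String, String)> {
    headers
        .into_iter()
        .filter(|(name, _)| {
            !STRIPPED_REQUEST_HEADERS.contains(&name.to_ascii_lowercase().as_str())
        })
        .collect()
}
```

The router then rebuilds correct framing (`Host`, `Content-Length`) for the forwarded request, so dropping these here is safe.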

### Response streaming

The router supports two response modes:

- **Buffered** (`proxy_with_candidates()`): Reads the entire upstream response body into memory before returning a `ProxyResponse { status, headers, body: Bytes }`. Used by mock routes and in-process system inference calls where latency is not a concern.
- **Streaming** (`proxy_with_candidates_streaming()`): Returns a `StreamingProxyResponse` as soon as response headers arrive from the backend. The body is exposed as a `StreamingBody` enum with a `next_chunk()` method that yields `Option<Bytes>` incrementally.

`StreamingBody` has two variants:

| Variant | Source | Behavior |
|---------|--------|----------|
| `Live(reqwest::Response)` | Real HTTP backend | Calls `response.chunk()` to yield each body fragment as it arrives from the network |
| `Buffered(Option<Bytes>)` | Mock routes or fallback | Yields the entire body on the first call, then `None` |
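
The `Buffered` arm's drain-once contract can be shown with a synchronous simplification (hypothetical stand-in; the real enum is async and holds a `reqwest::Response` in its `Live` arm):

```rust
// Simplified, synchronous stand-in for the Buffered arm of StreamingBody.
// Option::take yields the whole body on the first call, then None forever.
struct BufferedBody(Option<Vec<u8>>);

impl BufferedBody {
    fn next_chunk(&mut self) -> Option<Vec<u8>> {
        self.0.take()
    }
}
```

This is what lets mock routes reuse the streaming code path: the caller's chunk loop simply runs one iteration.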

The sandbox proxy (`route_inference_request()` in `proxy.rs`) uses the streaming path for all inference requests:

1. Calls `proxy_with_candidates_streaming()` to get headers immediately.
2. Formats and sends the HTTP/1.1 response header with `Transfer-Encoding: chunked` via `format_http_response_header()`.
3. Loops on `body.next_chunk()`, wrapping each fragment in HTTP chunked encoding via `format_chunk()`.
4. Sends the chunk terminator (`0\r\n\r\n`) via `format_chunk_terminator()`.

This eliminates full-body buffering for streaming responses such as SSE: time-to-first-byte is determined by the backend's first-chunk latency rather than by the full generation time.
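
The framing produced in steps 2-4 is standard HTTP/1.1 chunked transfer coding (RFC 9112 §7.1): hex payload length, CRLF, payload, CRLF, with a zero-length chunk ending the body. A hypothetical stand-in for `format_chunk()` / `format_chunk_terminator()` (the names come from the text above; the bodies here are illustrative):

```rust
// Illustrative chunked-transfer-encoding helpers (RFC 9112 §7.1).
fn format_chunk(data: &[u8]) -> Vec<u8> {
    // <hex length>\r\n<payload>\r\n
    let mut out = format!("{:x}\r\n", data.len()).into_bytes();
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
    out
}

fn format_chunk_terminator() -> &'static [u8] {
    // Zero-length chunk ends the body.
    b"0\r\n\r\n"
}
```

Because each upstream fragment maps to exactly one chunk, no `Content-Length` is needed and the proxy never has to know the total body size in advance.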

### Mock routes

File: `crates/navigator-router/src/mock.rs`
6 changes: 3 additions & 3 deletions architecture/sandbox.md
@@ -654,10 +654,10 @@ The interception steps:

4. **Header sanitization**: For matched inference requests, the proxy strips credential headers (`Authorization`, `x-api-key`) and framing/hop-by-hop headers (`host`, `content-length`, `transfer-encoding`, `connection`, etc.). The router rebuilds correct framing for the forwarded body.

5. **Local routing**: Matched requests are executed by calling `Router::proxy_with_candidates()` directly, passing the detected protocol, HTTP method, path, sanitized headers, body, and the cached `ResolvedRoute` list from `InferenceContext`. The router selects the first route whose `protocols` list contains the source protocol (see [Inference Router](inference-routing.md#inference-router) for route selection details). When forwarding to the backend, the router rewrites the request: the route's `api_key` replaces the `Authorization` header, the `Host` header is set to the backend endpoint, and the `"model"` field in the JSON request body is replaced with the route's configured `model` value. If the request body is not valid JSON or does not contain a `"model"` key, the body is forwarded unchanged.
5. **Local routing**: Matched requests are executed by calling `Router::proxy_with_candidates_streaming()`, passing the detected protocol, HTTP method, path, sanitized headers, body, and the cached `ResolvedRoute` list from `InferenceContext`. The router selects the first route whose `protocols` list contains the source protocol (see [Inference Routing -- Response streaming](inference-routing.md#response-streaming) for details). When forwarding to the backend, the router rewrites the request: the route's `api_key` replaces the `Authorization` header, the `Host` header is set to the backend endpoint, and the `"model"` field in the JSON request body is replaced with the route's configured `model` value. If the request body is not valid JSON or does not contain a `"model"` key, the body is forwarded unchanged.

6. **Response handling**:
- On success: the router's response (status code, headers, body) is formatted as an HTTP/1.1 response and sent back to the client after stripping response framing/hop-by-hop headers (`transfer-encoding`, `content-length`, `connection`, etc.)
6. **Response handling (streaming)**:
- On success: response headers are sent back to the client immediately as an HTTP/1.1 response with `Transfer-Encoding: chunked`, using `format_http_response_header()`. Framing/hop-by-hop headers are stripped from the upstream response. Body chunks are then forwarded incrementally as they arrive from the backend via `StreamingProxyResponse::next_chunk()`, each wrapped in HTTP chunked encoding by `format_chunk()`. The stream is terminated with a `0\r\n\r\n` chunk terminator. This ensures time-to-first-byte reflects the backend's first token latency rather than the full generation time.
- On router failure: the error is mapped to an HTTP status code via `router_error_to_http()` and returned as a JSON error body (see error table below)
- Empty route cache: returns `503` JSON error (`{"error": "cluster inference is not configured"}`)
- Non-inference requests: returns `403 Forbidden` with a JSON error body (`{"error": "connection not allowed by policy"}`)
103 changes: 91 additions & 12 deletions crates/navigator-router/src/backend.rs
@@ -4,29 +4,63 @@
use crate::RouterError;
use crate::config::{AuthHeader, ResolvedRoute};

/// Response from a proxied HTTP request to a backend.
/// Response from a proxied HTTP request to a backend (fully buffered).
#[derive(Debug)]
pub struct ProxyResponse {
pub status: u16,
pub headers: Vec<(String, String)>,
pub body: bytes::Bytes,
}

/// Forward a raw HTTP request to the backend configured in `route`.
/// Response from a proxied HTTP request where the body can be streamed
/// incrementally via [`StreamingProxyResponse::next_chunk`].
pub struct StreamingProxyResponse {
pub status: u16,
pub headers: Vec<(String, String)>,
/// Either a live response to stream from, or a pre-buffered body (for mock routes).
body: StreamingBody,
}

enum StreamingBody {
/// Live upstream response — call `chunk().await` to read incrementally.
Live(reqwest::Response),
/// Pre-buffered body (e.g. from mock routes). Drained on first `next_chunk()`.
Buffered(Option<bytes::Bytes>),
}

impl StreamingProxyResponse {
/// Create from a fully-buffered [`ProxyResponse`] (for mock routes).
pub fn from_buffered(resp: ProxyResponse) -> Self {
Self {
status: resp.status,
headers: resp.headers,
body: StreamingBody::Buffered(Some(resp.body)),
}
}

/// Read the next body chunk. Returns `None` when the body is exhausted.
pub async fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>, RouterError> {
match &mut self.body {
StreamingBody::Live(response) => response.chunk().await.map_err(|e| {
RouterError::UpstreamProtocol(format!("failed to read response chunk: {e}"))
}),
StreamingBody::Buffered(buf) => Ok(buf.take()),
}
}
}

/// Build and send an HTTP request to the backend configured in `route`.
///
/// Rewrites the auth header with the route's API key (using the
/// route's configured [`AuthHeader`] mechanism) and the `Host` header
/// to match the backend endpoint. The original path is appended to
/// the route's endpoint URL.
pub async fn proxy_to_backend(
/// Returns the [`reqwest::Response`] with status, headers, and an un-consumed
/// body stream. Shared by both the buffered and streaming public APIs.
async fn send_backend_request(
client: &reqwest::Client,
route: &ResolvedRoute,
_source_protocol: &str,
method: &str,
path: &str,
headers: Vec<(String, String)>,
body: bytes::Bytes,
) -> Result<ProxyResponse, RouterError> {
) -> Result<reqwest::Response, RouterError> {
let url = build_backend_url(&route.endpoint, path);

let reqwest_method: reqwest::Method = method
@@ -93,22 +127,43 @@ pub async fn proxy_to_backend(
};
builder = builder.body(body);

let response = builder.send().await.map_err(|e| {
builder.send().await.map_err(|e| {
if e.is_timeout() {
RouterError::UpstreamUnavailable(format!("request to {url} timed out"))
} else if e.is_connect() {
RouterError::UpstreamUnavailable(format!("failed to connect to {url}: {e}"))
} else {
RouterError::Internal(format!("HTTP request failed: {e}"))
}
})?;
})
}

/// Extract status and headers from a [`reqwest::Response`].
fn extract_response_metadata(response: &reqwest::Response) -> (u16, Vec<(String, String)>) {
let status = response.status().as_u16();
let resp_headers: Vec<(String, String)> = response
let headers: Vec<(String, String)> = response
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();
(status, headers)
}

/// Forward a raw HTTP request to the backend configured in `route`.
///
/// Buffers the entire response body before returning. Suitable for
/// non-streaming responses or mock routes.
pub async fn proxy_to_backend(
client: &reqwest::Client,
route: &ResolvedRoute,
_source_protocol: &str,
method: &str,
path: &str,
headers: Vec<(String, String)>,
body: bytes::Bytes,
) -> Result<ProxyResponse, RouterError> {
let response = send_backend_request(client, route, method, path, headers, body).await?;
let (status, resp_headers) = extract_response_metadata(&response);
let resp_body = response
.bytes()
.await
@@ -121,6 +176,30 @@ pub async fn proxy_to_backend(
})
}

/// Forward a raw HTTP request to the backend, returning response headers
/// immediately without buffering the body.
///
/// The caller streams the body incrementally via
/// [`StreamingProxyResponse::next_chunk`].
pub async fn proxy_to_backend_streaming(
client: &reqwest::Client,
route: &ResolvedRoute,
_source_protocol: &str,
method: &str,
path: &str,
headers: Vec<(String, String)>,
body: bytes::Bytes,
) -> Result<StreamingProxyResponse, RouterError> {
let response = send_backend_request(client, route, method, path, headers, body).await?;
let (status, resp_headers) = extract_response_metadata(&response);

Ok(StreamingProxyResponse {
status,
headers: resp_headers,
body: StreamingBody::Live(response),
})
}

fn build_backend_url(endpoint: &str, path: &str) -> String {
let base = endpoint.trim_end_matches('/');
if base.ends_with("/v1") && (path == "/v1" || path.starts_with("/v1/")) {
47 changes: 46 additions & 1 deletion crates/navigator-router/src/lib.rs
@@ -7,7 +7,7 @@ mod mock;

use std::time::Duration;

pub use backend::ProxyResponse;
pub use backend::{ProxyResponse, StreamingProxyResponse};
use config::{ResolvedRoute, RouterConfig};
use tracing::info;

@@ -95,6 +95,51 @@ impl Router {
)
.await
}

/// Streaming variant of [`proxy_with_candidates`](Self::proxy_with_candidates).
///
/// Returns response headers immediately without buffering the body.
/// The caller streams body chunks via [`StreamingProxyResponse::next_chunk`].
pub async fn proxy_with_candidates_streaming(
&self,
source_protocol: &str,
method: &str,
path: &str,
headers: Vec<(String, String)>,
body: bytes::Bytes,
candidates: &[ResolvedRoute],
) -> Result<StreamingProxyResponse, RouterError> {
let normalized_source = source_protocol.trim().to_ascii_lowercase();
let route = candidates
.iter()
.find(|r| r.protocols.iter().any(|p| p == &normalized_source))
.ok_or_else(|| RouterError::NoCompatibleRoute(source_protocol.to_string()))?;

info!(
protocols = %route.protocols.join(","),
endpoint = %route.endpoint,
method = %method,
path = %path,
"routing proxy inference request (streaming)"
);

if mock::is_mock_route(route) {
info!(endpoint = %route.endpoint, "returning mock response (buffered)");
let buffered = mock::mock_response(route, &normalized_source);
return Ok(StreamingProxyResponse::from_buffered(buffered));
}

backend::proxy_to_backend_streaming(
&self.client,
route,
&normalized_source,
method,
path,
headers,
body,
)
.await
}
}

#[cfg(test)]