From 80c8cce8ea1ab80d607047f180d4b3868debe11b Mon Sep 17 00:00:00 2001 From: John Myers <9696606+johntmyers@users.noreply.github.com> Date: Thu, 12 Mar 2026 09:33:14 -0700 Subject: [PATCH 1/3] fix(proxy): stream inference responses instead of buffering entire body The inference.local proxy path called response.bytes().await which buffered the entire upstream response before sending anything to the client. For streaming SSE responses this inflated TTFB from sub-second to the full generation time, causing clients with TTFB timeouts to abort. Add a streaming proxy variant that returns response headers immediately and forwards body chunks incrementally using HTTP chunked transfer encoding. Non-streaming responses and mock routes continue to work through the existing buffered path. Closes #260 --- crates/navigator-router/src/backend.rs | 103 +++++++++++++-- crates/navigator-router/src/lib.rs | 47 ++++++- crates/navigator-sandbox/src/l7/inference.rs | 112 ++++++++++++++++ crates/navigator-sandbox/src/proxy.rs | 37 +++++- examples/local-inference/README.md | 127 +++++++------------ examples/local-inference/inference.py | 94 ++++++++++++-- examples/local-inference/sandbox-policy.yaml | 10 +- 7 files changed, 419 insertions(+), 111 deletions(-) diff --git a/crates/navigator-router/src/backend.rs b/crates/navigator-router/src/backend.rs index 23c4cd02..a9fe579a 100644 --- a/crates/navigator-router/src/backend.rs +++ b/crates/navigator-router/src/backend.rs @@ -4,7 +4,7 @@ use crate::RouterError; use crate::config::{AuthHeader, ResolvedRoute}; -/// Response from a proxied HTTP request to a backend. +/// Response from a proxied HTTP request to a backend (fully buffered). #[derive(Debug)] pub struct ProxyResponse { pub status: u16, @@ -12,21 +12,55 @@ pub struct ProxyResponse { pub body: bytes::Bytes, } -/// Forward a raw HTTP request to the backend configured in `route`. +/// Response from a proxied HTTP request where the body can be streamed +/// incrementally via [`StreamingProxyResponse::next_chunk`]. +pub struct StreamingProxyResponse { + pub status: u16, + pub headers: Vec<(String, String)>, + /// Either a live response to stream from, or a pre-buffered body (for mock routes). + body: StreamingBody, +} + +enum StreamingBody { + /// Live upstream response — call `chunk().await` to read incrementally. + Live(reqwest::Response), + /// Pre-buffered body (e.g. from mock routes). Drained on first `next_chunk()`. + Buffered(Option), +} + +impl StreamingProxyResponse { + /// Create from a fully-buffered [`ProxyResponse`] (for mock routes). + pub fn from_buffered(resp: ProxyResponse) -> Self { + Self { + status: resp.status, + headers: resp.headers, + body: StreamingBody::Buffered(Some(resp.body)), + } + } + + /// Read the next body chunk. Returns `None` when the body is exhausted. + pub async fn next_chunk(&mut self) -> Result, RouterError> { + match &mut self.body { + StreamingBody::Live(response) => response.chunk().await.map_err(|e| { + RouterError::UpstreamProtocol(format!("failed to read response chunk: {e}")) + }), + StreamingBody::Buffered(buf) => Ok(buf.take()), + } + } +} + +/// Build and send an HTTP request to the backend configured in `route`. /// -/// Rewrites the auth header with the route's API key (using the -/// route's configured [`AuthHeader`] mechanism) and the `Host` header -/// to match the backend endpoint. The original path is appended to -/// the route's endpoint URL. -pub async fn proxy_to_backend( +/// Returns the [`reqwest::Response`] with status, headers, and an un-consumed +/// body stream. Shared by both the buffered and streaming public APIs. +async fn send_backend_request( client: &reqwest::Client, route: &ResolvedRoute, - _source_protocol: &str, method: &str, path: &str, headers: Vec<(String, String)>, body: bytes::Bytes, -) -> Result { +) -> Result { let url = build_backend_url(&route.endpoint, path); let reqwest_method: reqwest::Method = method @@ -93,7 +127,7 @@ pub async fn proxy_to_backend( }; builder = builder.body(body); - let response = builder.send().await.map_err(|e| { + builder.send().await.map_err(|e| { if e.is_timeout() { RouterError::UpstreamUnavailable(format!("request to {url} timed out")) } else if e.is_connect() { @@ -101,14 +135,35 @@ pub async fn proxy_to_backend( } else { RouterError::Internal(format!("HTTP request failed: {e}")) } - })?; + }) +} +/// Extract status and headers from a [`reqwest::Response`]. +fn extract_response_metadata(response: &reqwest::Response) -> (u16, Vec<(String, String)>) { let status = response.status().as_u16(); - let resp_headers: Vec<(String, String)> = response + let headers: Vec<(String, String)> = response .headers() .iter() .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) .collect(); + (status, headers) +} + +/// Forward a raw HTTP request to the backend configured in `route`. +/// +/// Buffers the entire response body before returning. Suitable for +/// non-streaming responses or mock routes. +pub async fn proxy_to_backend( + client: &reqwest::Client, + route: &ResolvedRoute, + _source_protocol: &str, + method: &str, + path: &str, + headers: Vec<(String, String)>, + body: bytes::Bytes, +) -> Result { + let response = send_backend_request(client, route, method, path, headers, body).await?; + let (status, resp_headers) = extract_response_metadata(&response); let resp_body = response .bytes() .await @@ -121,6 +176,30 @@ pub async fn proxy_to_backend( }) } +/// Forward a raw HTTP request to the backend, returning response headers +/// immediately without buffering the body. +/// +/// The caller streams the body incrementally via +/// [`StreamingProxyResponse::response`] using `chunk().await`. +pub async fn proxy_to_backend_streaming( + client: &reqwest::Client, + route: &ResolvedRoute, + _source_protocol: &str, + method: &str, + path: &str, + headers: Vec<(String, String)>, + body: bytes::Bytes, +) -> Result { + let response = send_backend_request(client, route, method, path, headers, body).await?; + let (status, resp_headers) = extract_response_metadata(&response); + + Ok(StreamingProxyResponse { + status, + headers: resp_headers, + body: StreamingBody::Live(response), + }) +} + fn build_backend_url(endpoint: &str, path: &str) -> String { let base = endpoint.trim_end_matches('/'); if base.ends_with("/v1") && (path == "/v1" || path.starts_with("/v1/")) { diff --git a/crates/navigator-router/src/lib.rs b/crates/navigator-router/src/lib.rs index 835f148d..4edd4f87 100644 --- a/crates/navigator-router/src/lib.rs +++ b/crates/navigator-router/src/lib.rs @@ -7,7 +7,7 @@ mod mock; use std::time::Duration; -pub use backend::ProxyResponse; +pub use backend::{ProxyResponse, StreamingProxyResponse}; use config::{ResolvedRoute, RouterConfig}; use tracing::info; @@ -95,6 +95,51 @@ impl Router { ) .await } + + /// Streaming variant of [`proxy_with_candidates`](Self::proxy_with_candidates). + /// + /// Returns response headers immediately without buffering the body. + /// The caller streams body chunks via [`StreamingProxyResponse::response`]. + pub async fn proxy_with_candidates_streaming( + &self, + source_protocol: &str, + method: &str, + path: &str, + headers: Vec<(String, String)>, + body: bytes::Bytes, + candidates: &[ResolvedRoute], + ) -> Result { + let normalized_source = source_protocol.trim().to_ascii_lowercase(); + let route = candidates + .iter() + .find(|r| r.protocols.iter().any(|p| p == &normalized_source)) + .ok_or_else(|| RouterError::NoCompatibleRoute(source_protocol.to_string()))?; + + info!( + protocols = %route.protocols.join(","), + endpoint = %route.endpoint, + method = %method, + path = %path, + "routing proxy inference request (streaming)" + ); + + if mock::is_mock_route(route) { + info!(endpoint = %route.endpoint, "returning mock response (buffered)"); + let buffered = mock::mock_response(route, &normalized_source); + return Ok(StreamingProxyResponse::from_buffered(buffered)); + } + + backend::proxy_to_backend_streaming( + &self.client, + route, + &normalized_source, + method, + path, + headers, + body, + ) + .await + } } #[cfg(test)] diff --git a/crates/navigator-sandbox/src/l7/inference.rs b/crates/navigator-sandbox/src/l7/inference.rs index adc7437f..022fe4fa 100644 --- a/crates/navigator-sandbox/src/l7/inference.rs +++ b/crates/navigator-sandbox/src/l7/inference.rs @@ -251,6 +251,58 @@ pub fn format_http_response(status: u16, headers: &[(String, String)], body: &[u bytes } +/// Format HTTP/1.1 response headers for a chunked (streaming) response. +/// +/// Emits the status line, supplied headers (stripping any `content-length` or +/// `transfer-encoding` the upstream may have sent), and a +/// `transfer-encoding: chunked` header. The body is **not** included — the +/// caller writes chunks separately via [`format_chunk`] and +/// [`format_chunk_terminator`]. +pub fn format_http_response_header(status: u16, headers: &[(String, String)]) -> Vec { + use std::fmt::Write; + + let status_text = match status { + 200 => "OK", + 400 => "Bad Request", + 403 => "Forbidden", + 411 => "Length Required", + 413 => "Payload Too Large", + 500 => "Internal Server Error", + 502 => "Bad Gateway", + 503 => "Service Unavailable", + _ => "Unknown", + }; + + let mut response = format!("HTTP/1.1 {status} {status_text}\r\n"); + for (name, value) in headers { + // Skip framing headers — we always emit chunked TE. + if name.eq_ignore_ascii_case("content-length") + || name.eq_ignore_ascii_case("transfer-encoding") + { + continue; + } + let _ = write!(response, "{name}: {value}\r\n"); + } + let _ = write!(response, "transfer-encoding: chunked\r\n"); + response.push_str("\r\n"); + response.into_bytes() +} + +/// Format a single HTTP chunked transfer-encoding segment. +/// +/// Returns `\r\n\r\n`. +pub fn format_chunk(data: &[u8]) -> Vec { + let mut buf = format!("{:x}\r\n", data.len()).into_bytes(); + buf.extend_from_slice(data); + buf.extend_from_slice(b"\r\n"); + buf +} + +/// The HTTP chunked transfer-encoding terminator: `0\r\n\r\n`. +pub fn format_chunk_terminator() -> &'static [u8] { + b"0\r\n\r\n" +} + #[cfg(test)] mod tests { use super::*; @@ -372,4 +424,64 @@ mod tests { assert!(response_str.contains("content-length: 11\r\n")); assert!(response_str.ends_with("{\"ok\":true}")); } + + #[test] + fn format_response_header_chunked() { + let headers = vec![ + ("content-type".to_string(), "text/event-stream".to_string()), + ("x-request-id".to_string(), "abc123".to_string()), + ]; + let header = format_http_response_header(200, &headers); + let header_str = String::from_utf8_lossy(&header); + assert!(header_str.starts_with("HTTP/1.1 200 OK\r\n")); + assert!(header_str.contains("content-type: text/event-stream\r\n")); + assert!(header_str.contains("x-request-id: abc123\r\n")); + assert!(header_str.contains("transfer-encoding: chunked\r\n")); + assert!(header_str.ends_with("\r\n")); + // Must NOT contain content-length + assert!(!header_str.to_lowercase().contains("content-length")); + } + + #[test] + fn format_response_header_strips_upstream_framing() { + let headers = vec![ + ("content-length".to_string(), "9999".to_string()), + ("transfer-encoding".to_string(), "chunked".to_string()), + ("content-type".to_string(), "application/json".to_string()), + ]; + let header = format_http_response_header(200, &headers); + let header_str = String::from_utf8_lossy(&header); + // Should not contain the upstream content-length or transfer-encoding values + assert!(!header_str.contains("content-length: 9999")); + // Should contain exactly one transfer-encoding: chunked (ours) + assert_eq!(header_str.matches("transfer-encoding: chunked").count(), 1); + } + + #[test] + fn format_chunk_basic() { + let data = b"hello"; + let chunk = format_chunk(data); + assert_eq!(chunk, b"5\r\nhello\r\n"); + } + + #[test] + fn format_chunk_empty() { + // Empty chunk is NOT the terminator — it's a zero-length data segment + let chunk = format_chunk(b""); + assert_eq!(chunk, b"0\r\n\r\n"); + } + + #[test] + fn format_chunk_terminator_value() { + assert_eq!(format_chunk_terminator(), b"0\r\n\r\n"); + } + + #[test] + fn format_chunk_large_hex() { + let data = vec![0x41u8; 256]; // 0x100 bytes + let chunk = format_chunk(&data); + assert!(chunk.starts_with(b"100\r\n")); + assert!(chunk.ends_with(b"\r\n")); + assert_eq!(chunk.len(), 3 + 2 + 256 + 2); // "100" + \r\n + data + \r\n + } } diff --git a/crates/navigator-sandbox/src/proxy.rs b/crates/navigator-sandbox/src/proxy.rs index 1480e5e0..dee3fdc0 100644 --- a/crates/navigator-sandbox/src/proxy.rs +++ b/crates/navigator-sandbox/src/proxy.rs @@ -924,7 +924,7 @@ async fn route_inference_request( match ctx .router - .proxy_with_candidates( + .proxy_with_candidates_streaming( &pattern.protocol, &request.method, &normalized_path, @@ -934,11 +934,36 @@ async fn route_inference_request( ) .await { - Ok(resp) => { - let resp_headers = - sanitize_inference_response_headers(resp.headers.into_iter().collect()); - let response = format_http_response(resp.status, &resp_headers, &resp.body); - write_all(tls_client, &response).await?; + Ok(mut resp) => { + use crate::l7::inference::{ + format_chunk, format_chunk_terminator, format_http_response_header, + }; + + let resp_headers = sanitize_inference_response_headers( + std::mem::take(&mut resp.headers).into_iter().collect(), + ); + + // Write response headers immediately (chunked TE). + let header_bytes = format_http_response_header(resp.status, &resp_headers); + write_all(tls_client, &header_bytes).await?; + + // Stream body chunks as they arrive from the upstream. + loop { + match resp.next_chunk().await { + Ok(Some(chunk)) => { + let encoded = format_chunk(&chunk); + write_all(tls_client, &encoded).await?; + } + Ok(None) => break, + Err(e) => { + warn!(error = %e, "error reading upstream response chunk"); + break; + } + } + } + + // Terminate the chunked stream. + write_all(tls_client, format_chunk_terminator()).await?; } Err(e) => { warn!(error = %e, "inference endpoint detected but upstream service failed"); diff --git a/examples/local-inference/README.md b/examples/local-inference/README.md index f6785772..6f21f65c 100644 --- a/examples/local-inference/README.md +++ b/examples/local-inference/README.md @@ -17,110 +17,73 @@ OpenShell intercepts and reroutes it to the configured backend. | File | Description | |---|---| -| `inference.py` | Python script that calls the OpenAI SDK through `https://inference.local/v1` | -| `sandbox-policy.yaml` | Minimal sandbox policy for the example | +| `inference.py` | Python script that tests streaming and non-streaming inference through `inference.local` | +| `sandbox-policy.yaml` | Minimal sandbox policy (no network access except `inference.local`) | | `routes.yaml` | Example YAML route file for standalone (no-cluster) mode | -## Quick Start +## Quick Start (NVIDIA) -There are two ways to run inference routing: **with a cluster** (managed -routes, multi-sandbox) or **standalone** (single sandbox, routes from a file). - -### Standalone (no cluster) - -Run the sandbox binary directly with a route file — no OpenShell cluster needed: +Requires a running OpenShell gateway and `NVIDIA_API_KEY` set in your shell. ```bash -# 1. Edit routes.yaml to point at your local LLM (e.g. LM Studio on :1234) -# See examples/inference/routes.yaml - -# 2. Run the sandbox with --inference-routes -navigator-sandbox \ - --inference-routes examples/inference/routes.yaml \ - --policy-rules \ - --policy-data examples/inference/sandbox-policy.yaml \ - -- python examples/inference/inference.py -``` - -The sandbox loads routes from the YAML file at startup and routes inference -requests locally — no gRPC server or cluster required. - -### With a cluster +# 1. Create a provider using your NVIDIA credentials +openshell provider create --name nvidia --type nvidia --credential NVIDIA_API_KEY -#### 1. Start a OpenShell cluster +# 2. Configure inference routing +openshell inference set --provider nvidia --model meta/llama-3.1-8b-instruct -```bash -mise run cluster -openshell status -``` - -#### 2. Configure cluster inference - -First make sure a provider record exists for the backend you want to use: - -```bash -openshell provider list -``` - -Then configure the cluster-managed `inference.local` route: - -```bash -# Example: use an existing provider record -openshell cluster inference set \ - --provider openai-prod \ - --model nvidia/nemotron-3-nano-30b-a3b -``` - -Verify the active config: - -```bash -openshell cluster inference get -``` - -#### 3. Run the example inside a sandbox - -```bash +# 3. Run the test script in a sandbox openshell sandbox create \ - --policy examples/inference/sandbox-policy.yaml \ - --keep \ - --name inference-demo \ - -- python examples/inference/inference.py + --policy examples/local-inference/sandbox-policy.yaml \ + --upload examples/local-inference/inference.py \ + -- python3 /sandbox/inference.py ``` -The script targets `https://inference.local/v1` directly. OpenShell -intercepts that connection and routes it to whatever backend cluster -inference is configured to use. - -Expected output: +Expected output (with the streaming buffering bug present): ``` -model= -content=NAV_OK +============================================================ +NON-STREAMING REQUEST +============================================================ + model = meta/llama-3.1-8b-instruct + content = Glowing screens abide + Whirring circuits, silent mind + Tech's gentle grasp + total = 0.96s + +============================================================ +STREAMING REQUEST +============================================================ + TTFB = 0.54s + model = meta/llama-3.1-8b-instruct + content = Glowing screens abide + Code and circuits whisper + Silent digital + total = 0.54s + + ** BUG: TTFB is 99% of total time — response was buffered, not streamed ** ``` -#### 4. (Optional) Interactive session +When streaming works correctly, TTFB should be sub-second while total time +stays the same (tokens arrive incrementally). -```bash -openshell sandbox connect inference-demo -# Inside the sandbox: -python examples/inference/inference.py -``` +## Standalone (no cluster) -#### 5. Cleanup +Run the sandbox binary directly with a route file — no OpenShell cluster needed: ```bash -openshell sandbox delete inference-demo -``` - -## Customizing Routes +# 1. Edit routes.yaml to point at your local LLM (e.g. LM Studio on :1234) -Edit `routes.yaml` to change which backend endpoint/model standalone mode uses. -In cluster mode, use `openshell cluster inference set` instead. +# 2. Run the sandbox with --inference-routes +navigator-sandbox \ + --inference-routes examples/local-inference/routes.yaml \ + --policy-rules \ + --policy-data examples/local-inference/sandbox-policy.yaml \ + -- python examples/local-inference/inference.py +``` ## Supported Protocols -OpenShell detects and routes the following inference API patterns: - | Pattern | Protocol | Kind | |---|---|---| | `POST /v1/chat/completions` | `openai_chat_completions` | Chat completion | diff --git a/examples/local-inference/inference.py b/examples/local-inference/inference.py index 2cf07e9b..39b47d27 100644 --- a/examples/local-inference/inference.py +++ b/examples/local-inference/inference.py @@ -1,16 +1,92 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -from openai import OpenAI +"""Test inference routing through inference.local (streaming and non-streaming). + +Exercises both modes to surface the response-buffering bug where the proxy +calls response.bytes().await on a streaming response, inflating TTFB from +sub-second to the full generation time. +""" + +import subprocess +import sys +import time + +subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "openai"]) + +from openai import OpenAI # noqa: E402 client = OpenAI(api_key="dummy", base_url="https://inference.local/v1") -response = client.chat.completions.create( - model="router", - messages=[{"role": "user", "content": "Reply with exactly: NAV_OK"}], - temperature=0, -) +PROMPT = "Write a short haiku about computers." +MESSAGES = [{"role": "user", "content": PROMPT}] + + +def test_non_streaming(): + print("=" * 60) + print("NON-STREAMING REQUEST") + print("=" * 60) + + t0 = time.monotonic() + response = client.chat.completions.create( + model="router", + messages=MESSAGES, + temperature=0, + ) + elapsed = time.monotonic() - t0 + + content = (response.choices[0].message.content or "").strip() + print(f" model = {response.model}") + print(f" content = {content}") + print(f" total = {elapsed:.2f}s") + print() + + +def test_streaming(): + print("=" * 60) + print("STREAMING REQUEST") + print("=" * 60) + + t0 = time.monotonic() + ttfb = None + chunks = [] + + stream = client.chat.completions.create( + model="router", + messages=MESSAGES, + temperature=0, + stream=True, + ) + + for chunk in stream: + if ttfb is None: + ttfb = time.monotonic() - t0 + print(f" TTFB = {ttfb:.2f}s") + + delta = chunk.choices[0].delta if chunk.choices else None + if delta and delta.content: + chunks.append(delta.content) + + elapsed = time.monotonic() - t0 + content = "".join(chunks).strip() + + print(f" model = {chunk.model}") + print(f" content = {content}") + print(f" total = {elapsed:.2f}s") + print() + + # Flag the bug: if TTFB is close to total time, response was buffered. + if ttfb and elapsed > 0.5 and ttfb > elapsed * 0.8: + print( + " ** BUG: TTFB is {:.0f}% of total time — response was buffered, not streamed **".format( + ttfb / elapsed * 100 + ) + ) + elif ttfb and ttfb < 2.0: + print(" OK: TTFB looks healthy (sub-2s)") + print() + -content = (response.choices[0].message.content or "").strip() -print(f"model={response.model}") -print(f"content={content}") +if __name__ == "__main__": + test_non_streaming() + test_streaming() diff --git a/examples/local-inference/sandbox-policy.yaml b/examples/local-inference/sandbox-policy.yaml index 549a31c9..280d8916 100644 --- a/examples/local-inference/sandbox-policy.yaml +++ b/examples/local-inference/sandbox-policy.yaml @@ -25,4 +25,12 @@ process: run_as_user: sandbox run_as_group: sandbox -# No network policies means all outbound connections are denied and only inference.local is allowed. +# Allow PyPI access so pip can install dependencies inside the sandbox. +network_policies: + pypi: + name: PyPI + endpoints: + - { host: pypi.org, port: 443 } + - { host: files.pythonhosted.org, port: 443 } + binaries: + - path: /usr/bin/python3.12 From 99b5c3a0a1b561c006e283b116b84d112dc59090 Mon Sep 17 00:00:00 2001 From: John Myers <9696606+johntmyers@users.noreply.github.com> Date: Thu, 12 Mar 2026 09:47:50 -0700 Subject: [PATCH 2/3] docs: update architecture docs and example for inference streaming --- architecture/inference-routing.md | 38 +++++++++++++++++++++++---- architecture/sandbox.md | 6 ++--- examples/local-inference/inference.py | 13 ++++++--- 3 files changed, 46 insertions(+), 11 deletions(-) diff --git a/architecture/inference-routing.md b/architecture/inference-routing.md index 6224fd64..60007866 100644 --- a/architecture/inference-routing.md +++ b/architecture/inference-routing.md @@ -25,9 +25,14 @@ sequenceDiagram Router->>Router: Select route by protocol Router->>Router: Rewrite auth + model Router->>Backend: HTTPS request - Backend->>Router: Response - Router->>Proxy: ProxyResponse - Proxy->>Agent: HTTP response over TLS tunnel + Backend->>Router: Response headers + body stream + Router->>Proxy: StreamingProxyResponse (headers first) + Proxy->>Agent: HTTP/1.1 headers (chunked TE) + loop Each body chunk + Router->>Proxy: chunk via next_chunk() + Proxy->>Agent: Chunked-encoded frame + end + Proxy->>Agent: Chunk terminator (0\r\n\r\n) ``` ## Provider Profiles @@ -149,8 +154,8 @@ If no pattern matches, the proxy returns `403 Forbidden` with `{"error": "connec Files: -- `crates/navigator-router/src/lib.rs` -- `Router`, `proxy_with_candidates()` -- `crates/navigator-router/src/backend.rs` -- `proxy_to_backend()`, URL construction +- `crates/navigator-router/src/lib.rs` -- `Router`, `proxy_with_candidates()`, `proxy_with_candidates_streaming()` +- `crates/navigator-router/src/backend.rs` -- `proxy_to_backend()`, `proxy_to_backend_streaming()`, URL construction - `crates/navigator-router/src/config.rs` -- `RouteConfig`, `ResolvedRoute`, YAML loading ### Route selection @@ -174,6 +179,29 @@ Before forwarding inference requests, the proxy strips sensitive and hop-by-hop - **Request**: `authorization`, `x-api-key`, `host`, `content-length`, and hop-by-hop headers (`connection`, `keep-alive`, `proxy-authenticate`, `proxy-authorization`, `proxy-connection`, `te`, `trailer`, `transfer-encoding`, `upgrade`). - **Response**: `content-length` and hop-by-hop headers. +### Response streaming + +The router supports two response modes: + +- **Buffered** (`proxy_with_candidates()`): Reads the entire upstream response body into memory before returning a `ProxyResponse { status, headers, body: Bytes }`. Used by mock routes and in-process system inference calls where latency is not a concern. +- **Streaming** (`proxy_with_candidates_streaming()`): Returns a `StreamingProxyResponse` as soon as response headers arrive from the backend. The body is exposed as a `StreamingBody` enum with a `next_chunk()` method that yields `Option` incrementally. + +`StreamingBody` has two variants: + +| Variant | Source | Behavior | +|---------|--------|----------| +| `Live(reqwest::Response)` | Real HTTP backend | Calls `response.chunk()` to yield each body fragment as it arrives from the network | +| `Buffered(Option)` | Mock routes or fallback | Yields the entire body on the first call, then `None` | + +The sandbox proxy (`route_inference_request()` in `proxy.rs`) uses the streaming path for all inference requests: + +1. Calls `proxy_with_candidates_streaming()` to get headers immediately. +2. Formats and sends the HTTP/1.1 response header with `Transfer-Encoding: chunked` via `format_http_response_header()`. +3. Loops on `body.next_chunk()`, wrapping each fragment in HTTP chunked encoding via `format_chunk()`. +4. Sends the chunk terminator (`0\r\n\r\n`) via `format_chunk_terminator()`. + +This eliminates full-body buffering for streaming responses (SSE). Time-to-first-byte is determined by the backend's first chunk latency rather than the full generation time. + ### Mock routes File: `crates/navigator-router/src/mock.rs` diff --git a/architecture/sandbox.md b/architecture/sandbox.md index d9c5e0a4..b286737c 100644 --- a/architecture/sandbox.md +++ b/architecture/sandbox.md @@ -654,10 +654,10 @@ The interception steps: 4. **Header sanitization**: For matched inference requests, the proxy strips credential headers (`Authorization`, `x-api-key`) and framing/hop-by-hop headers (`host`, `content-length`, `transfer-encoding`, `connection`, etc.). The router rebuilds correct framing for the forwarded body. -5. **Local routing**: Matched requests are executed by calling `Router::proxy_with_candidates()` directly, passing the detected protocol, HTTP method, path, sanitized headers, body, and the cached `ResolvedRoute` list from `InferenceContext`. The router selects the first route whose `protocols` list contains the source protocol (see [Inference Router](inference-routing.md#inference-router) for route selection details). When forwarding to the backend, the router rewrites the request: the route's `api_key` replaces the `Authorization` header, the `Host` header is set to the backend endpoint, and the `"model"` field in the JSON request body is replaced with the route's configured `model` value. If the request body is not valid JSON or does not contain a `"model"` key, the body is forwarded unchanged. +5. **Local routing**: Matched requests are executed by calling `Router::proxy_with_candidates_streaming()`, passing the detected protocol, HTTP method, path, sanitized headers, body, and the cached `ResolvedRoute` list from `InferenceContext`. The router selects the first route whose `protocols` list contains the source protocol (see [Inference Routing -- Response streaming](inference-routing.md#response-streaming) for details). When forwarding to the backend, the router rewrites the request: the route's `api_key` replaces the `Authorization` header, the `Host` header is set to the backend endpoint, and the `"model"` field in the JSON request body is replaced with the route's configured `model` value. If the request body is not valid JSON or does not contain a `"model"` key, the body is forwarded unchanged. -6. **Response handling**: - - On success: the router's response (status code, headers, body) is formatted as an HTTP/1.1 response and sent back to the client after stripping response framing/hop-by-hop headers (`transfer-encoding`, `content-length`, `connection`, etc.) +6. **Response handling (streaming)**: + - On success: response headers are sent back to the client immediately as an HTTP/1.1 response with `Transfer-Encoding: chunked`, using `format_http_response_header()`. Framing/hop-by-hop headers are stripped from the upstream response. Body chunks are then forwarded incrementally as they arrive from the backend via `StreamingProxyResponse::next_chunk()`, each wrapped in HTTP chunked encoding by `format_chunk()`. The stream is terminated with a `0\r\n\r\n` chunk terminator. This ensures time-to-first-byte reflects the backend's first token latency rather than the full generation time. - On router failure: the error is mapped to an HTTP status code via `router_error_to_http()` and returned as a JSON error body (see error table below) - Empty route cache: returns `503` JSON error (`{"error": "cluster inference is not configured"}`) - Non-inference requests: returns `403 Forbidden` with a JSON error body (`{"error": "connection not allowed by policy"}`) diff --git a/examples/local-inference/inference.py b/examples/local-inference/inference.py index 39b47d27..a184244e 100644 --- a/examples/local-inference/inference.py +++ b/examples/local-inference/inference.py @@ -18,7 +18,10 @@ client = OpenAI(api_key="dummy", base_url="https://inference.local/v1") -PROMPT = "Write a short haiku about computers." +PROMPT = ( + "Write a 500-word essay on the history of computing, " + "from Charles Babbage's Analytical Engine to modern GPUs." +) MESSAGES = [{"role": "user", "content": PROMPT}] @@ -36,8 +39,10 @@ def test_non_streaming(): elapsed = time.monotonic() - t0 content = (response.choices[0].message.content or "").strip() + words = content.split() print(f" model = {response.model}") - print(f" content = {content}") + print(f" words = {len(words)}") + print(f" preview = {' '.join(words[:20])}...") print(f" total = {elapsed:.2f}s") print() @@ -70,8 +75,10 @@ def test_streaming(): elapsed = time.monotonic() - t0 content = "".join(chunks).strip() + words = content.split() print(f" model = {chunk.model}") - print(f" content = {content}") + print(f" words = {len(words)}") + print(f" preview = {' '.join(words[:20])}...") print(f" total = {elapsed:.2f}s") print() From eec3485c497dc023762f47c964eccd222ec5b1e8 Mon Sep 17 00:00:00 2001 From: John Myers <9696606+johntmyers@users.noreply.github.com> Date: Thu, 12 Mar 2026 10:34:20 -0700 Subject: [PATCH 3/3] test(example): add direct NVIDIA endpoint tests via L7 TLS intercept Expand inference example to 4 test cases: inference.local and direct endpoint, each streaming and non-streaming. The direct path exercises the L7 REST relay (relay_chunked) to verify it already streams correctly. NVIDIA_API_KEY is picked up from the sandbox env when started with --provider nvidia. --- examples/local-inference/inference.py | 70 +++++++++++++++----- examples/local-inference/sandbox-policy.yaml | 15 +++++ 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/examples/local-inference/inference.py b/examples/local-inference/inference.py index a184244e..2d04569b 100644 --- a/examples/local-inference/inference.py +++ b/examples/local-inference/inference.py @@ -1,13 +1,29 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -"""Test inference routing through inference.local (streaming and non-streaming). - -Exercises both modes to surface the response-buffering bug where the proxy -calls response.bytes().await on a streaming response, inflating TTFB from -sub-second to the full generation time. +"""Test inference routing through both inference.local and direct endpoint access. + +Exercises four scenarios to verify streaming works correctly: + 1. inference.local — non-streaming + 2. inference.local — streaming + 3. Direct NVIDIA endpoint (L7 TLS intercept) — non-streaming + 4. Direct NVIDIA endpoint (L7 TLS intercept) — streaming + +The direct endpoint tests verify that the L7 REST relay path (relay_chunked / +relay_until_eof) streams responses incrementally, in contrast with the +inference.local interception path which previously buffered the entire body. + +Usage: + # inference.local only (no provider attached): + openshell sandbox create --policy sandbox-policy.yaml --upload inference.py \ + -- python3 /sandbox/inference.py + + # All 4 tests (attach the nvidia provider so NVIDIA_API_KEY is available): + openshell sandbox create --provider nvidia --policy sandbox-policy.yaml \ + --upload inference.py -- python3 /sandbox/inference.py """ +import os import subprocess import sys import time @@ -16,8 +32,6 @@ from openai import OpenAI # noqa: E402 -client = OpenAI(api_key="dummy", base_url="https://inference.local/v1") - PROMPT = ( "Write a 500-word essay on the history of computing, " "from Charles Babbage's Analytical Engine to modern GPUs." @@ -25,14 +39,14 @@ MESSAGES = [{"role": "user", "content": PROMPT}] -def test_non_streaming(): +def run_non_streaming(client: OpenAI, label: str, model: str) -> None: print("=" * 60) - print("NON-STREAMING REQUEST") + print(f"NON-STREAMING — {label}") print("=" * 60) t0 = time.monotonic() response = client.chat.completions.create( - model="router", + model=model, messages=MESSAGES, temperature=0, ) @@ -47,9 +61,9 @@ def test_non_streaming(): print() -def test_streaming(): +def run_streaming(client: OpenAI, label: str, model: str) -> None: print("=" * 60) - print("STREAMING REQUEST") + print(f"STREAMING — {label}") print("=" * 60) t0 = time.monotonic() @@ -57,7 +71,7 @@ def test_streaming(): chunks = [] stream = client.chat.completions.create( - model="router", + model=model, messages=MESSAGES, temperature=0, stream=True, @@ -94,6 +108,32 @@ def test_streaming(): print() +DIRECT_URL = "https://integrate.api.nvidia.com/v1" +DIRECT_MODEL = "meta/llama-3.1-8b-instruct" + + +def main() -> None: + # --- inference.local tests (router injects auth + model) --- + local_client = OpenAI(api_key="dummy", base_url="https://inference.local/v1") + + run_non_streaming(local_client, "inference.local", model="router") + run_streaming(local_client, "inference.local", model="router") + + # --- Direct endpoint tests (L7 TLS intercept path) --- + # The API key is available when the sandbox is started with --provider nvidia. + api_key = os.environ.get("NVIDIA_API_KEY") + if api_key: + direct_client = OpenAI(api_key=api_key, base_url=DIRECT_URL) + + run_non_streaming(direct_client, f"direct ({DIRECT_URL})", model=DIRECT_MODEL) + run_streaming(direct_client, f"direct ({DIRECT_URL})", model=DIRECT_MODEL) + else: + print("=" * 60) + print("SKIPPED — direct endpoint tests (NVIDIA_API_KEY not set)") + print("=" * 60) + print(" Attach the nvidia provider to enable: --provider nvidia") + print() + + if __name__ == "__main__": - test_non_streaming() - test_streaming() + main() diff --git a/examples/local-inference/sandbox-policy.yaml b/examples/local-inference/sandbox-policy.yaml index 280d8916..583c736f 100644 --- a/examples/local-inference/sandbox-policy.yaml +++ b/examples/local-inference/sandbox-policy.yaml @@ -34,3 +34,18 @@ network_policies: - { host: files.pythonhosted.org, port: 443 } binaries: - path: /usr/bin/python3.12 + + # Direct access to the NVIDIA inference API (L7 TLS intercept). + # Used to verify that the L7 REST relay path streams responses correctly + # (contrast with the inference.local interception path above). + nvidia_direct: + name: NVIDIA API (L7 intercept) + endpoints: + - host: integrate.api.nvidia.com + port: 443 + protocol: rest + tls: terminate + enforcement: enforce + access: full + binaries: + - path: /usr/bin/python3.12