diff --git a/architecture/inference-routing.md b/architecture/inference-routing.md index 6224fd64..60007866 100644 --- a/architecture/inference-routing.md +++ b/architecture/inference-routing.md @@ -25,9 +25,14 @@ sequenceDiagram Router->>Router: Select route by protocol Router->>Router: Rewrite auth + model Router->>Backend: HTTPS request - Backend->>Router: Response - Router->>Proxy: ProxyResponse - Proxy->>Agent: HTTP response over TLS tunnel + Backend->>Router: Response headers + body stream + Router->>Proxy: StreamingProxyResponse (headers first) + Proxy->>Agent: HTTP/1.1 headers (chunked TE) + loop Each body chunk + Router->>Proxy: chunk via next_chunk() + Proxy->>Agent: Chunked-encoded frame + end + Proxy->>Agent: Chunk terminator (0\r\n\r\n) ``` ## Provider Profiles @@ -149,8 +154,8 @@ If no pattern matches, the proxy returns `403 Forbidden` with `{"error": "connec Files: -- `crates/navigator-router/src/lib.rs` -- `Router`, `proxy_with_candidates()` -- `crates/navigator-router/src/backend.rs` -- `proxy_to_backend()`, URL construction +- `crates/navigator-router/src/lib.rs` -- `Router`, `proxy_with_candidates()`, `proxy_with_candidates_streaming()` +- `crates/navigator-router/src/backend.rs` -- `proxy_to_backend()`, `proxy_to_backend_streaming()`, URL construction - `crates/navigator-router/src/config.rs` -- `RouteConfig`, `ResolvedRoute`, YAML loading ### Route selection @@ -174,6 +179,29 @@ Before forwarding inference requests, the proxy strips sensitive and hop-by-hop - **Request**: `authorization`, `x-api-key`, `host`, `content-length`, and hop-by-hop headers (`connection`, `keep-alive`, `proxy-authenticate`, `proxy-authorization`, `proxy-connection`, `te`, `trailer`, `transfer-encoding`, `upgrade`). - **Response**: `content-length` and hop-by-hop headers. 
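The header-stripping rules above can be sketched in Python (an illustrative sketch of the documented behavior, not the Rust implementation; the header lists are taken verbatim from the bullets above):

```python
# Hypothetical sketch of the request-header sanitization described above.
SENSITIVE_REQUEST = {"authorization", "x-api-key", "host", "content-length"}
HOP_BY_HOP = {
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "proxy-connection", "te", "trailer", "transfer-encoding", "upgrade",
}

def sanitize_request_headers(headers: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Drop credential, framing, and hop-by-hop headers, case-insensitively."""
    blocked = SENSITIVE_REQUEST | HOP_BY_HOP
    return [(name, value) for name, value in headers if name.lower() not in blocked]
```

Stripping is safe here because the router re-adds correct auth and framing headers when it forwards the request.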
+### Response streaming + +The router supports two response modes: + +- **Buffered** (`proxy_with_candidates()`): Reads the entire upstream response body into memory before returning a `ProxyResponse { status, headers, body: Bytes }`. Used by mock routes and in-process system inference calls where latency is not a concern. +- **Streaming** (`proxy_with_candidates_streaming()`): Returns a `StreamingProxyResponse` as soon as response headers arrive from the backend. The body is then read incrementally via `StreamingProxyResponse::next_chunk()`, which yields `Option<Bytes>` fragments until the stream is exhausted. + +Internally, the body is a `StreamingBody` enum with two variants: + +| Variant | Source | Behavior | +|---------|--------|----------| +| `Live(reqwest::Response)` | Real HTTP backend | Calls `response.chunk()` to yield each body fragment as it arrives from the network | +| `Buffered(Option<Bytes>)` | Mock routes or fallback | Yields the entire body on the first call, then `None` | + +The sandbox proxy (`route_inference_request()` in `proxy.rs`) uses the streaming path for all inference requests: + +1. Calls `proxy_with_candidates_streaming()` to get headers immediately. +2. Formats and sends the HTTP/1.1 response header with `Transfer-Encoding: chunked` via `format_http_response_header()`. +3. Loops on `next_chunk()`, wrapping each fragment in HTTP chunked encoding via `format_chunk()`. +4. Sends the chunk terminator (`0\r\n\r\n`) via `format_chunk_terminator()`. + +This eliminates full-body buffering for streaming responses (SSE). Time-to-first-byte is determined by the backend's first chunk latency rather than the full generation time. + ### Mock routes File: `crates/navigator-router/src/mock.rs` diff --git a/architecture/sandbox.md b/architecture/sandbox.md index d9c5e0a4..b286737c 100644 --- a/architecture/sandbox.md +++ b/architecture/sandbox.md @@ -654,10 +654,10 @@ The interception steps: 4. 
**Header sanitization**: For matched inference requests, the proxy strips credential headers (`Authorization`, `x-api-key`) and framing/hop-by-hop headers (`host`, `content-length`, `transfer-encoding`, `connection`, etc.). The router rebuilds correct framing for the forwarded body. -5. **Local routing**: Matched requests are executed by calling `Router::proxy_with_candidates()` directly, passing the detected protocol, HTTP method, path, sanitized headers, body, and the cached `ResolvedRoute` list from `InferenceContext`. The router selects the first route whose `protocols` list contains the source protocol (see [Inference Router](inference-routing.md#inference-router) for route selection details). When forwarding to the backend, the router rewrites the request: the route's `api_key` replaces the `Authorization` header, the `Host` header is set to the backend endpoint, and the `"model"` field in the JSON request body is replaced with the route's configured `model` value. If the request body is not valid JSON or does not contain a `"model"` key, the body is forwarded unchanged. +5. **Local routing**: Matched requests are executed by calling `Router::proxy_with_candidates_streaming()`, passing the detected protocol, HTTP method, path, sanitized headers, body, and the cached `ResolvedRoute` list from `InferenceContext`. The router selects the first route whose `protocols` list contains the source protocol (see [Inference Routing -- Response streaming](inference-routing.md#response-streaming) for details). When forwarding to the backend, the router rewrites the request: the route's `api_key` replaces the `Authorization` header, the `Host` header is set to the backend endpoint, and the `"model"` field in the JSON request body is replaced with the route's configured `model` value. If the request body is not valid JSON or does not contain a `"model"` key, the body is forwarded unchanged. -6. 
**Response handling**: - - On success: the router's response (status code, headers, body) is formatted as an HTTP/1.1 response and sent back to the client after stripping response framing/hop-by-hop headers (`transfer-encoding`, `content-length`, `connection`, etc.) +6. **Response handling (streaming)**: + - On success: response headers are sent back to the client immediately as an HTTP/1.1 response with `Transfer-Encoding: chunked`, using `format_http_response_header()`. Framing/hop-by-hop headers are stripped from the upstream response. Body chunks are then forwarded incrementally as they arrive from the backend via `StreamingProxyResponse::next_chunk()`, each wrapped in HTTP chunked encoding by `format_chunk()`. The stream is terminated with a `0\r\n\r\n` chunk terminator. This ensures time-to-first-byte reflects the backend's first token latency rather than the full generation time. - On router failure: the error is mapped to an HTTP status code via `router_error_to_http()` and returned as a JSON error body (see error table below) - Empty route cache: returns `503` JSON error (`{"error": "cluster inference is not configured"}`) - Non-inference requests: returns `403 Forbidden` with a JSON error body (`{"error": "connection not allowed by policy"}`) diff --git a/crates/navigator-router/src/backend.rs b/crates/navigator-router/src/backend.rs index 23c4cd02..a9fe579a 100644 --- a/crates/navigator-router/src/backend.rs +++ b/crates/navigator-router/src/backend.rs @@ -4,7 +4,7 @@ use crate::RouterError; use crate::config::{AuthHeader, ResolvedRoute}; -/// Response from a proxied HTTP request to a backend. +/// Response from a proxied HTTP request to a backend (fully buffered). #[derive(Debug)] pub struct ProxyResponse { pub status: u16, @@ -12,21 +12,55 @@ pub struct ProxyResponse { pub body: bytes::Bytes, } -/// Forward a raw HTTP request to the backend configured in `route`. 
+/// Response from a proxied HTTP request where the body can be streamed +/// incrementally via [`StreamingProxyResponse::next_chunk`]. +pub struct StreamingProxyResponse { + pub status: u16, + pub headers: Vec<(String, String)>, + /// Either a live response to stream from, or a pre-buffered body (for mock routes). + body: StreamingBody, +} + +enum StreamingBody { + /// Live upstream response — call `chunk().await` to read incrementally. + Live(reqwest::Response), + /// Pre-buffered body (e.g. from mock routes). Drained on first `next_chunk()`. + Buffered(Option<bytes::Bytes>), +} + +impl StreamingProxyResponse { + /// Create from a fully-buffered [`ProxyResponse`] (for mock routes). + pub fn from_buffered(resp: ProxyResponse) -> Self { + Self { + status: resp.status, + headers: resp.headers, + body: StreamingBody::Buffered(Some(resp.body)), + } + } + + /// Read the next body chunk. Returns `None` when the body is exhausted. + pub async fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>, RouterError> { + match &mut self.body { + StreamingBody::Live(response) => response.chunk().await.map_err(|e| { + RouterError::UpstreamProtocol(format!("failed to read response chunk: {e}")) + }), + StreamingBody::Buffered(buf) => Ok(buf.take()), + } + } +} + +/// Build and send an HTTP request to the backend configured in `route`. /// -/// Rewrites the auth header with the route's API key (using the -/// route's configured [`AuthHeader`] mechanism) and the `Host` header -/// to match the backend endpoint. The original path is appended to -/// the route's endpoint URL. -pub async fn proxy_to_backend( +/// Returns the [`reqwest::Response`] with status, headers, and an un-consumed +/// body stream. Shared by both the buffered and streaming public APIs. 
+async fn send_backend_request( client: &reqwest::Client, route: &ResolvedRoute, - _source_protocol: &str, method: &str, path: &str, headers: Vec<(String, String)>, body: bytes::Bytes, -) -> Result<ProxyResponse, RouterError> { +) -> Result<reqwest::Response, RouterError> { let url = build_backend_url(&route.endpoint, path); let reqwest_method: reqwest::Method = method @@ -93,7 +127,7 @@ pub async fn proxy_to_backend( }; builder = builder.body(body); - let response = builder.send().await.map_err(|e| { + builder.send().await.map_err(|e| { if e.is_timeout() { RouterError::UpstreamUnavailable(format!("request to {url} timed out")) } else if e.is_connect() { @@ -101,14 +135,35 @@ } else { RouterError::Internal(format!("HTTP request failed: {e}")) } - })?; + }) +} +/// Extract status and headers from a [`reqwest::Response`]. +fn extract_response_metadata(response: &reqwest::Response) -> (u16, Vec<(String, String)>) { let status = response.status().as_u16(); - let resp_headers: Vec<(String, String)> = response + let headers: Vec<(String, String)> = response .headers() .iter() .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) .collect(); + (status, headers) +} + +/// Forward a raw HTTP request to the backend configured in `route`. +/// +/// Buffers the entire response body before returning. Suitable for +/// non-streaming responses or mock routes. +pub async fn proxy_to_backend( + client: &reqwest::Client, + route: &ResolvedRoute, + _source_protocol: &str, + method: &str, + path: &str, + headers: Vec<(String, String)>, + body: bytes::Bytes, +) -> Result<ProxyResponse, RouterError> { + let response = send_backend_request(client, route, method, path, headers, body).await?; + let (status, resp_headers) = extract_response_metadata(&response); let resp_body = response .bytes() .await @@ -121,6 +176,30 @@ }) } +/// Forward a raw HTTP request to the backend, returning response headers +/// immediately without buffering the body. 
+/// +/// The caller streams the body incrementally via +/// [`StreamingProxyResponse::next_chunk`]. +pub async fn proxy_to_backend_streaming( + client: &reqwest::Client, + route: &ResolvedRoute, + _source_protocol: &str, + method: &str, + path: &str, + headers: Vec<(String, String)>, + body: bytes::Bytes, +) -> Result<StreamingProxyResponse, RouterError> { + let response = send_backend_request(client, route, method, path, headers, body).await?; + let (status, resp_headers) = extract_response_metadata(&response); + + Ok(StreamingProxyResponse { + status, + headers: resp_headers, + body: StreamingBody::Live(response), + }) +} + fn build_backend_url(endpoint: &str, path: &str) -> String { let base = endpoint.trim_end_matches('/'); if base.ends_with("/v1") && (path == "/v1" || path.starts_with("/v1/")) { diff --git a/crates/navigator-router/src/lib.rs b/crates/navigator-router/src/lib.rs index 835f148d..4edd4f87 100644 --- a/crates/navigator-router/src/lib.rs +++ b/crates/navigator-router/src/lib.rs @@ -7,7 +7,7 @@ mod mock; use std::time::Duration; -pub use backend::ProxyResponse; +pub use backend::{ProxyResponse, StreamingProxyResponse}; use config::{ResolvedRoute, RouterConfig}; use tracing::info; @@ -95,6 +95,51 @@ impl Router { ) .await } + + /// Streaming variant of [`proxy_with_candidates`](Self::proxy_with_candidates). + /// + /// Returns response headers immediately without buffering the body. + /// The caller streams body chunks via [`StreamingProxyResponse::next_chunk`]. 
+ pub async fn proxy_with_candidates_streaming( + &self, + source_protocol: &str, + method: &str, + path: &str, + headers: Vec<(String, String)>, + body: bytes::Bytes, + candidates: &[ResolvedRoute], + ) -> Result<StreamingProxyResponse, RouterError> { + let normalized_source = source_protocol.trim().to_ascii_lowercase(); + let route = candidates + .iter() + .find(|r| r.protocols.iter().any(|p| p == &normalized_source)) + .ok_or_else(|| RouterError::NoCompatibleRoute(source_protocol.to_string()))?; + + info!( + protocols = %route.protocols.join(","), + endpoint = %route.endpoint, + method = %method, + path = %path, + "routing proxy inference request (streaming)" + ); + + if mock::is_mock_route(route) { + info!(endpoint = %route.endpoint, "returning mock response (buffered)"); + let buffered = mock::mock_response(route, &normalized_source); + return Ok(StreamingProxyResponse::from_buffered(buffered)); + } + + backend::proxy_to_backend_streaming( + &self.client, + route, + &normalized_source, + method, + path, + headers, + body, + ) + .await + } } #[cfg(test)] diff --git a/crates/navigator-sandbox/src/l7/inference.rs b/crates/navigator-sandbox/src/l7/inference.rs index adc7437f..022fe4fa 100644 --- a/crates/navigator-sandbox/src/l7/inference.rs +++ b/crates/navigator-sandbox/src/l7/inference.rs @@ -251,6 +251,58 @@ pub fn format_http_response(status: u16, headers: &[(String, String)], body: &[u bytes } +/// Format HTTP/1.1 response headers for a chunked (streaming) response. +/// +/// Emits the status line, supplied headers (stripping any `content-length` or +/// `transfer-encoding` the upstream may have sent), and a +/// `transfer-encoding: chunked` header. The body is **not** included — the +/// caller writes chunks separately via [`format_chunk`] and +/// [`format_chunk_terminator`]. 
+pub fn format_http_response_header(status: u16, headers: &[(String, String)]) -> Vec<u8> { + use std::fmt::Write; + + let status_text = match status { + 200 => "OK", + 400 => "Bad Request", + 403 => "Forbidden", + 411 => "Length Required", + 413 => "Payload Too Large", + 500 => "Internal Server Error", + 502 => "Bad Gateway", + 503 => "Service Unavailable", + _ => "Unknown", + }; + + let mut response = format!("HTTP/1.1 {status} {status_text}\r\n"); + for (name, value) in headers { + // Skip framing headers — we always emit chunked TE. + if name.eq_ignore_ascii_case("content-length") + || name.eq_ignore_ascii_case("transfer-encoding") + { + continue; + } + let _ = write!(response, "{name}: {value}\r\n"); + } + let _ = write!(response, "transfer-encoding: chunked\r\n"); + response.push_str("\r\n"); + response.into_bytes() +} + +/// Format a single HTTP chunked transfer-encoding segment. +/// +/// Returns `<hex length>\r\n<data>\r\n`. +pub fn format_chunk(data: &[u8]) -> Vec<u8> { + let mut buf = format!("{:x}\r\n", data.len()).into_bytes(); + buf.extend_from_slice(data); + buf.extend_from_slice(b"\r\n"); + buf +} + +/// The HTTP chunked transfer-encoding terminator: `0\r\n\r\n`. 
+pub fn format_chunk_terminator() -> &'static [u8] { + b"0\r\n\r\n" +} + #[cfg(test)] mod tests { use super::*; @@ -372,4 +424,64 @@ mod tests { assert!(response_str.contains("content-length: 11\r\n")); assert!(response_str.ends_with("{\"ok\":true}")); } + + #[test] + fn format_response_header_chunked() { + let headers = vec![ + ("content-type".to_string(), "text/event-stream".to_string()), + ("x-request-id".to_string(), "abc123".to_string()), + ]; + let header = format_http_response_header(200, &headers); + let header_str = String::from_utf8_lossy(&header); + assert!(header_str.starts_with("HTTP/1.1 200 OK\r\n")); + assert!(header_str.contains("content-type: text/event-stream\r\n")); + assert!(header_str.contains("x-request-id: abc123\r\n")); + assert!(header_str.contains("transfer-encoding: chunked\r\n")); + assert!(header_str.ends_with("\r\n")); + // Must NOT contain content-length + assert!(!header_str.to_lowercase().contains("content-length")); + } + + #[test] + fn format_response_header_strips_upstream_framing() { + let headers = vec![ + ("content-length".to_string(), "9999".to_string()), + ("transfer-encoding".to_string(), "chunked".to_string()), + ("content-type".to_string(), "application/json".to_string()), + ]; + let header = format_http_response_header(200, &headers); + let header_str = String::from_utf8_lossy(&header); + // Should not contain the upstream content-length or transfer-encoding values + assert!(!header_str.contains("content-length: 9999")); + // Should contain exactly one transfer-encoding: chunked (ours) + assert_eq!(header_str.matches("transfer-encoding: chunked").count(), 1); + } + + #[test] + fn format_chunk_basic() { + let data = b"hello"; + let chunk = format_chunk(data); + assert_eq!(chunk, b"5\r\nhello\r\n"); + } + + #[test] + fn format_chunk_empty() { + // Empty chunk is NOT the terminator — it's a zero-length data segment + let chunk = format_chunk(b""); + assert_eq!(chunk, b"0\r\n\r\n"); + } + + #[test] + fn 
format_chunk_terminator_value() { + assert_eq!(format_chunk_terminator(), b"0\r\n\r\n"); + } + + #[test] + fn format_chunk_large_hex() { + let data = vec![0x41u8; 256]; // 0x100 bytes + let chunk = format_chunk(&data); + assert!(chunk.starts_with(b"100\r\n")); + assert!(chunk.ends_with(b"\r\n")); + assert_eq!(chunk.len(), 3 + 2 + 256 + 2); // "100" + \r\n + data + \r\n + } } diff --git a/crates/navigator-sandbox/src/proxy.rs b/crates/navigator-sandbox/src/proxy.rs index 1480e5e0..dee3fdc0 100644 --- a/crates/navigator-sandbox/src/proxy.rs +++ b/crates/navigator-sandbox/src/proxy.rs @@ -924,7 +924,7 @@ async fn route_inference_request( match ctx .router - .proxy_with_candidates( + .proxy_with_candidates_streaming( &pattern.protocol, &request.method, &normalized_path, @@ -934,11 +934,36 @@ async fn route_inference_request( ) .await { - Ok(resp) => { - let resp_headers = - sanitize_inference_response_headers(resp.headers.into_iter().collect()); - let response = format_http_response(resp.status, &resp_headers, &resp.body); - write_all(tls_client, &response).await?; + Ok(mut resp) => { + use crate::l7::inference::{ + format_chunk, format_chunk_terminator, format_http_response_header, + }; + + let resp_headers = sanitize_inference_response_headers( + std::mem::take(&mut resp.headers).into_iter().collect(), + ); + + // Write response headers immediately (chunked TE). + let header_bytes = format_http_response_header(resp.status, &resp_headers); + write_all(tls_client, &header_bytes).await?; + + // Stream body chunks as they arrive from the upstream. + loop { + match resp.next_chunk().await { + Ok(Some(chunk)) => { + let encoded = format_chunk(&chunk); + write_all(tls_client, &encoded).await?; + } + Ok(None) => break, + Err(e) => { + warn!(error = %e, "error reading upstream response chunk"); + break; + } + } + } + + // Terminate the chunked stream. 
+ write_all(tls_client, format_chunk_terminator()).await?; } Err(e) => { warn!(error = %e, "inference endpoint detected but upstream service failed"); diff --git a/examples/local-inference/README.md b/examples/local-inference/README.md index f6785772..6f21f65c 100644 --- a/examples/local-inference/README.md +++ b/examples/local-inference/README.md @@ -17,110 +17,73 @@ OpenShell intercepts and reroutes it to the configured backend. | File | Description | |---|---| -| `inference.py` | Python script that calls the OpenAI SDK through `https://inference.local/v1` | -| `sandbox-policy.yaml` | Minimal sandbox policy for the example | +| `inference.py` | Python script that tests streaming and non-streaming inference through `inference.local` | +| `sandbox-policy.yaml` | Minimal sandbox policy (no network access except `inference.local`) | | `routes.yaml` | Example YAML route file for standalone (no-cluster) mode | -## Quick Start +## Quick Start (NVIDIA) -There are two ways to run inference routing: **with a cluster** (managed -routes, multi-sandbox) or **standalone** (single sandbox, routes from a file). - -### Standalone (no cluster) - -Run the sandbox binary directly with a route file — no OpenShell cluster needed: +Requires a running OpenShell gateway and `NVIDIA_API_KEY` set in your shell. ```bash -# 1. Edit routes.yaml to point at your local LLM (e.g. LM Studio on :1234) -# See examples/inference/routes.yaml - -# 2. Run the sandbox with --inference-routes -navigator-sandbox \ - --inference-routes examples/inference/routes.yaml \ - --policy-rules \ - --policy-data examples/inference/sandbox-policy.yaml \ - -- python examples/inference/inference.py -``` - -The sandbox loads routes from the YAML file at startup and routes inference -requests locally — no gRPC server or cluster required. - -### With a cluster +# 1. Create a provider using your NVIDIA credentials +openshell provider create --name nvidia --type nvidia --credential NVIDIA_API_KEY -#### 1. 
Start a OpenShell cluster +# 2. Configure inference routing +openshell inference set --provider nvidia --model meta/llama-3.1-8b-instruct -```bash -mise run cluster -openshell status -``` - -#### 2. Configure cluster inference - -First make sure a provider record exists for the backend you want to use: - -```bash -openshell provider list -``` - -Then configure the cluster-managed `inference.local` route: - -```bash -# Example: use an existing provider record -openshell cluster inference set \ - --provider openai-prod \ - --model nvidia/nemotron-3-nano-30b-a3b -``` - -Verify the active config: - -```bash -openshell cluster inference get -``` - -#### 3. Run the example inside a sandbox - -```bash +# 3. Run the test script in a sandbox openshell sandbox create \ - --policy examples/inference/sandbox-policy.yaml \ - --keep \ - --name inference-demo \ - -- python examples/inference/inference.py + --policy examples/local-inference/sandbox-policy.yaml \ + --upload examples/local-inference/inference.py \ + -- python3 /sandbox/inference.py ``` -The script targets `https://inference.local/v1` directly. OpenShell -intercepts that connection and routes it to whatever backend cluster -inference is configured to use. 
- -Expected output: +Expected output (with the streaming buffering bug present): ``` -model= -content=NAV_OK +============================================================ +NON-STREAMING REQUEST +============================================================ + model = meta/llama-3.1-8b-instruct + content = Glowing screens abide + Whirring circuits, silent mind + Tech's gentle grasp + total = 0.96s + +============================================================ +STREAMING REQUEST +============================================================ + TTFB = 0.54s + model = meta/llama-3.1-8b-instruct + content = Glowing screens abide + Code and circuits whisper + Silent digital + total = 0.54s + + ** BUG: TTFB is 99% of total time — response was buffered, not streamed ** ``` -#### 4. (Optional) Interactive session +When streaming works correctly, TTFB should be sub-second while total time +stays the same (tokens arrive incrementally). -```bash -openshell sandbox connect inference-demo -# Inside the sandbox: -python examples/inference/inference.py -``` +## Standalone (no cluster) -#### 5. Cleanup +Run the sandbox binary directly with a route file — no OpenShell cluster needed: ```bash -openshell sandbox delete inference-demo -``` - -## Customizing Routes +# 1. Edit routes.yaml to point at your local LLM (e.g. LM Studio on :1234) -Edit `routes.yaml` to change which backend endpoint/model standalone mode uses. -In cluster mode, use `openshell cluster inference set` instead. +# 2. 
Run the sandbox with --inference-routes +navigator-sandbox \ + --inference-routes examples/local-inference/routes.yaml \ + --policy-rules \ + --policy-data examples/local-inference/sandbox-policy.yaml \ + -- python examples/local-inference/inference.py +``` ## Supported Protocols -OpenShell detects and routes the following inference API patterns: - | Pattern | Protocol | Kind | |---|---|---| | `POST /v1/chat/completions` | `openai_chat_completions` | Chat completion | diff --git a/examples/local-inference/inference.py b/examples/local-inference/inference.py index 2cf07e9b..2d04569b 100644 --- a/examples/local-inference/inference.py +++ b/examples/local-inference/inference.py @@ -1,16 +1,139 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -from openai import OpenAI +"""Test inference routing through both inference.local and direct endpoint access. -client = OpenAI(api_key="dummy", base_url="https://inference.local/v1") +Exercises four scenarios to verify streaming works correctly: + 1. inference.local — non-streaming + 2. inference.local — streaming + 3. Direct NVIDIA endpoint (L7 TLS intercept) — non-streaming + 4. Direct NVIDIA endpoint (L7 TLS intercept) — streaming -response = client.chat.completions.create( - model="router", - messages=[{"role": "user", "content": "Reply with exactly: NAV_OK"}], - temperature=0, +The direct endpoint tests verify that the L7 REST relay path (relay_chunked / +relay_until_eof) streams responses incrementally, in contrast with the +inference.local interception path which previously buffered the entire body. 
+ +Usage: + # inference.local only (no provider attached): + openshell sandbox create --policy sandbox-policy.yaml --upload inference.py \ + -- python3 /sandbox/inference.py + + # All 4 tests (attach the nvidia provider so NVIDIA_API_KEY is available): + openshell sandbox create --provider nvidia --policy sandbox-policy.yaml \ + --upload inference.py -- python3 /sandbox/inference.py +""" + +import os +import subprocess +import sys +import time + +subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "openai"]) + +from openai import OpenAI # noqa: E402 + +PROMPT = ( + "Write a 500-word essay on the history of computing, " + "from Charles Babbage's Analytical Engine to modern GPUs." ) +MESSAGES = [{"role": "user", "content": PROMPT}] + + +def run_non_streaming(client: OpenAI, label: str, model: str) -> None: + print("=" * 60) + print(f"NON-STREAMING — {label}") + print("=" * 60) + + t0 = time.monotonic() + response = client.chat.completions.create( + model=model, + messages=MESSAGES, + temperature=0, + ) + elapsed = time.monotonic() - t0 + + content = (response.choices[0].message.content or "").strip() + words = content.split() + print(f" model = {response.model}") + print(f" words = {len(words)}") + print(f" preview = {' '.join(words[:20])}...") + print(f" total = {elapsed:.2f}s") + print() + + +def run_streaming(client: OpenAI, label: str, model: str) -> None: + print("=" * 60) + print(f"STREAMING — {label}") + print("=" * 60) + + t0 = time.monotonic() + ttfb = None + chunks = [] + + stream = client.chat.completions.create( + model=model, + messages=MESSAGES, + temperature=0, + stream=True, + ) + + for chunk in stream: + if ttfb is None: + ttfb = time.monotonic() - t0 + print(f" TTFB = {ttfb:.2f}s") + + delta = chunk.choices[0].delta if chunk.choices else None + if delta and delta.content: + chunks.append(delta.content) + + elapsed = time.monotonic() - t0 + content = "".join(chunks).strip() + + words = content.split() + print(f" model = 
{chunk.model}") + print(f" words = {len(words)}") + print(f" preview = {' '.join(words[:20])}...") + print(f" total = {elapsed:.2f}s") + print() + + # Flag the bug: if TTFB is close to total time, response was buffered. + if ttfb and elapsed > 0.5 and ttfb > elapsed * 0.8: + print( + " ** BUG: TTFB is {:.0f}% of total time — response was buffered, not streamed **".format( + ttfb / elapsed * 100 + ) + ) + elif ttfb and ttfb < 2.0: + print(" OK: TTFB looks healthy (sub-2s)") + print() + + +DIRECT_URL = "https://integrate.api.nvidia.com/v1" +DIRECT_MODEL = "meta/llama-3.1-8b-instruct" + + +def main() -> None: + # --- inference.local tests (router injects auth + model) --- + local_client = OpenAI(api_key="dummy", base_url="https://inference.local/v1") + + run_non_streaming(local_client, "inference.local", model="router") + run_streaming(local_client, "inference.local", model="router") + + # --- Direct endpoint tests (L7 TLS intercept path) --- + # The API key is available when the sandbox is started with --provider nvidia. 
+ api_key = os.environ.get("NVIDIA_API_KEY") + if api_key: + direct_client = OpenAI(api_key=api_key, base_url=DIRECT_URL) + + run_non_streaming(direct_client, f"direct ({DIRECT_URL})", model=DIRECT_MODEL) + run_streaming(direct_client, f"direct ({DIRECT_URL})", model=DIRECT_MODEL) + else: + print("=" * 60) + print("SKIPPED — direct endpoint tests (NVIDIA_API_KEY not set)") + print("=" * 60) + print(" Attach the nvidia provider to enable: --provider nvidia") + print() + -content = (response.choices[0].message.content or "").strip() -print(f"model={response.model}") -print(f"content={content}") +if __name__ == "__main__": + main() diff --git a/examples/local-inference/sandbox-policy.yaml b/examples/local-inference/sandbox-policy.yaml index 549a31c9..583c736f 100644 --- a/examples/local-inference/sandbox-policy.yaml +++ b/examples/local-inference/sandbox-policy.yaml @@ -25,4 +25,27 @@ process: run_as_user: sandbox run_as_group: sandbox -# No network policies means all outbound connections are denied and only inference.local is allowed. +# Allow PyPI access so pip can install dependencies inside the sandbox. +network_policies: + pypi: + name: PyPI + endpoints: + - { host: pypi.org, port: 443 } + - { host: files.pythonhosted.org, port: 443 } + binaries: + - path: /usr/bin/python3.12 + + # Direct access to the NVIDIA inference API (L7 TLS intercept). + # Used to verify that the L7 REST relay path streams responses correctly + # (contrast with the inference.local interception path above). + nvidia_direct: + name: NVIDIA API (L7 intercept) + endpoints: + - host: integrate.api.nvidia.com + port: 443 + protocol: rest + tls: terminate + enforcement: enforce + access: full + binaries: + - path: /usr/bin/python3.12
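For reference, the chunked framing produced by `format_chunk()` and `format_chunk_terminator()` can be round-tripped with a short Python sketch (illustrative only; it mirrors the semantics of the Rust unit tests in this patch and is not part of the patch itself):

```python
def format_chunk(data: bytes) -> bytes:
    # One chunked transfer-encoding segment: <hex length>\r\n<data>\r\n
    return f"{len(data):x}\r\n".encode("ascii") + data + b"\r\n"

CHUNK_TERMINATOR = b"0\r\n\r\n"  # a zero-length chunk ends the stream

def decode_chunked(stream: bytes) -> bytes:
    """Decode a complete chunked body back into the original bytes."""
    out, pos = b"", 0
    while True:
        size_end = stream.index(b"\r\n", pos)
        size = int(stream[pos:size_end], 16)
        if size == 0:
            return out  # terminator reached
        data_start = size_end + 2
        out += stream[data_start:data_start + size]
        pos = data_start + size + 2  # skip the chunk's trailing \r\n

encoded = format_chunk(b"hello") + format_chunk(b" world") + CHUNK_TERMINATOR
assert decode_chunked(encoded) == b"hello world"
```

Note the decoder assumes a well-formed body with no trailer headers after the final chunk, which matches what the sandbox proxy emits.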