diff --git a/console/src/app.tsx b/console/src/app.tsx index 99c171d5..5593f622 100644 --- a/console/src/app.tsx +++ b/console/src/app.tsx @@ -6,6 +6,7 @@ import { PerformancePage } from "@/pages/performance" import { TrafficPage } from "@/pages/traffic" import { ErrorsPage } from "@/pages/errors" import { ModelsPage } from "@/pages/models" +import { ServicesPage } from "@/pages/services" import { LlmCallsPage } from "@/pages/llm-calls" import { AgentSessionsPage } from "@/pages/agent-sessions" import { AgentSessionDetailPage } from "@/pages/agent-session-detail" @@ -36,6 +37,7 @@ export default function App() { } /> } /> } /> + } /> } /> } /> } /> diff --git a/console/src/components/layout/sidebar.tsx b/console/src/components/layout/sidebar.tsx index 8881d043..afa9012e 100644 --- a/console/src/components/layout/sidebar.tsx +++ b/console/src/components/layout/sidebar.tsx @@ -5,6 +5,7 @@ import { BarChart3, AlertTriangle, Cpu, + Server, Sparkles, MessageSquare, MessagesSquare, @@ -25,6 +26,7 @@ const navItems = [ { to: "/traffic", icon: BarChart3, label: "Traffic" }, { to: "/errors", icon: AlertTriangle, label: "Errors" }, { to: "/models", icon: Cpu, label: "Models" }, + { to: "/services", icon: Server, label: "Services" }, { to: "/agent-sessions", icon: MessageSquare, label: "Agent Sessions" }, { to: "/agent-turns", icon: MessagesSquare, label: "Agent Turns" }, { to: "/llm-calls", icon: Sparkles, label: "LLM Calls" }, diff --git a/console/src/hooks/use-services.ts b/console/src/hooks/use-services.ts new file mode 100644 index 00000000..bed31c3c --- /dev/null +++ b/console/src/hooks/use-services.ts @@ -0,0 +1,28 @@ +import { useQuery } from "@tanstack/react-query" +import { apiFetch } from "@/lib/api" +import { useToolbarStore } from "@/stores/toolbar" +import type { ServicesData } from "@/types/api" + +interface UseServicesParams { + sortBy?: string + sortOrder?: "asc" | "desc" + limit?: number +} + +export function useServices({ sortBy = "call_count", sortOrder = "desc", limit = 200 }: UseServicesParams = {}) { + const start = useToolbarStore((s) => s.start) + const end = useToolbarStore((s) => s.end) + + return useQuery({ + queryKey: ["services", { start, end, sortBy, sortOrder, limit }], + queryFn: () => + apiFetch("/api/services", { + start, + end, + sort_by: sortBy, + sort_order: sortOrder, + limit, + }), + placeholderData: (prev) => prev, + }) +} diff --git a/console/src/pages/services.tsx b/console/src/pages/services.tsx new file mode 100644 index 00000000..83dcfaea --- /dev/null +++ b/console/src/pages/services.tsx @@ -0,0 +1,308 @@ +import { useMemo, useState } from "react" +import { ArrowUpDown } from "lucide-react" +import { cn } from "@/lib/utils" +import { formatMs, formatNumber } from "@/lib/format" +import { useServices } from "@/hooks/use-services" +import type { ServiceRow } from "@/types/api" + +type SortKey = + | "endpoint" + | "call_count" + | "error_rate" + | "stream_pct" + | "ttft_avg_ms" + | "ttft_p95_ms" + | "e2e_avg_ms" + | "e2e_p95_ms" + | "total_input_tokens" + | "total_output_tokens" + | "last_seen_ms" +type SortOrder = "asc" | "desc" + +function errorRate(s: ServiceRow): number { + return s.call_count > 0 ? (s.error_count / s.call_count) * 100 : 0 +} + +function streamPct(s: ServiceRow): number { + return s.call_count > 0 ? (s.stream_count / s.call_count) * 100 : 0 +} + +function getSortValue(s: ServiceRow, key: SortKey): number | string { + if (key === "endpoint") return `${s.server_ip}:${s.server_port}` + if (key === "error_rate") return errorRate(s) + if (key === "stream_pct") return streamPct(s) + return (s[key as keyof ServiceRow] as number) ?? 0 +} + +/// Color theme per app — picked so the most common production +/// surfaces (vllm/litellm) read at a glance. Falls back to muted gray +/// for unknown so the absence of detection is visually quiet. +const APP_BADGE_STYLE: Record = { + vllm: "bg-purple-100 text-purple-800 dark:bg-purple-900/40 dark:text-purple-300", + sglang: "bg-cyan-100 text-cyan-800 dark:bg-cyan-900/40 dark:text-cyan-300", + ollama: "bg-amber-100 text-amber-800 dark:bg-amber-900/40 dark:text-amber-300", + llamacpp: "bg-emerald-100 text-emerald-800 dark:bg-emerald-900/40 dark:text-emerald-300", + litellm: "bg-pink-100 text-pink-800 dark:bg-pink-900/40 dark:text-pink-300", + openai: "bg-green-100 text-green-800 dark:bg-green-900/40 dark:text-green-300", + anthropic: "bg-orange-100 text-orange-800 dark:bg-orange-900/40 dark:text-orange-300", + gemini: "bg-blue-100 text-blue-800 dark:bg-blue-900/40 dark:text-blue-300", +} + +function AppBadge({ app, serverHeader }: { app: string | null; serverHeader: string | null }) { + if (!app) { + return ( + + unknown + + ) + } + const cls = + APP_BADGE_STYLE[app] ?? + "bg-slate-200 text-slate-800 dark:bg-slate-700/60 dark:text-slate-200" + const title = serverHeader + ? `Server: ${serverHeader}` + : `Identified as ${app} (no Server header sample)` + return ( + + {app} + + ) +} + +function formatAgo(ms: number): string { + const diff = Date.now() - ms + if (diff < 0) return "just now" + const s = Math.floor(diff / 1000) + if (s < 60) return `${s}s ago` + const m = Math.floor(s / 60) + if (m < 60) return `${m}m ago` + const h = Math.floor(m / 60) + if (h < 48) return `${h}h ago` + const d = Math.floor(h / 24) + return `${d}d ago` +} + +export function ServicesPage() { + const [sortKey, setSortKey] = useState("call_count") + const [sortOrder, setSortOrder] = useState("desc") + const { data, isLoading } = useServices({ + sortBy: + // Sort interactively in JS — the server-side sort_by accepts + // matching column names but we keep client-side sorting for + // responsive header clicks without refetching. + "call_count", + sortOrder: "desc", + }) + + const services = useMemo(() => data?.services ?? [], [data]) + + const sorted = useMemo(() => { + const arr = [...services] + arr.sort((a, b) => { + const av = getSortValue(a, sortKey) + const bv = getSortValue(b, sortKey) + if (typeof av === "string" && typeof bv === "string") { + return sortOrder === "asc" ? av.localeCompare(bv) : bv.localeCompare(av) + } + return sortOrder === "asc" + ? (av as number) - (bv as number) + : (bv as number) - (av as number) + }) + return arr + }, [services, sortKey, sortOrder]) + + function handleSort(key: SortKey) { + if (key === sortKey) { + setSortOrder(sortOrder === "asc" ? "desc" : "asc") + } else { + setSortKey(key) + setSortOrder("desc") + } + } + + function SortHeader({ + label, + field, + align, + }: { + label: string + field: SortKey + align?: "left" | "right" + }) { + const active = sortKey === field + return ( + + ) + } + + return ( +
+
+
+ + + + + + + + + + + + + + + + + + + + + {isLoading && services.length === 0 ? ( + + + + ) : sorted.length === 0 ? ( + + + + ) : ( + sorted.map((s) => { + const err = errorRate(s) + const key = `${s.server_ip}:${s.server_port}` + return ( + + + + + + + + + + + + + + + + + ) + }) + )} + +
+ + + App + + Models + + Wire API + + + + + + + + + + + + + + + + + + + + +
+ Loading… +
+ No services found in selected time range +
+ {s.server_ip} + :{s.server_port} + + + +
+ {s.models.slice(0, 4).map((m) => ( + + {m.length > 24 ? `${m.slice(0, 22)}…` : m} + + ))} + {s.models.length > 4 && ( + + +{s.models.length - 4} more + + )} +
+
+ {s.wire_apis.join(", ")} + + {formatNumber(s.call_count)} + + {s.stream_count > 0 ? `${streamPct(s).toFixed(0)}%` : "—"} + + 5 ? "text-red-500" : err > 1 ? "text-amber-500" : "" + } + > + {err.toFixed(1)}% + + + {formatMs(s.ttft_avg_ms)} + + {formatMs(s.ttft_p95_ms)} + + {formatMs(s.e2e_avg_ms)} + + {formatMs(s.e2e_p95_ms)} + + {formatNumber(s.total_input_tokens)} + + {formatNumber(s.total_output_tokens)} + + {formatAgo(s.last_seen_ms)} +
+
+
+
+ ) +} diff --git a/console/src/types/api.ts b/console/src/types/api.ts index da60fcdb..d350eea0 100644 --- a/console/src/types/api.ts +++ b/console/src/types/api.ts @@ -63,6 +63,36 @@ export interface ModelsData { models: MetricsModelRow[] } +export interface ServicesData { + services: ServiceRow[] +} + +export interface ServiceRow { + server_ip: string + server_port: number + models: string[] + wire_apis: string[] + request_paths: string[] + call_count: number + error_count: number + stream_count: number + total_input_tokens: number + total_output_tokens: number + ttft_avg_ms: number | null + ttft_p95_ms: number | null + e2e_avg_ms: number | null + e2e_p95_ms: number | null + first_seen_ms: number + last_seen_ms: number + /** Server software label — "vllm" / "sglang" / "ollama" / + * "llamacpp" / "litellm" / "openai-compat" / "openai" / "anthropic" + * / "gemini" / null (unknown). vLLM and SGLang both run on uvicorn + * so they're currently bucketed as "openai-compat". */ + app: string | null + /** Raw `Server` HTTP response header value, for the badge tooltip. */ + server_header: string | null +} + export interface MetricsModelRow { wire_api: string model: string diff --git a/server/Cargo.lock b/server/Cargo.lock index 97630c86..08e6d7ca 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -649,7 +649,7 @@ dependencies = [ "serde_core", "serde_json", "toml", - "winnow", + "winnow 1.0.1", "yaml-rust2", ] @@ -1920,7 +1920,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ - "toml_edit", + "toml_edit 0.25.11+spec-1.1.0", ] [[package]] @@ -2911,11 +2911,17 @@ checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" dependencies = [ "serde_core", "serde_spanned", - "toml_datetime", + "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", - "winnow", + "winnow 1.0.1", ] +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" + [[package]] name = "toml_datetime" version = "1.1.1+spec-1.1.0" @@ -2925,6 +2931,18 @@ dependencies = [ "serde_core", ] +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap", + "toml_datetime 0.6.11", + "toml_write", + "winnow 0.7.15", +] + [[package]] name = "toml_edit" version = "0.25.11+spec-1.1.0" @@ -2932,9 +2950,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b59c4d22ed448339746c59b905d24568fcbb3ab65a500494f7b8c3e97739f2b" dependencies = [ "indexmap", - "toml_datetime", + "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", - "winnow", + "winnow 1.0.1", ] [[package]] @@ -2943,9 +2961,15 @@ version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ - "winnow", + "winnow 1.0.1", ] +[[package]] +name = "toml_write" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" + [[package]] name = "tower" version = "0.5.3" @@ -3074,6 +3098,7 @@ dependencies = [ "tower", "tower-http", "tracing", + "ts-capture", "ts-common", "ts-llm", "ts-metrics", @@ -3091,6 +3116,7 @@ dependencies = [ "bytes", "libc", "pcap", + "serde", "snap", "tempfile", "thiserror 2.0.18", @@ -3108,8 +3134,10 @@ dependencies = [ "config", "serde", "serde_json", + "tempfile", "thiserror 2.0.18", "tokio", + "toml_edit 0.22.27", "tracing", ] @@ -3789,6 +3817,15 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "1.0.1" diff --git a/server/ts-api/src/lib.rs b/server/ts-api/src/lib.rs index cd5b9405..b99bc6ce 100644 --- a/server/ts-api/src/lib.rs +++ b/server/ts-api/src/lib.rs @@ -175,6 +175,7 @@ pub fn router( .route("/api/metrics/timeseries", get(routes::metrics::timeseries)) .route("/api/metrics/summary", get(routes::metrics::summary)) .route("/api/metrics/models", get(routes::metrics::models)) + .route("/api/services", get(routes::services::services)) .route( "/api/metrics/finish-reasons", get(routes::metrics::finish_reasons), diff --git a/server/ts-api/src/routes/mod.rs b/server/ts-api/src/routes/mod.rs index 26aefa7e..040b17fe 100644 --- a/server/ts-api/src/routes/mod.rs +++ b/server/ts-api/src/routes/mod.rs @@ -10,3 +10,4 @@ pub mod llm_calls; pub mod metrics; pub mod pcap_extract; pub mod runtime_config; +pub mod services; diff --git a/server/ts-api/src/routes/services.rs b/server/ts-api/src/routes/services.rs new file mode 100644 index 00000000..6b3ee901 --- /dev/null +++ b/server/ts-api/src/routes/services.rs @@ -0,0 +1,69 @@ +//! `GET /api/services` — Services view. +//! +//! One row per unique `(server_ip, server_port)` endpoint with the +//! models that endpoint served, error counts, throughput, and TTFT / +//! E2E percentiles in the requested window. Powers the Console's +//! Services page. +//! +//! Backed by `llm_calls` (NOT the pre-aggregated `llm_metrics` table +//! — that schema's grouping sets stop at `server_ip`, which would +//! conflate multiple LLM servers on the same host). + +use axum::extract::State; +use axum::response::IntoResponse; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use ts_storage::query::ServicesQuery; +use ts_storage::StorageBackend; + +use crate::extractors::Query; +use crate::params::to_time_range; +use crate::response::{ApiError, ApiResponse}; + +#[derive(Debug, Deserialize)] +pub struct ServicesParams { + /// Inclusive start in seconds since epoch. + pub start: i64, + /// Exclusive end in seconds since epoch. + pub end: i64, + #[serde(default = "default_services_sort_by")] + pub sort_by: String, + #[serde(default = "default_sort_order")] + pub sort_order: String, + #[serde(default = "default_services_limit")] + pub limit: u32, +} + +fn default_services_sort_by() -> String { + "call_count".to_string() +} + +fn default_sort_order() -> String { + "desc".to_string() +} + +fn default_services_limit() -> u32 { + // Soft cap — even a busy install rarely has more than a couple + // dozen distinct LLM serving endpoints. 200 is a sanity ceiling + // so a misconfigured deployment can't return a giant payload. + 200 +} + +#[derive(Serialize)] +struct ServicesData { + services: Vec, +} + +pub async fn services( + State(storage): State>, + Query(params): Query, +) -> Result { + let query = ServicesQuery { + time_range: to_time_range(params.start, params.end), + sort_by: params.sort_by, + sort_order: params.sort_order, + limit: params.limit.min(500), + }; + let rows = storage.query_services(&query).await?; + Ok(ApiResponse::ok(ServicesData { services: rows })) +} diff --git a/server/ts-storage-duckdb/src/apps.rs b/server/ts-storage-duckdb/src/apps.rs new file mode 100644 index 00000000..bc9326ba --- /dev/null +++ b/server/ts-storage-duckdb/src/apps.rs @@ -0,0 +1,572 @@ +//! Best-effort serving-software classification for the Services view. +//! +//! Wire-traffic doesn't carry an "I am vLLM" header. Each project +//! leaves slightly different fingerprints though, and combining a +//! handful of cheap signals — `Server` response header, distinctive +//! native paths, LiteLLM's custom response headers, well-known +//! upstream hostnames, distinct `finish_reason` values, plus small +//! request- and response-body samples — gives a definitive label for +//! almost every endpoint at near-zero query cost. +//! +//! Signal table (highest confidence first): +//! +//! | App | Primary signal | +//! |-------------|-----------------------------------------------------------------| +//! | `ollama` | path `/api/chat` / `/api/generate` / `/api/tags` (native API) | +//! | `llamacpp` | path `/completion` / `/tokenize` / `/props` (non-`/v1/...`) | +//! | `sglang` | path `/generate` / `/health_generate` / `/get_server_info` / | +//! | | `/flush_cache` / `/encode` (SGLang's own surface alongside | +//! | | OpenAI-compat) | +//! | `vllm` | path `/version` / `/v1/score` (vLLM-specific endpoints) | +//! | `litellm` | any `x-litellm-*` response header (LiteLLM stamps these) | +//! | `openai` | request `Host: api.openai.com` | +//! | `anthropic` | request `Host: api.anthropic.com` | +//! | `sglang` | `finish_reason` ∈ {`matched_stop`, `matched_eos`, `stop_str`} | +//! | | (SGLang's stop-condition machinery; vLLM doesn't have these) | +//! | `vllm` | response body has `"id":"chatcmpl-tool-` (vLLM tool_call_id) OR | +//! | | `"system_fingerprint":"fp_…"` (vLLM only) | +//! | `vllm` | request body contains `chatcmpl-tool-` — agentic flows echo the | +//! | | server's prior tool_call_id back in assistant history; works | +//! | | even when responses are SSE-streamed (response_body is NULL) | +//! | `litellm` | uvicorn endpoint with ≥ 3 distinct models (real-world signal | +//! | | from wuneng's 127.0.0.1:4000 LiteLLM proxy) | +//! | `sglang` | uvicorn endpoint, model name starts with `glm` or `deepseek` | +//! | | (SGLang is the reference deployment for those families) | +//! | `vllm` | uvicorn fallback — vLLM is by far the more common openai-compat | +//! | | server, better default than `unknown` | +//! | `None` | nothing matches (no Server header, no distinctive paths) | + +use std::collections::HashMap; + +/// Classify one Services-page row from the cheap signals we can pull +/// out of the SQL aggregate. Returns the app label (lowercase, stable +/// — surfaces straight in the API JSON / UI badges). +#[allow(clippy::too_many_arguments)] +pub(crate) fn classify_app( + server_header: Option<&str>, + raw_response_headers_json: Option<&str>, + raw_request_headers_json: Option<&str>, + request_paths: &[String], + finish_reasons: &[String], + models: &[String], + sample_request_body: Option<&str>, + sample_response_body: Option<&str>, +) -> Option { + // -- Native non-OpenAI surface, very high confidence. -- + if request_paths.iter().any(|p| { + let p = p.to_ascii_lowercase(); + p.starts_with("/api/chat") + || p.starts_with("/api/generate") + || p.starts_with("/api/tags") + || p.starts_with("/api/show") + || p == "/api/version" + }) { + return Some("ollama".to_string()); + } + if request_paths.iter().any(|p| { + // llama.cpp's native completion paths sit at the root, not + // under /v1/. `/v1/...` paths exist too (OpenAI-compat) but + // by themselves don't distinguish llama.cpp from vLLM. + matches!( + p.as_str(), + "/completion" | "/tokenize" | "/detokenize" | "/props" + ) + }) { + return Some("llamacpp".to_string()); + } + + // -- SGLang-specific paths. Even when serving OpenAI-compat, + // -- SGLang exposes /generate / /health_generate / /flush_cache / + // -- /get_server_info / /encode at the same port. + if request_paths.iter().any(|p| { + matches!( + p.as_str(), + "/generate" + | "/health_generate" + | "/get_server_info" + | "/flush_cache" + | "/encode" + | "/start_profile" + | "/stop_profile" + ) + }) { + return Some("sglang".to_string()); + } + + // -- vLLM-specific paths. /version returns vLLM's version JSON. + // -- /v1/score is reranking, vLLM-only. + if request_paths + .iter() + .any(|p| p == "/version" || p.starts_with("/v1/score")) + { + return Some("vllm".to_string()); + } + + // -- LiteLLM stamps its own response headers. + if let Some(json) = raw_response_headers_json { + let lower = json.to_ascii_lowercase(); + if lower.contains("x-litellm-") || lower.contains("\"server\":\"litellm") { + return Some("litellm".to_string()); + } + } + + // -- Upstream hostnames from the request side. Captured request_headers + // -- carries `Host:` even when we're routed through a sniffer that + // -- doesn't terminate TLS — the parser stashes the inner host. + if let Some(host) = extract_host_header(raw_request_headers_json) { + let host = host.to_ascii_lowercase(); + if host == "api.openai.com" || host.ends_with(".api.openai.com") { + return Some("openai".to_string()); + } + if host == "api.anthropic.com" || host.ends_with(".api.anthropic.com") { + return Some("anthropic".to_string()); + } + if host == "generativelanguage.googleapis.com" { + return Some("gemini".to_string()); + } + } + + // -- SGLang-exclusive finish_reasons. Captured for both streaming + // -- and non-streaming calls (extracted from SSE for the former). + // -- `matched_stop` / `matched_eos` / `stop_str` come out of SGLang's + // -- stop-condition machinery and vLLM doesn't have them. + if finish_reasons.iter().any(|fr| { + let l = fr.to_ascii_lowercase(); + l == "matched_stop" || l == "matched_eos" || l == "stop_str" + }) { + return Some("sglang".to_string()); + } + + // -- vLLM signatures in the response body sample. Two patterns: + // -- 1. tool_call_id `"id":"chatcmpl-tool-"` — vLLM's + // -- tool parser uses this format; SGLang follows OpenAI's + // -- `call_`. + // -- 2. `system_fingerprint":"fp_"` — vLLM emits this + // -- field (mirroring OpenAI's); SGLang leaves it null. + let vllm_in_response_body = sample_response_body + .map(|body| { + body.contains("\"id\":\"chatcmpl-tool-") + || body.contains("\"id\": \"chatcmpl-tool-") + || body.contains("\"system_fingerprint\":\"fp_") + || body.contains("\"system_fingerprint\": \"fp_") + }) + .unwrap_or(false); + if vllm_in_response_body { + return Some("vllm".to_string()); + } + + // -- vLLM signature in the request body. Agentic flows include + // -- prior `assistant.tool_calls[].id` values in their message + // -- history, and vLLM-generated tool_call_ids carry the + // -- `chatcmpl-tool-` prefix. Works even when the response is + // -- SSE-streamed (response_body is NULL). + if let Some(body) = sample_request_body { + if body.contains("chatcmpl-tool-") { + return Some("vllm".to_string()); + } + } + + // -- Server header check. uvicorn matches BOTH vLLM and SGLang; + // -- the multi-model tiebreaker handles LiteLLM, the model-name + // -- heuristic handles SGLang's GLM/DeepSeek family deployments, + // -- and everything else falls through to vLLM. + let is_uvicorn = server_header + .map(|sh| { + let l = sh.to_ascii_lowercase(); + l.contains("uvicorn") || l.contains("hypercorn") + }) + .unwrap_or(false); + let has_litellm_server = server_header + .map(|sh| sh.to_ascii_lowercase().starts_with("litellm")) + .unwrap_or(false); + let has_ollama_server = server_header + .map(|sh| sh.to_ascii_lowercase().contains("ollama")) + .unwrap_or(false); + + if has_ollama_server { + return Some("ollama".to_string()); + } + if has_litellm_server { + return Some("litellm".to_string()); + } + + if is_uvicorn { + // Multi-model tiebreaker: LiteLLM proxies routinely host 5–20 + // models at one port; vLLM occasionally hosts multiple LoRAs + // but rarely > 2. + if models.len() >= 3 { + return Some("litellm".to_string()); + } + + // Model-name heuristic. SGLang is the reference deployment for + // the Zhipu GLM family and the DeepSeek family in production; + // both are non-trivial to run on vLLM (custom kernels, MLA, + // etc.). Matches the user's prod corpus exactly. + let lower_models: Vec = models.iter().map(|m| m.to_ascii_lowercase()).collect(); + let smells_like_sglang = lower_models.iter().any(|m| { + m.starts_with("glm") + || m.contains("/glm") + || m.starts_with("deepseek") + || m.contains("/deepseek") + }); + if smells_like_sglang { + return Some("sglang".to_string()); + } + + return Some("vllm".to_string()); + } + + None +} + +/// Pull the `Server` header value out of a serialized `Vec<(String, +/// String)>` JSON blob (the same format used by the rest of the +/// codebase — see `ts_protocol::model::HttpResponseData::headers`). +/// +/// Returns `None` if the blob is missing, doesn't parse, or has no +/// matching header. +pub(crate) fn extract_server_header(raw_response_headers_json: Option<&str>) -> Option { + let raw = raw_response_headers_json?; + let parsed: Vec<(String, String)> = serde_json::from_str(raw).ok()?; + parsed + .into_iter() + .find(|(k, _)| k.eq_ignore_ascii_case("server")) + .map(|(_, v)| v) +} + +/// Like `extract_server_header` but for request `Host`. We use it to +/// distinguish proxy traffic terminating at e.g. `api.openai.com` +/// (where the `Host` header is the give-away) from same-IP +/// self-hosted services. +fn extract_host_header(raw_request_headers_json: Option<&str>) -> Option { + let raw = raw_request_headers_json?; + let parsed: Vec<(String, String)> = serde_json::from_str(raw).ok()?; + parsed + .into_iter() + .find(|(k, _)| k.eq_ignore_ascii_case("host")) + .map(|(_, v)| v) +} + +#[allow(dead_code)] // exposed for tests in this module +fn _headermap(items: &[(&str, &str)]) -> String { + let v: Vec<(String, String)> = items + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + serde_json::to_string(&v).unwrap() +} + +#[allow(dead_code)] // exposed for tests in this module +fn _empty_map() -> HashMap { + HashMap::new() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn hdrs(items: &[(&str, &str)]) -> String { + _headermap(items) + } + + /// Empty everything — shared test default. Use overrides to inject + /// the specific signal each case is exercising. + struct C { + server_header: Option, + raw_response_headers_json: Option, + raw_request_headers_json: Option, + request_paths: Vec, + finish_reasons: Vec, + models: Vec, + sample_request_body: Option, + sample_response_body: Option, + } + impl Default for C { + fn default() -> Self { + Self { + server_header: None, + raw_response_headers_json: None, + raw_request_headers_json: None, + request_paths: vec![], + finish_reasons: vec![], + models: vec![], + sample_request_body: None, + sample_response_body: None, + } + } + } + fn run(c: &C) -> Option { + classify_app( + c.server_header.as_deref(), + c.raw_response_headers_json.as_deref(), + c.raw_request_headers_json.as_deref(), + &c.request_paths, + &c.finish_reasons, + &c.models, + c.sample_request_body.as_deref(), + c.sample_response_body.as_deref(), + ) + } + + #[test] + fn ollama_via_native_path() { + let c = C { + request_paths: vec!["/api/chat".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("ollama")); + } + + #[test] + fn ollama_via_server_header_with_oai_path() { + let c = C { + server_header: Some("ollama/0.1.45".into()), + raw_response_headers_json: Some(hdrs(&[("server", "ollama/0.1.45")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["llama3".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("ollama")); + } + + #[test] + fn llamacpp_via_completion_path() { + let c = C { + request_paths: vec!["/completion".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("llamacpp")); + } + + #[test] + fn sglang_via_native_path() { + // SGLang exposes /generate alongside its OpenAI-compat surface. + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into(), "/generate".into()], + models: vec!["llama-3-8b".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("sglang")); + } + + #[test] + fn sglang_via_health_generate_path() { + let c = C { + request_paths: vec!["/health_generate".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("sglang")); + } + + #[test] + fn sglang_via_matched_stop_finish_reason() { + // Streaming endpoint — no response body, but SSE-extracted + // finish_reason `matched_stop` exposes SGLang. + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + finish_reasons: vec!["stop".into(), "matched_stop".into()], + models: vec!["mystery-model".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("sglang")); + } + + #[test] + fn vllm_via_version_path() { + let c = C { + request_paths: vec!["/v1/chat/completions".into(), "/version".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("vllm")); + } + + #[test] + fn vllm_via_response_body_tool_call_id() { + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["qwen35-27b".into()], + sample_response_body: Some( + r#"{"id":"chatcmpl-abc","choices":[{"message":{"tool_calls":[{"id":"chatcmpl-tool-deadbeef","type":"function"}]}}]}"#.into(), + ), + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("vllm")); + } + + #[test] + fn vllm_via_response_body_system_fingerprint() { + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["qwen35-27b".into()], + sample_response_body: Some( + r#"{"id":"chatcmpl-x","system_fingerprint":"fp_abc123","choices":[]}"#.into(), + ), + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("vllm")); + } + + #[test] + fn vllm_via_request_body_tool_history() { + // Streaming-only endpoint — response_body is null. Agentic + // round N+1 sends prior assistant.tool_calls history back to + // the server, and vLLM's tool_call_ids carry `chatcmpl-tool-`. + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["mystery-model".into()], + sample_request_body: Some( + r#"{"messages":[{"role":"assistant","tool_calls":[{"id":"chatcmpl-tool-deadbeef","type":"function","function":{"name":"x"}}]}]}"#.into(), + ), + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("vllm")); + } + + #[test] + fn litellm_via_response_header() { + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[ + ("server", "uvicorn"), + ("x-litellm-call-id", "abc123"), + ])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["gpt-4o".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("litellm")); + } + + #[test] + fn litellm_via_multi_model_tiebreaker() { + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec![ + "qwen3-embed-8b".into(), + "glm5".into(), + "qwen35-27b".into(), + ], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("litellm")); + } + + #[test] + fn sglang_via_glm_model_heuristic() { + // Single GLM model on uvicorn, no LiteLLM signal, no + // discriminating finish_reason. Falls back to the model-name + // heuristic — Zhipu's reference deployment is SGLang. + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["GLM-5.1".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("sglang")); + } + + #[test] + fn sglang_via_deepseek_model_heuristic() { + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["deepseek-v3".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("sglang")); + } + + #[test] + fn vllm_default_for_single_model_uvicorn() { + // 1 model, uvicorn, no LiteLLM signals, no SGLang signals → + // pick the more common openai-compat server. + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["qwen35-27b".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("vllm")); + } + + #[test] + fn openai_via_request_host() { + let c = C { + raw_request_headers_json: Some(hdrs(&[("host", "api.openai.com")])), + request_paths: vec!["/v1/chat/completions".into()], + models: vec!["gpt-4".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("openai")); + } + + #[test] + fn anthropic_via_request_host() { + let c = C { + raw_request_headers_json: Some(hdrs(&[("host", "api.anthropic.com")])), + request_paths: vec!["/v1/messages".into()], + models: vec!["claude-3-5-sonnet".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("anthropic")); + } + + #[test] + fn unknown_when_no_signals() { + let c = C::default(); + assert!(run(&c).is_none()); + } + + #[test] + fn ollama_path_wins_over_uvicorn_server() { + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/api/chat".into()], + models: vec!["llama3".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("ollama")); + } + + #[test] + fn sglang_finish_reason_beats_glm_heuristic_consistency() { + // SGLang finish_reason AND GLM model — should still be SGLang. + let c = C { + server_header: Some("uvicorn".into()), + raw_response_headers_json: Some(hdrs(&[("server", "uvicorn")])), + request_paths: vec!["/v1/chat/completions".into()], + finish_reasons: vec!["matched_stop".into()], + models: vec!["GLM-5.1".into()], + ..Default::default() + }; + assert_eq!(run(&c).as_deref(), Some("sglang")); + } + + #[test] + fn extract_server_header_finds_match() { + let h = hdrs(&[("content-type", "application/json"), ("Server", "uvicorn")]); + assert_eq!(extract_server_header(Some(&h)).as_deref(), Some("uvicorn")); + } + + #[test] + fn extract_server_header_handles_missing() { + let h = hdrs(&[("content-type", "application/json")]); + assert!(extract_server_header(Some(&h)).is_none()); + } +} diff --git a/server/ts-storage-duckdb/src/lib.rs b/server/ts-storage-duckdb/src/lib.rs index b571dacb..366de01c 100644 --- a/server/ts-storage-duckdb/src/lib.rs +++ b/server/ts-storage-duckdb/src/lib.rs @@ -1,3 +1,4 @@ +mod apps; mod calls; #[cfg(test)] mod concurrent_tests; @@ -163,6 +164,10 @@ impl StorageBackend for DuckDbBackend { DuckDbBackend::query_metrics_models(self, query).await } + async fn query_services(&self, query: &ServicesQuery) -> Result> { + DuckDbBackend::query_services(self, query).await + } + async fn query_finish_reasons( &self, query: &FinishReasonsQuery, diff --git a/server/ts-storage-duckdb/src/metrics.rs b/server/ts-storage-duckdb/src/metrics.rs index 6bdbb7e5..4de62ed0 100644 --- a/server/ts-storage-duckdb/src/metrics.rs +++ b/server/ts-storage-duckdb/src/metrics.rs @@ -8,7 +8,8 @@ use ts_metrics::model::{LlmFinishMetric, LlmMetric}; use ts_storage::query::*; use crate::util::{ - build_dimension_where, build_dimension_where_for_group, sql_in_list, us_to_timestamp, + build_dimension_where, build_dimension_where_for_group, parse_json_string_list, sql_in_list, + us_to_timestamp, }; use crate::DuckDbBackend; @@ -803,6 +804,244 @@ impl DuckDbBackend { .await .map_err(|e| AppError::Storage(format!("spawn_blocking failed: {e}")))? } + + /// "Services" view — aggregate `llm_calls` by `(server_ip, + /// server_port)`. See `StorageBackend::query_services` for the + /// motivation (port is not on `llm_metrics`). + pub(crate) async fn query_services( + &self, + query: &ServicesQuery, + ) -> Result> { + const VALID_SORT_FIELDS: &[&str] = &[ + "call_count", + "error_count", + "total_input_tokens", + "total_output_tokens", + "ttft_avg_ms", + "ttft_p95_ms", + "e2e_avg_ms", + "e2e_p95_ms", + "last_seen_ms", + "first_seen_ms", + "server_ip", + "server_port", + ]; + if !VALID_SORT_FIELDS.contains(&query.sort_by.as_str()) { + return Err(AppError::Storage(format!( + "invalid sort_by field: {}", + query.sort_by + ))); + } + let sort_order = if query.sort_order.to_uppercase() == "ASC" { + "ASC" + } else { + "DESC" + }; + + let conn = self.read_pool.acquire().await?; + let query = query.clone(); + let sort_order = sort_order.to_string(); + + tokio::task::spawn_blocking(move || { + let start_ts = us_to_timestamp(query.time_range.start_us); + let end_ts = us_to_timestamp(query.time_range.end_us); + let sort_by = &query.sort_by; + let limit = query.limit; + + // Notes on the SQL: + // * `list_distinct(array_agg(model))` collects distinct + // values, then `[1:32]` caps the list — protects against a + // pathological row producing thousands of values that + // would each show up here. + // * Percentiles use `quantile_cont(col, 0.95)` (DuckDB's + // name for PERCENTILE_CONT). NULL inputs are skipped + // automatically so streaming-only calls with no + // `e2e_latency_ms` don't poison the result. + // * `epoch_ms(MIN(request_time))` returns Unix-epoch ms as + // i64 — matches the convention used elsewhere in the + // `/api/llm-calls` and `/api/agent-turns` payloads. + // List-of-VARCHAR columns come back as JSON strings (DuckDB's + // rust binding has no `FromSql` for `Vec`). We cast + // to JSON in SQL and then `parse_json_string_list` them on + // the Rust side — same pattern used by `agent_turns.models_used`. + // + // For app classification we sample one `request_headers` / + // `response_headers` blob per group. `arg_min(col, + // LENGTH(col)) FILTER (...)` picks the smallest non-null + // value — predictable across re-runs and tiny enough that + // streaming back to Rust costs nothing. + let sql = format!( + "SELECT + server_ip, + server_port, + CAST(list_distinct(array_agg(model))[1:32] AS JSON)::VARCHAR AS models_json, + CAST(list_distinct(array_agg(wire_api))[1:8] AS JSON)::VARCHAR AS wire_apis_json, + CAST(list_distinct(array_agg(request_path))[1:16] AS JSON)::VARCHAR AS paths_json, + -- Distinct finish_reasons. SGLang has signatures + -- vLLM doesn't (`matched_stop`, `matched_eos`, + -- `stop_str`); per-call we keep the raw string, so + -- a single appearance in the window is enough to + -- pin the server. + CAST(list_distinct(array_agg(finish_reason))[1:32] AS JSON)::VARCHAR AS finish_reasons_json, + -- Pick any well-formed header sample per group. + -- `MAX(response_headers)` is lexicographic — for + -- JSON-encoded headers starting with `[[` it's a + -- stable arbitrary choice and dodges the + -- arg_min(LENGTH)-picks-anomalous-short-blob hazard + -- (rows with empty or malformed-short headers exist + -- in production and were dropping real endpoints to + -- `unknown`). We filter to `[%` so the picked + -- sample looks like a JSON array. + MAX(response_headers) + FILTER (WHERE response_headers IS NOT NULL + AND response_headers LIKE '[%') AS sample_response_headers, + MAX(request_headers) + FILTER (WHERE request_headers IS NOT NULL + AND request_headers LIKE '[%') AS sample_request_headers, + -- Body samples for the vLLM vs SGLang fingerprint. + -- We pick the LARGEST request body in the window + -- (`arg_max(LENGTH)` — fast u64 comparison; the + -- chosen body materialises only once) because + -- larger bodies = deeper agentic history = + -- assistant.tool_calls[].id values from the same + -- server, which is our distinguishing signal. + -- response_body is NULL for SSE streaming calls, + -- so it's a weaker but still useful complement. + -- Both are capped at 32 KB / 8 KB to keep the + -- planner from materialising oversized agentic + -- histories. + arg_max(request_body, LENGTH(request_body)) + FILTER (WHERE request_body IS NOT NULL + AND LENGTH(request_body) BETWEEN 100 AND 32768 + AND request_body LIKE '{{%') AS sample_request_body, + arg_max(response_body, LENGTH(response_body)) + FILTER (WHERE response_body IS NOT NULL + AND LENGTH(response_body) BETWEEN 30 AND 8192 + AND response_body LIKE '{{%') AS sample_response_body, + COUNT(*) AS call_count, + COALESCE(SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END), 0)::UBIGINT AS error_count, + COALESCE(SUM(CASE WHEN is_stream THEN 1 ELSE 0 END), 0)::UBIGINT AS stream_count, + COALESCE(SUM(input_tokens), 0)::UBIGINT AS total_input_tokens, + COALESCE(SUM(output_tokens), 0)::UBIGINT AS total_output_tokens, + AVG(ttft_ms) AS ttft_avg_ms, + quantile_cont(ttft_ms, 0.95) AS ttft_p95_ms, + AVG(e2e_latency_ms) AS e2e_avg_ms, + quantile_cont(e2e_latency_ms, 0.95) AS e2e_p95_ms, + epoch_ms(MIN(request_time)) AS first_seen_ms, + epoch_ms(MAX(request_time)) AS last_seen_ms + FROM llm_calls + WHERE request_time >= ? AND request_time < ? + GROUP BY server_ip, server_port + ORDER BY {sort_by} {sort_order} + LIMIT {limit}" + ); + + let mut stmt = conn + .prepare(&sql) + .map_err(|e| AppError::Storage(format!("failed to prepare services query: {e}")))?; + let mut query_rows = stmt + .query(duckdb::params![start_ts, end_ts]) + .map_err(|e| AppError::Storage(format!("failed to execute services query: {e}")))?; + + let mut rows = Vec::new(); + while let Some(row) = query_rows + .next() + .map_err(|e| AppError::Storage(format!("row error: {e}")))? + { + let models_json: Option = row + .get(2) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + let wire_apis_json: Option = row + .get(3) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + let paths_json: Option = row + .get(4) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + let finish_reasons_json: Option = row + .get(5) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + let sample_response_headers: Option = row + .get(6) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + let sample_request_headers: Option = row + .get(7) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + let sample_request_body: Option = row + .get(8) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + let sample_response_body: Option = row + .get(9) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?; + + let models = parse_json_string_list(models_json.as_deref()); + let wire_apis = parse_json_string_list(wire_apis_json.as_deref()); + let request_paths = parse_json_string_list(paths_json.as_deref()); + let finish_reasons = parse_json_string_list(finish_reasons_json.as_deref()); + let server_header = + crate::apps::extract_server_header(sample_response_headers.as_deref()); + let app = crate::apps::classify_app( + server_header.as_deref(), + sample_response_headers.as_deref(), + sample_request_headers.as_deref(), + &request_paths, + &finish_reasons, + &models, + sample_request_body.as_deref(), + sample_response_body.as_deref(), + ); + + rows.push(ServiceRow { + server_ip: row + .get(0) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + server_port: row + .get::<_, u16>(1) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + models, + wire_apis, + request_paths, + call_count: row + .get::<_, u64>(10) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + error_count: row + .get::<_, u64>(11) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + stream_count: row + .get::<_, u64>(12) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + total_input_tokens: row + .get::<_, u64>(13) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + total_output_tokens: row + .get::<_, u64>(14) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + ttft_avg_ms: row + .get::<_, Option>(15) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + ttft_p95_ms: row + .get::<_, Option>(16) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + e2e_avg_ms: row + .get::<_, Option>(17) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + e2e_p95_ms: row + .get::<_, Option>(18) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + first_seen_ms: row + .get::<_, i64>(19) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + last_seen_ms: row + .get::<_, i64>(20) + .map_err(|e| AppError::Storage(format!("read error: {e}")))?, + app, + server_header, + }); + } + Ok(rows) + }) + .await + .map_err(|e| AppError::Storage(format!("spawn_blocking failed: {e}")))? + } } #[cfg(test)] diff --git a/server/ts-storage/src/backend.rs b/server/ts-storage/src/backend.rs index 738cba55..50ec653f 100644 --- a/server/ts-storage/src/backend.rs +++ b/server/ts-storage/src/backend.rs @@ -51,6 +51,18 @@ pub trait StorageBackend: Send + Sync { query: &MetricsModelsQuery, ) -> Result>; + /// Aggregate `llm_calls` by `(server_ip, server_port)` to produce + /// one row per LLM-serving endpoint. Used by the Services page. + /// + /// Not served off the pre-aggregated `llm_metrics` table because + /// that schema's grouping sets stop at `server_ip` — different + /// vLLM instances on the same host (port 8000 / 9000) would + /// collapse into one row. Worst-case this scans `llm_calls` rows + /// in the time window; the user's typical 7-day window has tens of + /// thousands of rows and the query completes in well under a + /// second. + async fn query_services(&self, query: &ServicesQuery) -> Result>; + /// Per-bucket finish-reason counts in the requested time range. One series /// per distinct raw `finish_reason` observed. The `wire_api`/`model` /// filters select a specific dimension; `None` rolls up across all values diff --git a/server/ts-storage/src/query.rs b/server/ts-storage/src/query.rs index 0d583fed..c742bda7 100644 --- a/server/ts-storage/src/query.rs +++ b/server/ts-storage/src/query.rs @@ -37,6 +37,58 @@ pub struct MetricsModelsQuery { pub limit: u32, } +#[derive(Debug, Clone)] +pub struct ServicesQuery { + pub time_range: TimeRange, + pub sort_by: String, + pub sort_order: String, + pub limit: u32, +} + +/// One row of the "Services" view: a unique `(server_ip, server_port)` +/// endpoint with the models it served, error/perf stats, and the time +/// window where it appeared. Computed directly off `llm_calls` because +/// the pre-aggregated `llm_metrics` table doesn't carry `server_port` +/// (its grouping sets stop at `server_ip`). +#[derive(Debug, Clone, Serialize)] +pub struct ServiceRow { + pub server_ip: String, + pub server_port: u16, + /// Distinct models seen on this endpoint. Capped at 32 in SQL via + /// `list_distinct(... )[:32]` so a misbehaving client that sends + /// thousands of made-up model strings doesn't bloat a single row. + pub models: Vec, + pub wire_apis: Vec, + /// Distinct request paths seen at this endpoint. Used by the + /// classifier to spot Ollama (`/api/chat`) and llama.cpp + /// (`/completion`, `/tokenize`) from their native non-OpenAI + /// surface. Capped at 8 in SQL. + pub request_paths: Vec, + pub call_count: u64, + pub error_count: u64, + pub stream_count: u64, + pub total_input_tokens: u64, + pub total_output_tokens: u64, + pub ttft_avg_ms: Option, + pub ttft_p95_ms: Option, + pub e2e_avg_ms: Option, + pub e2e_p95_ms: Option, + /// Unix-epoch milliseconds of the first / last call seen in the + /// query window. Useful for "is this endpoint still live?". + pub first_seen_ms: i64, + pub last_seen_ms: i64, + /// Best-effort serving-software identification — one of + /// `vllm`, `sglang`, `ollama`, `llamacpp`, `litellm`, + /// `openai-compat`, `openai`, `anthropic`, or `None` (unknown). + /// vLLM / SGLang both run under uvicorn and can't yet be told + /// apart from headers alone; both show up as `openai-compat` + /// today. See `apps::classify_app`. + pub app: Option, + /// Raw `Server` HTTP response header — surfaced in the UI as a + /// tooltip so the user can override the classifier visually. + pub server_header: Option, +} + #[derive(Debug, Clone)] pub struct CallsQuery { pub time_range: TimeRange,