Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions examples/ai-fixtures/cluster-health.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: mission-control-investigate
description: |
Get cluster health
prompt: |
Tell me about the general health of the kubernetes cluster, whats happening and what changed recently
baseline: direct
repeat: 1
captureKubernetesProxy: true
mcpProxy:
capture: true
headers:
Accept: text/markdown

defaults:
timeout: 3m
permissionMode: bypassPermissions
noSessionPersistence: true
model: claude-sonnet-4-6

runs:
- name: direct
promptCaching: false
tools: [Bash, Read]
allowedTools:
- Bash(kubectl *)
- Bash(aws *)
- Bash(curl *)
- Read

- name: mission-control
promptCaching: true
tools: [default]
mcpConfig:
- .mcp.json
allowedTools:
- mcp__mission-control__*
115 changes: 115 additions & 0 deletions pkg/ai/fixture/aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// ABOUTME: Per-Run statistics accumulator — adds Summary records over N iterations and finalizes means.

package fixture

import "math"

type aggregate struct {
N int
Success bool
Error string
SessionID string
DurationMean float64
DurationStdev float64
CostMean float64

Input int
Output int
CacheRead int
CacheWrite int

ToolCalls int
MCPCalls int
BashCalls int
KubectlCalls int
KubectlAPICalls int
MCPAPICalls int

KubectlAPILog []KubectlAPIEntry
MCPAPILog []MCPAPIEntry
ToolCallLog []ToolCallEntry
ToolCounts map[string]int

durations []float64
costs []float64
}

func (a *aggregate) add(s Summary) {
a.N++
a.durations = append(a.durations, s.DurationMS)
a.costs = append(a.costs, s.CostUSD)
if s.SessionID != "" {
a.SessionID = s.SessionID
}
if s.Error != "" && a.Error == "" {
a.Error = s.Error
}
if s.Success {
a.Success = true
}
a.Input += s.Input
a.Output += s.Output
a.CacheRead += s.CacheRead
a.CacheWrite += s.CacheWrite
a.ToolCalls += s.ToolCalls
a.MCPCalls += s.MCPCalls
a.BashCalls += s.BashCalls
a.KubectlCalls += s.KubectlCalls
a.KubectlAPICalls += s.KubectlAPICalls
a.KubectlAPILog = append(a.KubectlAPILog, s.KubectlAPILog...)
a.MCPAPICalls += s.MCPAPICalls
a.MCPAPILog = append(a.MCPAPILog, s.MCPAPILog...)
a.ToolCallLog = append(a.ToolCallLog, s.ToolCallLog...)
if a.ToolCounts == nil {
a.ToolCounts = map[string]int{}
}
for k, v := range s.ToolCounts {
a.ToolCounts[k] += v
}
}

func (a *aggregate) finalize() {
if a.N == 0 {
return
}
a.DurationMean = mean(a.durations)
a.DurationStdev = stdev(a.durations, a.DurationMean)
a.CostMean = mean(a.costs)
// Per-iteration averages for token/tool aggregate counts.
a.Input /= a.N
a.Output /= a.N
a.CacheRead /= a.N
a.CacheWrite /= a.N
a.ToolCalls /= a.N
a.MCPCalls /= a.N
a.BashCalls /= a.N
a.KubectlCalls /= a.N
a.KubectlAPICalls /= a.N
a.MCPAPICalls /= a.N
for k, v := range a.ToolCounts {
a.ToolCounts[k] = v / a.N
}
}

func mean(xs []float64) float64 {
if len(xs) == 0 {
return 0
}
var s float64
for _, x := range xs {
s += x
}
return s / float64(len(xs))
}

func stdev(xs []float64, m float64) float64 {
if len(xs) < 2 {
return 0
}
var s float64
for _, x := range xs {
d := x - m
s += d * d
}
return math.Sqrt(s / float64(len(xs)-1))
}
105 changes: 105 additions & 0 deletions pkg/ai/fixture/api_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// ABOUTME: Parsers for the JSONL files produced by kubeproxy and mcpproxy.
// ABOUTME: Tool calls themselves come from the live stream-json output, not from these.

package fixture

import (
"bufio"
"bytes"
"encoding/json"
"os"
"time"
)

// scanJSONL streams a JSONL file and feeds each line to decode. decode returns
// (parsed, true) when the line should be appended to the result, or (_, false)
// when it should be skipped. Returns an empty slice when the file is missing
// or unreadable — callers treat that as "no log available."
func scanJSONL[T any](path string, decode func([]byte) (T, bool)) []T {
data, err := os.ReadFile(path)
if err != nil {
return nil
}
scanner := bufio.NewScanner(bytes.NewReader(data))
scanner.Buffer(make([]byte, jsonlScannerInitialBuf), jsonlScannerMaxBuf)
var out []T
for scanner.Scan() {
v, ok := decode(scanner.Bytes())
if !ok {
continue
}
out = append(out, v)
}
return out
}

// readKubectlAPILog parses a kubeproxy JSONL file into chronological API entries.
func readKubectlAPILog(path string) []KubectlAPIEntry {
return scanJSONL(path, decodeKubectlEntry)
}

func decodeKubectlEntry(line []byte) (KubectlAPIEntry, bool) {
var ev struct {
Type string `json:"type"`
Time time.Time `json:"time"`
Method string `json:"method"`
Path string `json:"path"`
Query string `json:"query"`
Status int `json:"status"`
Duration string `json:"duration"`
Bytes int64 `json:"bytes"`
}
if err := json.Unmarshal(line, &ev); err != nil || ev.Type != "request" {
return KubectlAPIEntry{}, false
}
return KubectlAPIEntry{
Time: ev.Time,
Method: ev.Method,
URL: joinURL(ev.Path, ev.Query),
Status: ev.Status,
Duration: ev.Duration,
Bytes: ev.Bytes,
}, true
}

// readMCPAPILog parses an mcpproxy JSONL file into chronological API entries.
func readMCPAPILog(path string) []MCPAPIEntry {
return scanJSONL(path, decodeMCPEntry)
}

func decodeMCPEntry(line []byte) (MCPAPIEntry, bool) {
var ev struct {
Type string `json:"type"`
Time time.Time `json:"time"`
Server string `json:"server"`
Method string `json:"method"`
Path string `json:"path"`
Query string `json:"query"`
Status int `json:"status"`
Duration string `json:"duration"`
Bytes int64 `json:"bytes"`
RPCMethod string `json:"rpcMethod"`
Tool string `json:"tool"`
}
if err := json.Unmarshal(line, &ev); err != nil || ev.Type != "request" {
return MCPAPIEntry{}, false
}
return MCPAPIEntry{
Time: ev.Time,
Server: ev.Server,
Method: ev.Method,
URL: joinURL(ev.Path, ev.Query),
RPCMethod: ev.RPCMethod,
Tool: ev.Tool,
Status: ev.Status,
Duration: ev.Duration,
Bytes: ev.Bytes,
}, true
}

func joinURL(path, query string) string {
if query == "" {
return path
}
return path + "?" + query
}
59 changes: 59 additions & 0 deletions pkg/ai/fixture/cost.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// ABOUTME: Cost resolution and money/byte formatters.
// ABOUTME: Falls back to OpenRouter pricing when the claude CLI doesn't report a cost.

package fixture

import (
"fmt"

"github.com/flanksource/captain/pkg/ai/pricing"
)

// resolveCost returns the per-iteration cost for a row. If the underlying
// claude CLI didn't report a cost (CostMean == 0) we fall back to the
// OpenRouter-backed pricing registry to estimate from the token counts.
// Returns (cost, estimated) — estimated is true when the value is from the
// fallback path.
func resolveCost(model string, a *aggregate) (float64, bool) {
if a.CostMean > 0 {
return a.CostMean, false
}
if a.Input == 0 && a.Output == 0 && a.CacheRead == 0 && a.CacheWrite == 0 {
return 0, false
}
res, err := pricing.CalculateCost(model, a.Input, a.Output, 0, a.CacheRead, a.CacheWrite)
if err != nil {
return 0, false
}
return res.TotalCost, true
}

func formatCostWithEstimate(v float64, estimated bool) string {
s := formatUSD(v)
if estimated && v > 0 {
return s + " (est)"
}
return s
}

func formatUSD(v float64) string {
if v == 0 {
return "$0"
}
return fmt.Sprintf("$%.4f", v)
}

// humanBytes renders a byte count with KB/MB/GB/... suffixes.
func humanBytes(n int64) string {
const unit = 1024
if n < unit {
return fmt.Sprintf("%d B", n)
}
div, exp := int64(unit), 0
for x := n / unit; x >= unit; x /= unit {
div *= unit
exp++
}
suffix := "KMGTPE"[exp]
return fmt.Sprintf("%.1f %cB", float64(n)/float64(div), suffix)
}
71 changes: 71 additions & 0 deletions pkg/ai/fixture/cost_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package fixture

import "testing"

func TestResolveCost_UsesReportedWhenPresent(t *testing.T) {
a := &aggregate{CostMean: 0.42, Input: 100, Output: 200}
got, est := resolveCost("claude-sonnet-4-6", a)
if got != 0.42 || est {
t.Errorf("got %v, est=%v, want 0.42, false", got, est)
}
}

func TestResolveCost_FallbackZeroWhenNoTokens(t *testing.T) {
a := &aggregate{}
got, est := resolveCost("claude-sonnet-4-6", a)
if got != 0 || est {
t.Errorf("got %v, est=%v, want 0, false", got, est)
}
}

func TestResolveCost_FallbackComputesFromTokens(t *testing.T) {
// claude-sonnet-4-6 pricing: $3/M input, $15/M output. Cache R/W: $0.30 / $3.75 per M.
a := &aggregate{Input: 1_000_000, Output: 1_000_000}
got, est := resolveCost("claude-sonnet-4-6", a)
if !est {
t.Fatal("estimated should be true on fallback")
}
want := 3.0 + 15.0
if got != want {
t.Errorf("got %v, want %v", got, want)
}
}

func TestResolveCost_FallbackUnknownModelReturnsZero(t *testing.T) {
a := &aggregate{Input: 100, Output: 100}
got, est := resolveCost("definitely-not-a-real-model-zzz", a)
if got != 0 || est {
t.Errorf("got %v, est=%v, want 0, false (model unknown → no estimate)", got, est)
}
}

func TestFormatCostWithEstimate(t *testing.T) {
if got := formatCostWithEstimate(0, false); got != "$0" {
t.Errorf("zero cost: got %q", got)
}
if got := formatCostWithEstimate(0.0234, false); got != "$0.0234" {
t.Errorf("real cost: got %q", got)
}
if got := formatCostWithEstimate(0.0234, true); got != "$0.0234 (est)" {
t.Errorf("estimated cost: got %q", got)
}
if got := formatCostWithEstimate(0, true); got != "$0" {
t.Errorf("zero estimated should not show (est) suffix: got %q", got)
}
}

func TestHumanBytes(t *testing.T) {
cases := map[int64]string{
0: "0 B",
512: "512 B",
1024: "1.0 KB",
1024 * 1024: "1.0 MB",
1536: "1.5 KB",
1024 * 1024 * 5: "5.0 MB",
}
for in, want := range cases {
if got := humanBytes(in); got != want {
t.Errorf("humanBytes(%d) = %q, want %q", in, got, want)
}
}
}
Loading
Loading