diff --git a/src/agent/__tests__/judge-query.test.ts b/src/agent/__tests__/judge-query.test.ts index 6de3135..73a63fb 100644 --- a/src/agent/__tests__/judge-query.test.ts +++ b/src/agent/__tests__/judge-query.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from "bun:test"; import { z } from "zod/v4"; -import { __absorbUsageForTest, parseJsonFromResponse } from "../judge-query.ts"; +import { __absorbUsageForTest, buildSystemPrompt, parseJsonFromResponse } from "../judge-query.ts"; // parseJsonFromResponse is the shape-normalization layer for judge subprocess output. // Models sometimes return markdown fences, leading prose, or trailing whitespace even @@ -117,6 +117,27 @@ describe("parseJsonFromResponse", () => { expect(partial.costUsd).toBe(0); }); + test("buildSystemPrompt returns the claude_code preset envelope by default", () => { + const prompt = buildSystemPrompt("evaluate this", false); + expect(typeof prompt).toBe("object"); + if (typeof prompt === "object") { + expect(prompt.type).toBe("preset"); + expect(prompt.preset).toBe("claude_code"); + expect(prompt.append).toBe("evaluate this"); + } + }); + + test("buildSystemPrompt returns a plain string when omitPreset is true", () => { + // This is the C3 fix shape: the SDK accepts a plain-string systemPrompt + // per its docstring ("Use a custom system prompt") and that path skips + // the preset's base prompt and full tool catalog. The gate is the only + // caller that flips this on, because it is a pure pass/skip evaluation + // with no tool use. Other judges keep the preset. 
+ const prompt = buildSystemPrompt("evaluate this", true); + expect(typeof prompt).toBe("string"); + expect(prompt).toBe("evaluate this"); + }); + test("parses nested structures", () => { const Nested = z.object({ flags: z.array(z.object({ category: z.string(), severity: z.enum(["critical", "warning", "info"]) })), diff --git a/src/agent/judge-query.ts b/src/agent/judge-query.ts index f7d65b1..64b7654 100644 --- a/src/agent/judge-query.ts +++ b/src/agent/judge-query.ts @@ -16,6 +16,17 @@ export type JudgeQueryOptions = { schema: z.ZodType; model?: string; maxTokens?: number; + /** + * When true, the SDK is invoked with a plain-string `systemPrompt` instead + * of the `claude_code` preset envelope. The preset bundles the full Claude + * Code base prompt and tool catalog, which costs thousands of input tokens + * per call. Pure evaluation calls that do not use any tools (the + * conditional firing gate is the canonical example) should set this so + * Haiku only sees the judge prompt and the session summary, not the entire + * Claude Code surface area. Other judges that legitimately want the preset + * keep the default. + */ + omitPreset?: boolean; }; export type JudgeQueryResult = { @@ -142,17 +153,15 @@ export async function runJudgeQuery( // whenever ANTHROPIC_API_KEY happened to be set in the shell. const providerEnv = buildProviderEnv(config); + const systemPrompt = buildSystemPrompt(judgePrompt, options.omitPreset === true); + const queryStream = query({ prompt: options.userMessage, options: { model: resolvedModel, permissionMode: "bypassPermissions", allowDangerouslySkipPermissions: true, - systemPrompt: { - type: "preset" as const, - preset: "claude_code" as const, - append: judgePrompt, - }, + systemPrompt, maxTurns: 1, effort: "low", persistSession: false, @@ -308,6 +317,29 @@ export function __absorbUsageForTest( return partial; } +/** + * Compose the SDK `systemPrompt` value. 
The default path keeps the + * `claude_code` preset envelope so judges designed against the Claude Code + * tool catalog continue to receive the full base prompt. When the caller + * sets `omitPreset`, the SDK is handed a plain string instead, which the + * SDK documents as "Use a custom system prompt" and which skips the preset's + * base prompt and tool descriptions entirely. The gate uses this to drop + * two orders of magnitude of input tokens per call. + */ +export function buildSystemPrompt( + judgePrompt: string, + omitPreset: boolean, +): string | { type: "preset"; preset: "claude_code"; append: string } { + if (omitPreset) { + return judgePrompt; + } + return { + type: "preset" as const, + preset: "claude_code" as const, + append: judgePrompt, + }; +} + function buildJudgePrompt(systemPrompt: string, schemaJson: unknown): string { return [ systemPrompt, diff --git a/src/evolution/__tests__/cadence.test.ts b/src/evolution/__tests__/cadence.test.ts index 1d196fc..2b55e88 100644 --- a/src/evolution/__tests__/cadence.test.ts +++ b/src/evolution/__tests__/cadence.test.ts @@ -42,9 +42,15 @@ function newDb(): Database { } function makeSummary(overrides: Partial = {}): SessionSummary { + const sessionId = overrides.session_id ?? "s1"; + // Default the session_key off the session_id so each fixture row gets a + // distinct key. The queue dedups by session_key, so two enqueues with the + // same key collapse into a single row, which is correct production + // behavior but breaks any test that uses session_id alone to distinguish + // rows. 
return { - session_id: "s1", - session_key: "slack:C1:T1", + session_id: sessionId, + session_key: `slack:C-${sessionId}:T-${sessionId}`, user_id: "u1", user_messages: ["help"], assistant_messages: ["ok"], @@ -297,6 +303,42 @@ describe("EvolutionCadence", () => { } }); + test("failed pipeline rows stay in the queue while ok rows are deleted", async () => { + const config = setupEnv(); + const db = newDb(); + const queue = new EvolutionQueue(db); + let calls = 0; + const engine = fakeEngine({ + onRun: async (session) => { + calls += 1; + if (session.session_id === "fail-me") { + throw new Error("simulated transient pipeline failure"); + } + return { version: 1, changes_applied: [], changes_rejected: [] }; + }, + }); + const cadence = new EvolutionCadence(engine, queue, config, { cadenceMinutes: 1_000_000, demandTriggerDepth: 999 }); + cadence.start(); + try { + // Distinct session_keys so the dedup-on-enqueue from M3 does not + // collapse them into a single row. + queue.enqueue(makeSummary({ session_id: "ok-1", session_key: "slack:C1:T1" }), DECISION); + queue.enqueue(makeSummary({ session_id: "fail-me", session_key: "slack:C2:T2" }), DECISION); + queue.enqueue(makeSummary({ session_id: "ok-2", session_key: "slack:C3:T3" }), DECISION); + const result = await cadence.triggerNow(); + expect(result?.processed).toBe(3); + expect(result?.failureCount).toBe(1); + expect(calls).toBe(3); + + // Only the failing row remains; the two ok rows were deleted. 
+ const remaining = queue.drainAll(); + expect(remaining).toHaveLength(1); + expect(remaining[0].session_id).toBe("fail-me"); + } finally { + cadence.stop(); + } + }); + test("queue_stats counters increment after each drain", async () => { const config = setupEnv(); const db = newDb(); diff --git a/src/evolution/__tests__/engine.test.ts b/src/evolution/__tests__/engine.test.ts index c72edb7..ec39dcb 100644 --- a/src/evolution/__tests__/engine.test.ts +++ b/src/evolution/__tests__/engine.test.ts @@ -201,17 +201,14 @@ describe("EvolutionEngine", () => { const session = makeSession({ outcome: "success" }); await engine.afterSession(session); - // Phase 0 M4: `updateAfterSession` is called twice on the normal run - // path. Once at the top of `afterSession` so the mutex-skip path also - // updates counters (without that, dashboards undercount during bursts), - // and again inside `runCycle` with the real `hadCorrections` value. - // Both calls increment `session_count`, so the normal path double-counts - // by one. This is the tradeoff the Phase 0 reviewer explicitly accepted - // because Phase 2 replaces the drop-on-floor mutex with a real queue - // and removes this bookkeeping entirely. + // Phase 1+2 collapsed `updateAfterSession` to a single call inside + // `runCycle`, after observation extraction supplies the real + // `hadCorrections` value. The Phase 0 M4 increments at the top of + // `afterSessionInternal` and inside `enqueueIfWorthy` were obsolete + // once the persistent queue replaced the drop-on-floor mutex path. 
const metrics = engine.getMetrics(); - expect(metrics.session_count).toBe(2); - expect(metrics.success_count).toBe(2); + expect(metrics.session_count).toBe(1); + expect(metrics.success_count).toBe(1); }); test("constitution violation is rejected", async () => { @@ -249,6 +246,34 @@ describe("EvolutionEngine", () => { expect(metrics.rollback_count).toBe(1); }); + test("rollback fires the runtime refresh callback with the rolled-back state", async () => { + // Codex finding on PR #63: the apply path refreshes the runtime via + // `notifyConfigApplied` but the auto-rollback branch was reverting disk + // state without firing the callback, leaving the runtime serving the + // rolled-forward snapshot. The fix wires the same callback into + // `rollback()` so any path that mutates disk state also refreshes the + // runtime. `getConfig()` re-reads from disk on every call, so the + // callback observes the post-rollback content. + const engine = new EvolutionEngine(CONFIG_PATH); + const refreshes: Array<{ version: number; userProfile: string }> = []; + engine.setOnConfigApplied(() => { + const config = engine.getConfig(); + refreshes.push({ version: engine.getCurrentVersion(), userProfile: config.userProfile }); + }); + + await engine.afterSession(makeSession({ user_messages: ["No, use TypeScript not JavaScript"] })); + expect(refreshes.length).toBe(1); + expect(refreshes[0].version).toBeGreaterThan(0); + expect(refreshes[0].userProfile).toContain("TypeScript"); + + engine.rollback(0); + // Two callback invocations now: one from the original apply, one from + // the rollback. The second sees the version-0 state on disk. 
+ expect(refreshes.length).toBe(2); + expect(refreshes[1].version).toBe(0); + expect(refreshes[1].userProfile).not.toContain("TypeScript"); + }); + test("preference is detected and applied", async () => { const engine = new EvolutionEngine(CONFIG_PATH); const session = makeSession({ @@ -261,6 +286,46 @@ describe("EvolutionEngine", () => { expect(userProfile.toLowerCase()).toContain("vim"); }); + test("setOnConfigApplied fires after a successful afterSession that applies changes", async () => { + // C1 contract: the engine owns the runtime refresh callback and fires + // it from the apply path. Production wiring in src/index.ts uses this + // exact setter to refresh the runtime's evolvedConfig snapshot from + // disk. Test against the engine directly so a future refactor that + // breaks the callback shape fails here, not at boot time on a VM. + const engine = new EvolutionEngine(CONFIG_PATH); + const versions: number[] = []; + engine.setOnConfigApplied(() => { + versions.push(engine.getCurrentVersion()); + }); + await engine.afterSession(makeSession({ user_messages: ["No, use TypeScript not JavaScript"] })); + expect(versions.length).toBeGreaterThanOrEqual(1); + expect(versions.at(-1)).toBe(engine.getCurrentVersion()); + }); + + test("setOnConfigApplied does not fire when no changes are applied", async () => { + const engine = new EvolutionEngine(CONFIG_PATH); + let calls = 0; + engine.setOnConfigApplied(() => { + calls += 1; + }); + // A neutral session with no correction signals applies zero changes. + await engine.afterSession(makeSession({ user_messages: ["What time is it?"] })); + expect(calls).toBe(0); + }); + + test("setOnConfigApplied callback errors do not wedge the pipeline", async () => { + // A telemetry/refresh failure in the callback must not poison the + // evolution result. The engine swallows and warns, the pipeline + // returns normally, and the next applied change retries the refresh. 
+ const engine = new EvolutionEngine(CONFIG_PATH); + engine.setOnConfigApplied(() => { + throw new Error("simulated runtime refresh failure"); + }); + const result = await engine.afterSession(makeSession({ user_messages: ["No, use TypeScript not JavaScript"] })); + expect(result.changes_applied.length).toBeGreaterThan(0); + expect(result.version).toBeGreaterThan(0); + }); + test("evolved config is available in getConfig after changes", async () => { const engine = new EvolutionEngine(CONFIG_PATH); const session = makeSession({ diff --git a/src/evolution/__tests__/gate.test.ts b/src/evolution/__tests__/gate.test.ts index 70c199f..e61ed33 100644 --- a/src/evolution/__tests__/gate.test.ts +++ b/src/evolution/__tests__/gate.test.ts @@ -65,6 +65,7 @@ type FakeRuntime = { schema: unknown; model?: string; maxTokens?: number; + omitPreset?: boolean; }) => Promise; }; @@ -93,6 +94,41 @@ describe("decideGate Haiku happy path", () => { expect(decision.reason).toContain("naming"); }); + test("gate forwards omitPreset=true so judgeQuery skips the claude_code preset envelope", async () => { + // The gate is a pure pass/skip evaluation that never reads files or + // runs tools, so it must opt out of the `claude_code` system prompt + // preset that bundles the full Claude Code base prompt and tool + // catalog. Live fleet data showed gate cost running 20-180x the + // research target until this flag was wired through. Asserting the + // flag at the call site is the durable defense against a future + // refactor accidentally re-introducing the preset overhead. 
+ const captured: Array<{ omitPreset?: boolean; model?: string; maxTokens?: number }> = []; + const runtime: FakeRuntime = { + judgeQuery: async (options) => { + captured.push({ + omitPreset: options.omitPreset, + model: options.model, + maxTokens: options.maxTokens, + }); + return { + verdict: "pass" as const, + confidence: 0.9, + reasoning: "", + data: { evolve: false, reason: "routine" }, + model: "claude-haiku-4-5", + inputTokens: 420, + outputTokens: 28, + costUsd: 0.0005, + durationMs: 900, + }; + }, + }; + await decideGate(makeSession(), runtime as unknown as AgentRuntime); + expect(captured).toHaveLength(1); + expect(captured[0].omitPreset).toBe(true); + expect(captured[0].maxTokens).toBe(200); + }); + test("evolve=false returns skip source=haiku", async () => { const runtime: FakeRuntime = { judgeQuery: async () => ({ diff --git a/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts b/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts index d5c28cc..8777803 100644 --- a/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts +++ b/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { mkdirSync, rmSync, writeFileSync } from "node:fs"; import { JudgeSubprocessError } from "../../agent/judge-query.ts"; import type { AgentRuntime } from "../../agent/runtime.ts"; import type { PhantomConfig } from "../../config/types.ts"; @@ -356,11 +356,17 @@ describe("Phase 0 mutex guard", () => { } }); - test("mutex skip path still bumps session_count so dashboards do not undercount", async () => { - // M4: without this the normal-vs-skip paths diverge and operators - // watching session_count in the dashboard see undercounting during - // bursts. The skip path now updates session metrics with - // hadCorrections=false at the top of afterSession. 
+ test("mutex skip path returns a skipped result without spawning a new cycle", async () => { + // Phase 1+2 collapsed `updateAfterSession` to a single call inside + // `runCycle`, after observation extraction supplies the real + // `hadCorrections` value. The previous Phase 0 M4 top-of-afterSession + // increment was load-bearing only while the mutex could permanently + // drop a session; the persistent queue replaces that drop-on-floor + // path so an enqueued session eventually reaches `runCycle` and the + // counter increments exactly once. This test pins the new shape: the + // active cycle still holds the mutex, the second call sees the guard + // and returns a skipped result with zero applied changes, and no + // extra runCycle is launched on top of the hanging one. type Releaser = (reason: unknown) => void; const releaseHangRef: { fn: Releaser | null } = { fn: null }; const hang = new Promise((_resolve, reject) => { @@ -376,15 +382,12 @@ describe("Phase 0 mutex guard", () => { const callA = engine.afterSession(makeSession({ session_id: "active" })); await Promise.resolve(); await Promise.resolve(); - await engine.afterSession(makeSession({ session_id: "skip-1" })); - await engine.afterSession(makeSession({ session_id: "skip-2" })); - - const metricsPath = `${TEST_DIR}/phantom-config/meta/metrics.json`; - const metrics = JSON.parse(readFileSync(metricsPath, "utf-8")); - // Three afterSession calls, three counter increments from the top-of- - // afterSession update. The active call is still hanging so its inner - // updateAfterSession has not run yet, so we assert exactly 3. 
- expect(metrics.session_count).toBe(3); + const skipResult1 = await engine.afterSession(makeSession({ session_id: "skip-1" })); + const skipResult2 = await engine.afterSession(makeSession({ session_id: "skip-2" })); + + expect(skipResult1.changes_applied).toHaveLength(0); + expect(skipResult2.changes_applied).toHaveLength(0); + expect((engine as unknown as { activeCycleSkipCount: number }).activeCycleSkipCount).toBe(2); releaseHangRef.fn?.(new Error("released by test")); try { diff --git a/src/evolution/__tests__/phase-1-2-integration.test.ts b/src/evolution/__tests__/phase-1-2-integration.test.ts new file mode 100644 index 0000000..6d31f04 --- /dev/null +++ b/src/evolution/__tests__/phase-1-2-integration.test.ts @@ -0,0 +1,347 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdirSync, rmSync, writeFileSync } from "node:fs"; +import type { AgentRuntime } from "../../agent/runtime.ts"; +import { MIGRATIONS } from "../../db/schema.ts"; +import { EvolutionCadence } from "../cadence.ts"; +import { EvolutionEngine } from "../engine.ts"; +import { EvolutionQueue } from "../queue.ts"; +import type { SessionSummary } from "../types.ts"; + +// Phase 1+2 end-to-end integration. Wires the REAL EvolutionEngine, REAL +// EvolutionQueue over an in-memory SQLite, and REAL EvolutionCadence with a +// short cadence window. Only the AgentRuntime is stubbed, and only enough to +// answer `judgeQuery` (the gate Haiku call) and `getPhantomConfig` (the judge +// mode resolver). Every coordination bug in the Phase 1+2 fix punch list is +// catchable here: +// +// - C1: runtime.setEvolvedConfig is called after the drain applies changes +// - C2: a session whose pipeline throws stays in the queue +// - M1: metrics.session_count is 1 per drained session, not 3 +// - M3: dedup-by-session-key on the enqueue side +// +// One test per assertion keeps the failure message specific. 
They share a +// fixture builder so the surface area stays small. + +const TEST_DIR = "/tmp/phantom-test-phase-1-2-integration"; +const CONFIG_PATH = `${TEST_DIR}/config/evolution.yaml`; + +function setupTestEnvironment(): void { + mkdirSync(`${TEST_DIR}/config`, { recursive: true }); + mkdirSync(`${TEST_DIR}/phantom-config/meta`, { recursive: true }); + mkdirSync(`${TEST_DIR}/phantom-config/strategies`, { recursive: true }); + mkdirSync(`${TEST_DIR}/phantom-config/memory`, { recursive: true }); + + writeFileSync( + CONFIG_PATH, + [ + "cadence:", + " reflection_interval: 1", + " consolidation_interval: 10", + "gates:", + " drift_threshold: 0.7", + " max_file_lines: 200", + " auto_rollback_threshold: 0.1", + " auto_rollback_window: 5", + "reflection:", + ' model: "claude-sonnet-4-20250514"', + "judges:", + // `never` so the engine takes the heuristic observation path. We + // only need a stub runtime for the gate; the rest of the pipeline + // runs deterministic TypeScript code. + ' enabled: "never"', + "paths:", + ` config_dir: "${TEST_DIR}/phantom-config"`, + ` constitution: "${TEST_DIR}/phantom-config/constitution.md"`, + ` version_file: "${TEST_DIR}/phantom-config/meta/version.json"`, + ` metrics_file: "${TEST_DIR}/phantom-config/meta/metrics.json"`, + ` evolution_log: "${TEST_DIR}/phantom-config/meta/evolution-log.jsonl"`, + ` golden_suite: "${TEST_DIR}/phantom-config/meta/golden-suite.jsonl"`, + ` session_log: "${TEST_DIR}/phantom-config/memory/session-log.jsonl"`, + ].join("\n"), + "utf-8", + ); + + writeFileSync( + `${TEST_DIR}/phantom-config/constitution.md`, + [ + "# Phantom Constitution", + "", + "1. Honesty: Never deceive the user.", + "2. Safety: Never execute harmful commands.", + "3. Privacy: Never share user data.", + "4. Transparency: No hidden changes.", + "5. Boundaries: You are not a person.", + "6. Accountability: Every change is logged.", + "7. Consent: Do not modify the constitution.", + "8. 
Proportionality: Minimal changes.", + ].join("\n"), + "utf-8", + ); + writeFileSync(`${TEST_DIR}/phantom-config/persona.md`, "# Persona\n\n- Be direct.\n", "utf-8"); + writeFileSync( + `${TEST_DIR}/phantom-config/user-profile.md`, + "# User Profile\n\nPreferences learned from interactions.\n", + "utf-8", + ); + writeFileSync(`${TEST_DIR}/phantom-config/domain-knowledge.md`, "# Domain Knowledge\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/strategies/task-patterns.md`, "# Task Patterns\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/strategies/tool-preferences.md`, "# Tool Preferences\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/strategies/error-recovery.md`, "# Error Recovery\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/memory/session-log.jsonl`, "", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/memory/principles.md`, "# Principles\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/memory/corrections.md`, "# Corrections\n", "utf-8"); + + writeFileSync( + `${TEST_DIR}/phantom-config/meta/version.json`, + JSON.stringify({ + version: 0, + parent: null, + timestamp: "2026-03-25T00:00:00Z", + changes: [], + metrics_at_change: { session_count: 0, success_rate_7d: 0, correction_rate_7d: 0 }, + }), + "utf-8", + ); + writeFileSync( + `${TEST_DIR}/phantom-config/meta/metrics.json`, + JSON.stringify({ + session_count: 0, + success_count: 0, + failure_count: 0, + correction_count: 0, + evolution_count: 0, + rollback_count: 0, + last_session_at: null, + last_evolution_at: null, + success_rate_7d: 0, + correction_rate_7d: 0, + sessions_since_consolidation: 0, + }), + "utf-8", + ); + writeFileSync(`${TEST_DIR}/phantom-config/meta/evolution-log.jsonl`, "", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/meta/golden-suite.jsonl`, "", "utf-8"); +} + +function newDb(): Database { + const db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + for (const stmt of MIGRATIONS) db.run(stmt); + return db; 
+} + +type StubRuntime = { + judgeQuery: (options: { + systemPrompt: string; + userMessage: string; + schema: unknown; + model?: string; + maxTokens?: number; + omitPreset?: boolean; + }) => Promise; +}; + +function fireGateRuntime(): StubRuntime { + return { + judgeQuery: async () => ({ + verdict: "pass" as const, + confidence: 0.9, + reasoning: "", + data: { evolve: true, reason: "user taught a workflow" }, + model: "claude-haiku-4-5", + inputTokens: 320, + outputTokens: 28, + costUsd: 0.0006, + durationMs: 800, + }), + }; +} + +function fireWorthySession(overrides: Partial = {}): SessionSummary { + // The heuristic observation extractor in `reflection.ts` looks for + // correction-shaped phrases in user messages ("no, use X not Y"). Using + // such a message guarantees the engine actually applies a change to + // `user-profile.md`, which is what makes the C1 callback fire and what + // lets the integration test assert against real applied state. + return { + session_id: "integ-1", + session_key: "slack:Cint:Tint", + user_id: "user-int", + user_messages: ["No, always use TypeScript not JavaScript"], + assistant_messages: ["Got it"], + tools_used: [], + files_tracked: [], + outcome: "success", + cost_usd: 0.05, + started_at: "2026-04-14T10:00:00Z", + ended_at: "2026-04-14T10:01:00Z", + ...overrides, + }; +} + +describe("phase 1+2 integration", () => { + beforeEach(() => setupTestEnvironment()); + afterEach(() => rmSync(TEST_DIR, { recursive: true, force: true })); + + test("end-to-end drain wires every coordination concern", async () => { + const db = newDb(); + const runtime = fireGateRuntime(); + const engine = new EvolutionEngine(CONFIG_PATH, runtime as unknown as AgentRuntime); + const queue = new EvolutionQueue(db); + const cadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), { + cadenceMinutes: 1_000_000, + demandTriggerDepth: 999, + }); + + // C1 sentinel: count callback invocations and capture each version we + // see at refresh time. 
The production wiring in `src/index.ts` uses + // this same setter to refresh `runtime.evolvedConfig` from disk. + const refreshes: number[] = []; + engine.setOnConfigApplied(() => { + refreshes.push(engine.getCurrentVersion()); + }); + engine.setQueueWiring(queue, () => cadence.onEnqueue()); + + cadence.start(); + try { + const result = await engine.enqueueIfWorthy(fireWorthySession()); + expect(result.enqueued).toBe(true); + expect(result.decision.fire).toBe(true); + expect(queue.depth()).toBe(1); + + const drainResult = await cadence.triggerNow(); + expect(drainResult).not.toBeNull(); + expect(drainResult?.processed).toBe(1); + expect(drainResult?.successCount).toBe(1); + + // C1: the drain produced an applied change, so the refresh + // callback fired exactly once with the new version. + expect(refreshes.length).toBeGreaterThanOrEqual(1); + expect(refreshes.at(-1)).toBe(engine.getCurrentVersion()); + expect(engine.getCurrentVersion()).toBeGreaterThan(0); + + // M1: a single fired session bumps `session_count` exactly once. + // Pre-fix this was 3 because `updateAfterSession` was called from + // `enqueueIfWorthy`, the top of `afterSessionInternal`, and inside + // `runCycle`. Post-fix only the runCycle call survives. + const metrics = engine.getMetrics(); + expect(metrics.session_count).toBe(1); + expect(metrics.success_count).toBe(1); + + // Queue is empty after a successful drain. + expect(queue.depth()).toBe(0); + } finally { + cadence.stop(); + } + }); + + test("a pipeline failure leaves the row in the queue for the next drain (C2)", async () => { + const db = newDb(); + const runtime = fireGateRuntime(); + const engine = new EvolutionEngine(CONFIG_PATH, runtime as unknown as AgentRuntime); + const queue = new EvolutionQueue(db); + const cadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), { + cadenceMinutes: 1_000_000, + demandTriggerDepth: 999, + }); + + // Force the engine pipeline to throw on the first drain. 
We patch + // `runSingleSessionPipeline` directly on the instance so the cadence's + // processBatch loop sees the failure shape it would see from a real + // JudgeSubprocessError or a `CycleAborted` rethrow that was not caught. + let throwOnce = true; + const original = engine.runSingleSessionPipeline.bind(engine); + engine.runSingleSessionPipeline = async (session: SessionSummary) => { + if (throwOnce) { + throwOnce = false; + throw new Error("simulated transient pipeline failure"); + } + return original(session); + }; + + engine.setQueueWiring(queue, () => cadence.onEnqueue()); + cadence.start(); + try { + await engine.enqueueIfWorthy(fireWorthySession({ session_id: "fail-then-ok" })); + expect(queue.depth()).toBe(1); + + const firstDrain = await cadence.triggerNow(); + expect(firstDrain?.failureCount).toBe(1); + // The failed row stays in the queue for the next drain instead of + // being silently deleted by markProcessed. + expect(queue.depth()).toBe(1); + + // Second drain succeeds and clears the row. + const secondDrain = await cadence.triggerNow(); + expect(secondDrain?.successCount).toBe(1); + expect(queue.depth()).toBe(0); + } finally { + cadence.stop(); + } + }); + + test("retried session_key is counted exactly once across drains", async () => { + // Codex finding on PR #63: the C2 fix preserves a failed row in the + // queue, so a transient pipeline failure followed by a retry would + // re-enter `runCycle` under the same `session_key` and double-count + // `session_count`. The dedup guard in `runCycle` caps the increment at + // one per unique `session_key` for the engine's lifetime. 
+ const db = newDb(); + const runtime = fireGateRuntime(); + const engine = new EvolutionEngine(CONFIG_PATH, runtime as unknown as AgentRuntime); + const queue = new EvolutionQueue(db); + const cadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), { + cadenceMinutes: 1_000_000, + demandTriggerDepth: 999, + }); + + let throwOnce = true; + const original = engine.runSingleSessionPipeline.bind(engine); + engine.runSingleSessionPipeline = async (session: SessionSummary) => { + if (throwOnce) { + throwOnce = false; + throw new Error("simulated transient pipeline failure"); + } + return original(session); + }; + + engine.setQueueWiring(queue, () => cadence.onEnqueue()); + cadence.start(); + try { + await engine.enqueueIfWorthy(fireWorthySession({ session_id: "retry-once" })); + + const firstDrain = await cadence.triggerNow(); + expect(firstDrain?.failureCount).toBe(1); + // The first drain threw before reaching `updateAfterSession`, so + // `session_count` is still 0. The row is preserved for retry. + expect(engine.getMetrics().session_count).toBe(0); + expect(queue.depth()).toBe(1); + + const secondDrain = await cadence.triggerNow(); + expect(secondDrain?.successCount).toBe(1); + // The retry is the first call that reaches `updateAfterSession`, + // so it claims the dedup slot for this `session_key` and increments + // exactly once. + expect(engine.getMetrics().session_count).toBe(1); + + // A subsequent enqueue under the SAME `session_key` must not + // increment again. Use a different `session_id` (which is what the + // queue dedup also uses to distinguish turns within a conversation) + // so the row is fresh but the dedup key is the same. + await engine.enqueueIfWorthy(fireWorthySession({ session_id: "retry-once-followup" })); + await cadence.triggerNow(); + expect(engine.getMetrics().session_count).toBe(1); + + // A new `session_key` is counted normally, proving the guard does + // not over-suppress. 
+ await engine.enqueueIfWorthy( + fireWorthySession({ session_id: "other-session", session_key: "slack:Cint:Tother" }), + ); + await cadence.triggerNow(); + expect(engine.getMetrics().session_count).toBe(2); + } finally { + cadence.stop(); + } + }); +}); diff --git a/src/evolution/__tests__/queue.test.ts b/src/evolution/__tests__/queue.test.ts index 10042e3..f6e031a 100644 --- a/src/evolution/__tests__/queue.test.ts +++ b/src/evolution/__tests__/queue.test.ts @@ -67,9 +67,9 @@ describe("EvolutionQueue", () => { test("drainAll returns rows oldest-first", () => { const queue = new EvolutionQueue(db); - queue.enqueue(makeSummary({ session_id: "a" }), DECISION); - queue.enqueue(makeSummary({ session_id: "b" }), DECISION); - queue.enqueue(makeSummary({ session_id: "c" }), DECISION); + queue.enqueue(makeSummary({ session_id: "a", session_key: "slack:Ca:Ta" }), DECISION); + queue.enqueue(makeSummary({ session_id: "b", session_key: "slack:Cb:Tb" }), DECISION); + queue.enqueue(makeSummary({ session_id: "c", session_key: "slack:Cc:Tc" }), DECISION); const drained = queue.drainAll(); expect(drained).toHaveLength(3); expect(drained[0].session_id).toBe("a"); @@ -84,9 +84,9 @@ describe("EvolutionQueue", () => { test("markProcessed deletes only the specified ids", () => { const queue = new EvolutionQueue(db); - queue.enqueue(makeSummary({ session_id: "a" }), DECISION); - queue.enqueue(makeSummary({ session_id: "b" }), DECISION); - queue.enqueue(makeSummary({ session_id: "c" }), DECISION); + queue.enqueue(makeSummary({ session_id: "a", session_key: "slack:Ca:Ta" }), DECISION); + queue.enqueue(makeSummary({ session_id: "b", session_key: "slack:Cb:Tb" }), DECISION); + queue.enqueue(makeSummary({ session_id: "c", session_key: "slack:Cc:Tc" }), DECISION); const drained = queue.drainAll(); queue.markProcessed([drained[0].id, drained[2].id]); const remaining = queue.drainAll(); @@ -130,6 +130,50 @@ describe("EvolutionQueue", () => { } }); + test("enqueueing the same session_key twice 
keeps only the latest summary", () => { + const queue = new EvolutionQueue(db); + queue.enqueue( + makeSummary({ + session_id: "turn-3", + session_key: "slack:C1:T1", + user_messages: ["short turn 3"], + }), + DECISION, + ); + queue.enqueue( + makeSummary({ + session_id: "turn-15", + session_key: "slack:C1:T1", + user_messages: ["full conversation up through turn 15"], + cost_usd: 0.42, + }), + DECISION, + ); + expect(queue.depth()).toBe(1); + const drained = queue.drainAll(); + expect(drained).toHaveLength(1); + // The most recent enqueue wins. Without dedup, a busy multi-turn + // session would burn the full Sonnet judge pipeline once per turn that + // crossed the gate, against progressively shorter snapshots of the + // same conversation. + expect(drained[0].session_id).toBe("turn-15"); + expect(drained[0].session_summary.user_messages).toEqual(["full conversation up through turn 15"]); + expect(drained[0].session_summary.cost_usd).toBeCloseTo(0.42, 5); + }); + + test("dedup is scoped per session_key, not per session_id", () => { + const queue = new EvolutionQueue(db); + queue.enqueue(makeSummary({ session_id: "a", session_key: "slack:C1:T1" }), DECISION); + queue.enqueue(makeSummary({ session_id: "b", session_key: "slack:C2:T2" }), DECISION); + queue.enqueue(makeSummary({ session_id: "c", session_key: "slack:C1:T1" }), DECISION); + expect(queue.depth()).toBe(2); + const drained = queue.drainAll(); + const keys = drained.map((d) => d.session_key).sort(); + expect(keys).toEqual(["slack:C1:T1", "slack:C2:T2"]); + const c1Row = drained.find((d) => d.session_key === "slack:C1:T1"); + expect(c1Row?.session_id).toBe("c"); + }); + test("clear truncates the queue", () => { const queue = new EvolutionQueue(db); queue.enqueue(makeSummary(), DECISION); diff --git a/src/evolution/cadence.ts b/src/evolution/cadence.ts index 01d4598..b0999ba 100644 --- a/src/evolution/cadence.ts +++ b/src/evolution/cadence.ts @@ -165,7 +165,24 @@ export class EvolutionCadence { `[evolution] 
draining batch of ${queued.length} sessions (trigger=${trigger}, cadence=${this.cadenceConfig.cadenceMinutes}min)`, ); const result = await processBatch(queued, this.engine); - this.queue.markProcessed(queued.map((q) => q.id)); + + // Only delete rows whose pipeline succeeded. Failed rows stay in the + // queue so the next drain retries them; deleting on failure would + // silently drop the exact sessions the safety floor exists to protect + // (transient judge subprocess errors, network blips, CycleAborted from + // the Phase 0 failure ceiling). A future PR will add a `failure_count` + // column and a poison-pill ceiling so a single bad row cannot loop + // forever, but the dedup-by-session_key on enqueue already bounds the + // loss for repeated sessions. + const okIds: number[] = []; + for (const entry of result.results) { + if (entry.ok) { + okIds.push(entry.id); + } else { + console.warn(`[evolution] queue row id=${entry.id} pipeline failed, leaving in queue: ${entry.error}`); + } + } + this.queue.markProcessed(okIds); const appliedCount = result.results.reduce((sum, r) => { if (r.ok) return sum + r.result.changes_applied.length; diff --git a/src/evolution/engine.ts b/src/evolution/engine.ts index dc36ce6..baa81ad 100644 --- a/src/evolution/engine.ts +++ b/src/evolution/engine.ts @@ -62,10 +62,26 @@ export class EvolutionEngine { private activeCycleSessionId: string | null = null; private activeCycleSkipCount = 0; + // Dedup set for the M1 single-count contract under the C2 retry path. + // A row that fails mid-cycle stays in the queue and re-enters runCycle + // under the same `session_key`; without this guard, `session_count` would + // be incremented once per retry. Bounded by unique session_keys ever seen + // by this engine instance (low thousands per year). Process restart clears + // it, which means a row pending retry across a restart double-counts on + // its first post-restart drain. Acceptable per-restart blip. 
+ private countedSessionKeys = new Set(); + // Phase 1 + 2 wiring. The queue is optional so engine unit tests that do // not care about batching can still construct a bare engine. private queue: EvolutionQueue | null = null; private onEnqueue: (() => void) | null = null; + // Fires after a cycle (including a partial-apply CycleAborted recovery) + // applies at least one change to disk. Wired in `src/index.ts` so the + // AgentRuntime in-memory `evolvedConfig` snapshot refreshes whenever the + // queue drain produces new state. Without this, the queued path would + // rewrite `phantom-config/` files but the live agent would keep using its + // boot-time snapshot until the process restarts. + private onConfigApplied: (() => void) | null = null; // `runtime` is optional so existing tests and heuristic-only deployments can // construct an engine without wiring a full AgentRuntime. When the engine @@ -97,6 +113,17 @@ export class EvolutionEngine { this.onEnqueue = onEnqueue; } + /** + * Register a callback that fires whenever a cycle applies at least one + * change to disk. Wired from `src/index.ts` to refresh the AgentRuntime's + * in-memory evolved config snapshot. Matches the setter shape used by + * `setQueueWiring` so the engine still owns the lifecycle and the cadence + * stays ignorant of the runtime. + */ + setOnConfigApplied(callback: () => void): void { + this.onConfigApplied = callback; + } + private resolveJudgeMode(): boolean { const setting = this.config.judges?.enabled ?? "auto"; if (setting === "never") return false; @@ -164,7 +191,6 @@ export class EvolutionEngine { * serializing gate calls through the mutex would defeat the purpose. 
   */
   async enqueueIfWorthy(session: SessionSummary): Promise<void> {
-    updateAfterSession(this.config, session.outcome, false);
     const decision = await decideGate(session, this.runtime);
     appendGateLog(this.config, session, decision);
     recordGateDecision(this.config, decision);
@@ -213,17 +239,12 @@
   }
 
   private async afterSessionInternal(session: SessionSummary): Promise<void> {
-    // Always bump the session counter so the dashboard's `session_count`
-    // reflects every turn that arrived, including skipped ones. On the skip
-    // path we pass `hadCorrections=false` because no observation extraction
-    // has run yet. The normal path also calls `updateAfterSession` inside
-    // `runCycle` with the real `hadCorrections` value after observations
-    // are extracted, so the correction signal remains accurate. The small
-    // double-count on `session_count` during a running cycle is accepted:
-    // Phase 2 replaces the drop-on-floor model with a real cadence queue
-    // and this bookkeeping goes away.
-    updateAfterSession(this.config, session.outcome, false);
-
+    // Phase 2 replaced the drop-on-floor mutex with a persistent queue, so
+    // every session that crosses the gate eventually reaches `runCycle`,
+    // which is the single place `session_count` is incremented (with the
+    // real `hadCorrections` value after observation extraction). The Phase
+    // 0 M4 increment that used to live here was load-bearing only while
+    // the mutex could permanently drop a session.
     if (this.activeCycle !== null) {
       this.activeCycleSkipCount += 1;
       const activeId = this.activeCycleSessionId ?? "unknown";
@@ -268,9 +289,14 @@
       }
 
       observations = extractObservations(session);
     }
 
-    // Step 0: Update session metrics (after extraction so hadCorrections uses observation results)
+    // Step 0: Update session metrics (after extraction so hadCorrections uses observation results).
+ // Guarded by `countedSessionKeys` so a C2 retry of the same session_key + // does not double-count. const hadCorrections = observations.some((o) => o.type === "correction"); - updateAfterSession(this.config, session.outcome, hadCorrections); + if (!this.countedSessionKeys.has(session.session_key)) { + this.countedSessionKeys.add(session.session_key); + updateAfterSession(this.config, session.outcome, hadCorrections); + } if (observations.length === 0) { return { version: this.getCurrentVersion(), changes_applied: [], changes_rejected: [] }; @@ -341,6 +367,7 @@ export class EvolutionEngine { `[evolution] Partial apply: ${partialApplied.length} changes applied ` + `(v${this.getCurrentVersion()}) after cycle abort`, ); + this.notifyConfigApplied(); } if (partialRejected.length > 0) { console.log(`[evolution] Partial apply: ${partialRejected.length} changes rejected after cycle abort`); @@ -371,6 +398,7 @@ export class EvolutionEngine { console.log( `[evolution] Applied ${applied.length} changes (v${this.getCurrentVersion()}) in ${Date.now() - startTime}ms`, ); + this.notifyConfigApplied(); // Promote successful corrections to golden suite if (session.outcome === "success" && hadCorrections) { @@ -494,6 +522,12 @@ export class EvolutionEngine { versionRollback(this.config, toVersion); updateAfterRollback(this.config); console.log(`[evolution] Rolled back to version ${toVersion}`); + // The auto-rollback branch in `runCycle` reverts disk state without + // otherwise touching the runtime; without this refresh the agent keeps + // serving the now-reverted version's snapshot until process restart. + // `getConfig()` re-reads from disk on every call, so the callback picks + // up the rolled-back content. 
+ this.notifyConfigApplied(); } private resetDailyCostIfNewDay(): void { @@ -529,6 +563,19 @@ export class EvolutionEngine { } } + private notifyConfigApplied(): void { + if (!this.onConfigApplied) return; + try { + this.onConfigApplied(); + } catch (err: unknown) { + // A telemetry/refresh failure must not wedge the evolution pipeline. + // The next applied change will retry, and the disk-side state is + // already correct regardless of whether the callback succeeded. + const msg = err instanceof Error ? err.message : String(err); + console.warn(`[evolution] onConfigApplied callback threw: ${msg}`); + } + } + private recordJudgeCosts(costs: JudgeCosts): void { const metricsPath = this.config.paths.metrics_file; try { diff --git a/src/evolution/gate.ts b/src/evolution/gate.ts index 481fd3e..b5c1d59 100644 --- a/src/evolution/gate.ts +++ b/src/evolution/gate.ts @@ -95,6 +95,12 @@ export async function decideGate(session: SessionSummary, runtime: AgentRuntime schema: GateJudgeResult, model: JUDGE_MODEL_HAIKU, maxTokens: 200, + // The gate is a pure pass/skip evaluation. It never reads files, + // runs commands, or calls tools. Skipping the `claude_code` preset + // drops the per-call input token count from thousands to a few + // hundred, which is what the research cost target assumed and what + // observed fleet spend (20-180x target) exposed as broken. + omitPreset: true, }); const evolve = result.data.evolve === true; return { diff --git a/src/evolution/queue.ts b/src/evolution/queue.ts index a34b6f1..646122d 100644 --- a/src/evolution/queue.ts +++ b/src/evolution/queue.ts @@ -61,18 +61,26 @@ export class EvolutionQueue { constructor(private db: Database) {} /** - * Insert one row per gate-approved session. Called from - * `engine.enqueueIfWorthy` after `decideGate` returns `fire=true`. 
- * Silent on duplicates: a busy multi-turn session can fire multiple - * times and each turn legitimately wants its own row because the - * batch processor ingests the latest summary when the row is drained. + * Insert one row per gate-approved session, latest summary wins for any + * given `session_key`. A busy multi-turn session that crosses the gate + * threshold more than once between drains would otherwise enqueue several + * stale rows for the same conversation, each of which would run the full + * judge pipeline against an older snapshot of the same dialog. Wrapping + * the delete and insert in a single transaction keeps the dedup atomic + * with respect to a concurrent `drainAll`. */ enqueue(summary: SessionSummary, decision: GateDecision): void { - const stmt = this.db.query( - `INSERT INTO evolution_queue (session_id, session_key, gate_decision_json, session_summary_json) - VALUES (?, ?, ?, ?)`, + const tx = this.db.transaction( + (sessionKey: string, sessionId: string, decisionJson: string, summaryJson: string) => { + this.db.run("DELETE FROM evolution_queue WHERE session_key = ?", [sessionKey]); + this.db.run( + `INSERT INTO evolution_queue (session_id, session_key, gate_decision_json, session_summary_json) + VALUES (?, ?, ?, ?)`, + [sessionId, sessionKey, decisionJson, summaryJson], + ); + }, ); - stmt.run(summary.session_id, summary.session_key, JSON.stringify(decision), JSON.stringify(summary)); + tx(summary.session_key, summary.session_id, JSON.stringify(decision), JSON.stringify(summary)); } depth(): number { diff --git a/src/index.ts b/src/index.ts index 89c4fef..4792723 100644 --- a/src/index.ts +++ b/src/index.ts @@ -112,9 +112,10 @@ async function main(): Promise { let evolution: EvolutionEngine | null = null; let evolutionCadence: EvolutionCadence | null = null; try { - evolution = new EvolutionEngine(undefined, runtime); - const currentVersion = evolution.getCurrentVersion(); - const judgeMode = evolution.usesLLMJudges() ? 
"LLM judges" : "heuristic"; + const engine = new EvolutionEngine(undefined, runtime); + evolution = engine; + const currentVersion = engine.getCurrentVersion(); + const judgeMode = engine.usesLLMJudges() ? "LLM judges" : "heuristic"; console.log(`[evolution] Engine initialized (v${currentVersion}, ${judgeMode})`); setEvolutionVersionProvider(() => evolution?.getCurrentVersion() ?? 0); @@ -123,9 +124,17 @@ async function main(): Promise { // route through `onEnqueue` which fires a drain whenever the queue // depth crosses `demandTriggerDepth`. const queue = new EvolutionQueue(db); - const cadenceConfig = loadCadenceConfig(evolution.getEvolutionConfig()); - evolutionCadence = new EvolutionCadence(evolution, queue, evolution.getEvolutionConfig(), cadenceConfig); - evolution.setQueueWiring(queue, () => evolutionCadence?.onEnqueue()); + const cadenceConfig = loadCadenceConfig(engine.getEvolutionConfig()); + evolutionCadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), cadenceConfig); + engine.setQueueWiring(queue, () => evolutionCadence?.onEnqueue()); + // The cadence drains the queue out-of-band, so the runtime's in-memory + // evolved config snapshot must be refreshed from disk after each + // applied change. Without this callback the queued path would rewrite + // `phantom-config/` files but the live agent would keep prompting with + // the boot-time snapshot until the process restarts. + engine.setOnConfigApplied(() => { + runtime.setEvolvedConfig(engine.getConfig()); + }); evolutionCadence.start(); console.log( `[evolution] Cadence started (cadence=${cadenceConfig.cadenceMinutes}min, demand_trigger=${cadenceConfig.demandTriggerDepth})`,