From 3917fc1d716fcdba701fee9d058a19bdca37ef4d Mon Sep 17 00:00:00 2001 From: Muhammad Ahmed Cheema Date: Tue, 14 Apr 2026 18:46:24 -0700 Subject: [PATCH 1/2] fix: phase 1+2 reviewer findings (gate preset, runtime refresh, queue dedup, single-count, failed-row preservation, integration test) Closes the Codex and independent reviewer punch list against PR #61. C1 runtime refresh after cadence drain. EvolutionEngine now owns an onConfigApplied callback that fires from both the normal apply path and the CycleAborted partial-apply recovery whenever applied.length > 0. Wired into src/index.ts next to setQueueWiring so the AgentRuntime evolved config snapshot refreshes after every drain that applies changes. Pre-fix the queued path rewrote phantom-config/ files but the live agent kept its boot-time snapshot until process restart. C2 failed queue rows now stay in the queue. cadence.runDrain filters the batch result to ok-only ids before markProcessed and emits a warn line per failed row including the queue id and error string. Failed rows retry on the next drain instead of being silently deleted, which was reintroducing the pre-Phase-2 drop-on-floor shape for the exact sessions the safety floor exists to protect. C3 gate Haiku call drops the claude_code preset envelope. New omitPreset option on JudgeQueryOptions, flipped on only at the gate call site, swaps the SDK systemPrompt from the preset envelope to a plain string. The SDK accepts both shapes; the plain-string path skips the full Claude Code base prompt and tool catalog. Live fleet data showed gate cost running 20 to 180x the research target because of preset overhead. Other judges keep the preset envelope unchanged. M1 collapses session_count triple-increment to single-increment per fired session. Removed updateAfterSession from enqueueIfWorthy and from the top of afterSessionInternal. The remaining call site inside runCycle has the real hadCorrections value after observation extraction. 
The Phase 0 M4 mutex-skip increment is obsolete now that Phase 2 replaced the drop-on-floor path with a persistent queue: every enqueued session eventually reaches runCycle. M2 adds a phase-1-2 integration test that wires the real EvolutionEngine, the real EvolutionQueue over in-memory sqlite, the real EvolutionCadence, and a stub AgentRuntime. Two tests cover queue depth post-drain, single-count metrics, runtime refresh callback firing, and failed-row retention through two drains. Single highest leverage item in the punch list; covers every coordination concern between engine, queue, cadence, and runtime in one fixture. M3 dedups queue rows by session_key. EvolutionQueue.enqueue wraps a DELETE WHERE session_key = ? and the INSERT in a sqlite transaction so the dedup is atomic with respect to a concurrent drainAll. No migration required. A busy multi-turn session that crosses the gate threshold multiple times between drains now collapses to a single row with the latest summary, instead of running the full Sonnet judge pipeline once per turn against progressively shorter snapshots. 
Test plan - bun test: 1385 pass / 10 skip / 0 fail (+11 over the 1374 baseline) - bun run lint: clean - bun run typecheck: clean --- src/agent/__tests__/judge-query.test.ts | 23 +- src/agent/judge-query.ts | 42 ++- src/evolution/__tests__/cadence.test.ts | 46 ++- src/evolution/__tests__/engine.test.ts | 57 +++- src/evolution/__tests__/gate.test.ts | 36 +++ .../__tests__/mutex-and-retry-ceiling.test.ts | 33 +- .../__tests__/phase-1-2-integration.test.ts | 283 ++++++++++++++++++ src/evolution/__tests__/queue.test.ts | 56 +++- src/evolution/cadence.ts | 19 +- src/evolution/engine.ts | 51 +++- src/evolution/gate.ts | 6 + src/evolution/queue.ts | 26 +- src/index.ts | 21 +- 13 files changed, 632 insertions(+), 67 deletions(-) create mode 100644 src/evolution/__tests__/phase-1-2-integration.test.ts diff --git a/src/agent/__tests__/judge-query.test.ts b/src/agent/__tests__/judge-query.test.ts index 6de3135..73a63fb 100644 --- a/src/agent/__tests__/judge-query.test.ts +++ b/src/agent/__tests__/judge-query.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from "bun:test"; import { z } from "zod/v4"; -import { __absorbUsageForTest, parseJsonFromResponse } from "../judge-query.ts"; +import { __absorbUsageForTest, buildSystemPrompt, parseJsonFromResponse } from "../judge-query.ts"; // parseJsonFromResponse is the shape-normalization layer for judge subprocess output. 
// Models sometimes return markdown fences, leading prose, or trailing whitespace even @@ -117,6 +117,27 @@ describe("parseJsonFromResponse", () => { expect(partial.costUsd).toBe(0); }); + test("buildSystemPrompt returns the claude_code preset envelope by default", () => { + const prompt = buildSystemPrompt("evaluate this", false); + expect(typeof prompt).toBe("object"); + if (typeof prompt === "object") { + expect(prompt.type).toBe("preset"); + expect(prompt.preset).toBe("claude_code"); + expect(prompt.append).toBe("evaluate this"); + } + }); + + test("buildSystemPrompt returns a plain string when omitPreset is true", () => { + // This is the C3 fix shape: the SDK accepts a plain-string systemPrompt + // per its docstring ("Use a custom system prompt") and that path skips + // the preset's base prompt and full tool catalog. The gate is the only + // caller that flips this on, because it is a pure pass/skip evaluation + // with no tool use. Other judges keep the preset. + const prompt = buildSystemPrompt("evaluate this", true); + expect(typeof prompt).toBe("string"); + expect(prompt).toBe("evaluate this"); + }); + test("parses nested structures", () => { const Nested = z.object({ flags: z.array(z.object({ category: z.string(), severity: z.enum(["critical", "warning", "info"]) })), diff --git a/src/agent/judge-query.ts b/src/agent/judge-query.ts index f7d65b1..64b7654 100644 --- a/src/agent/judge-query.ts +++ b/src/agent/judge-query.ts @@ -16,6 +16,17 @@ export type JudgeQueryOptions = { schema: z.ZodType; model?: string; maxTokens?: number; + /** + * When true, the SDK is invoked with a plain-string `systemPrompt` instead + * of the `claude_code` preset envelope. The preset bundles the full Claude + * Code base prompt and tool catalog, which costs thousands of input tokens + * per call. 
Pure evaluation calls that do not use any tools (the + * conditional firing gate is the canonical example) should set this so + * Haiku only sees the judge prompt and the session summary, not the entire + * Claude Code surface area. Other judges that legitimately want the preset + * keep the default. + */ + omitPreset?: boolean; }; export type JudgeQueryResult = { @@ -142,17 +153,15 @@ export async function runJudgeQuery( // whenever ANTHROPIC_API_KEY happened to be set in the shell. const providerEnv = buildProviderEnv(config); + const systemPrompt = buildSystemPrompt(judgePrompt, options.omitPreset === true); + const queryStream = query({ prompt: options.userMessage, options: { model: resolvedModel, permissionMode: "bypassPermissions", allowDangerouslySkipPermissions: true, - systemPrompt: { - type: "preset" as const, - preset: "claude_code" as const, - append: judgePrompt, - }, + systemPrompt, maxTurns: 1, effort: "low", persistSession: false, @@ -308,6 +317,29 @@ export function __absorbUsageForTest( return partial; } +/** + * Compose the SDK `systemPrompt` value. The default path keeps the + * `claude_code` preset envelope so judges designed against the Claude Code + * tool catalog continue to receive the full base prompt. When the caller + * sets `omitPreset`, the SDK is handed a plain string instead, which the + * SDK documents as "Use a custom system prompt" and which skips the preset's + * base prompt and tool descriptions entirely. The gate uses this to drop + * two orders of magnitude of input tokens per call. 
+ */ +export function buildSystemPrompt( + judgePrompt: string, + omitPreset: boolean, +): string | { type: "preset"; preset: "claude_code"; append: string } { + if (omitPreset) { + return judgePrompt; + } + return { + type: "preset" as const, + preset: "claude_code" as const, + append: judgePrompt, + }; +} + function buildJudgePrompt(systemPrompt: string, schemaJson: unknown): string { return [ systemPrompt, diff --git a/src/evolution/__tests__/cadence.test.ts b/src/evolution/__tests__/cadence.test.ts index 1d196fc..2b55e88 100644 --- a/src/evolution/__tests__/cadence.test.ts +++ b/src/evolution/__tests__/cadence.test.ts @@ -42,9 +42,15 @@ function newDb(): Database { } function makeSummary(overrides: Partial = {}): SessionSummary { + const sessionId = overrides.session_id ?? "s1"; + // Default the session_key off the session_id so each fixture row gets a + // distinct key. The queue dedups by session_key, so two enqueues with the + // same key collapse into a single row, which is correct production + // behavior but breaks any test that uses session_id alone to distinguish + // rows. 
return { - session_id: "s1", - session_key: "slack:C1:T1", + session_id: sessionId, + session_key: `slack:C-${sessionId}:T-${sessionId}`, user_id: "u1", user_messages: ["help"], assistant_messages: ["ok"], @@ -297,6 +303,42 @@ describe("EvolutionCadence", () => { } }); + test("failed pipeline rows stay in the queue while ok rows are deleted", async () => { + const config = setupEnv(); + const db = newDb(); + const queue = new EvolutionQueue(db); + let calls = 0; + const engine = fakeEngine({ + onRun: async (session) => { + calls += 1; + if (session.session_id === "fail-me") { + throw new Error("simulated transient pipeline failure"); + } + return { version: 1, changes_applied: [], changes_rejected: [] }; + }, + }); + const cadence = new EvolutionCadence(engine, queue, config, { cadenceMinutes: 1_000_000, demandTriggerDepth: 999 }); + cadence.start(); + try { + // Distinct session_keys so the dedup-on-enqueue from M3 does not + // collapse them into a single row. + queue.enqueue(makeSummary({ session_id: "ok-1", session_key: "slack:C1:T1" }), DECISION); + queue.enqueue(makeSummary({ session_id: "fail-me", session_key: "slack:C2:T2" }), DECISION); + queue.enqueue(makeSummary({ session_id: "ok-2", session_key: "slack:C3:T3" }), DECISION); + const result = await cadence.triggerNow(); + expect(result?.processed).toBe(3); + expect(result?.failureCount).toBe(1); + expect(calls).toBe(3); + + // Only the failing row remains; the two ok rows were deleted. 
+ const remaining = queue.drainAll(); + expect(remaining).toHaveLength(1); + expect(remaining[0].session_id).toBe("fail-me"); + } finally { + cadence.stop(); + } + }); + test("queue_stats counters increment after each drain", async () => { const config = setupEnv(); const db = newDb(); diff --git a/src/evolution/__tests__/engine.test.ts b/src/evolution/__tests__/engine.test.ts index c72edb7..f24c46f 100644 --- a/src/evolution/__tests__/engine.test.ts +++ b/src/evolution/__tests__/engine.test.ts @@ -201,17 +201,14 @@ describe("EvolutionEngine", () => { const session = makeSession({ outcome: "success" }); await engine.afterSession(session); - // Phase 0 M4: `updateAfterSession` is called twice on the normal run - // path. Once at the top of `afterSession` so the mutex-skip path also - // updates counters (without that, dashboards undercount during bursts), - // and again inside `runCycle` with the real `hadCorrections` value. - // Both calls increment `session_count`, so the normal path double-counts - // by one. This is the tradeoff the Phase 0 reviewer explicitly accepted - // because Phase 2 replaces the drop-on-floor mutex with a real queue - // and removes this bookkeeping entirely. + // Phase 1+2 collapsed `updateAfterSession` to a single call inside + // `runCycle`, after observation extraction supplies the real + // `hadCorrections` value. The Phase 0 M4 increments at the top of + // `afterSessionInternal` and inside `enqueueIfWorthy` were obsolete + // once the persistent queue replaced the drop-on-floor mutex path. 
const metrics = engine.getMetrics(); - expect(metrics.session_count).toBe(2); - expect(metrics.success_count).toBe(2); + expect(metrics.session_count).toBe(1); + expect(metrics.success_count).toBe(1); }); test("constitution violation is rejected", async () => { @@ -261,6 +258,46 @@ describe("EvolutionEngine", () => { expect(userProfile.toLowerCase()).toContain("vim"); }); + test("setOnConfigApplied fires after a successful afterSession that applies changes", async () => { + // C1 contract: the engine owns the runtime refresh callback and fires + // it from the apply path. Production wiring in src/index.ts uses this + // exact setter to refresh the runtime's evolvedConfig snapshot from + // disk. Test against the engine directly so a future refactor that + // breaks the callback shape fails here, not at boot time on a VM. + const engine = new EvolutionEngine(CONFIG_PATH); + const versions: number[] = []; + engine.setOnConfigApplied(() => { + versions.push(engine.getCurrentVersion()); + }); + await engine.afterSession(makeSession({ user_messages: ["No, use TypeScript not JavaScript"] })); + expect(versions.length).toBeGreaterThanOrEqual(1); + expect(versions.at(-1)).toBe(engine.getCurrentVersion()); + }); + + test("setOnConfigApplied does not fire when no changes are applied", async () => { + const engine = new EvolutionEngine(CONFIG_PATH); + let calls = 0; + engine.setOnConfigApplied(() => { + calls += 1; + }); + // A neutral session with no correction signals applies zero changes. + await engine.afterSession(makeSession({ user_messages: ["What time is it?"] })); + expect(calls).toBe(0); + }); + + test("setOnConfigApplied callback errors do not wedge the pipeline", async () => { + // A telemetry/refresh failure in the callback must not poison the + // evolution result. The engine swallows and warns, the pipeline + // returns normally, and the next applied change retries the refresh. 
+ const engine = new EvolutionEngine(CONFIG_PATH); + engine.setOnConfigApplied(() => { + throw new Error("simulated runtime refresh failure"); + }); + const result = await engine.afterSession(makeSession({ user_messages: ["No, use TypeScript not JavaScript"] })); + expect(result.changes_applied.length).toBeGreaterThan(0); + expect(result.version).toBeGreaterThan(0); + }); + test("evolved config is available in getConfig after changes", async () => { const engine = new EvolutionEngine(CONFIG_PATH); const session = makeSession({ diff --git a/src/evolution/__tests__/gate.test.ts b/src/evolution/__tests__/gate.test.ts index 70c199f..e61ed33 100644 --- a/src/evolution/__tests__/gate.test.ts +++ b/src/evolution/__tests__/gate.test.ts @@ -65,6 +65,7 @@ type FakeRuntime = { schema: unknown; model?: string; maxTokens?: number; + omitPreset?: boolean; }) => Promise; }; @@ -93,6 +94,41 @@ describe("decideGate Haiku happy path", () => { expect(decision.reason).toContain("naming"); }); + test("gate forwards omitPreset=true so judgeQuery skips the claude_code preset envelope", async () => { + // The gate is a pure pass/skip evaluation that never reads files or + // runs tools, so it must opt out of the `claude_code` system prompt + // preset that bundles the full Claude Code base prompt and tool + // catalog. Live fleet data showed gate cost running 20-180x the + // research target until this flag was wired through. Asserting the + // flag at the call site is the durable defense against a future + // refactor accidentally re-introducing the preset overhead. 
+ const captured: Array<{ omitPreset?: boolean; model?: string; maxTokens?: number }> = []; + const runtime: FakeRuntime = { + judgeQuery: async (options) => { + captured.push({ + omitPreset: options.omitPreset, + model: options.model, + maxTokens: options.maxTokens, + }); + return { + verdict: "pass" as const, + confidence: 0.9, + reasoning: "", + data: { evolve: false, reason: "routine" }, + model: "claude-haiku-4-5", + inputTokens: 420, + outputTokens: 28, + costUsd: 0.0005, + durationMs: 900, + }; + }, + }; + await decideGate(makeSession(), runtime as unknown as AgentRuntime); + expect(captured).toHaveLength(1); + expect(captured[0].omitPreset).toBe(true); + expect(captured[0].maxTokens).toBe(200); + }); + test("evolve=false returns skip source=haiku", async () => { const runtime: FakeRuntime = { judgeQuery: async () => ({ diff --git a/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts b/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts index d5c28cc..8777803 100644 --- a/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts +++ b/src/evolution/__tests__/mutex-and-retry-ceiling.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { mkdirSync, rmSync, writeFileSync } from "node:fs"; import { JudgeSubprocessError } from "../../agent/judge-query.ts"; import type { AgentRuntime } from "../../agent/runtime.ts"; import type { PhantomConfig } from "../../config/types.ts"; @@ -356,11 +356,17 @@ describe("Phase 0 mutex guard", () => { } }); - test("mutex skip path still bumps session_count so dashboards do not undercount", async () => { - // M4: without this the normal-vs-skip paths diverge and operators - // watching session_count in the dashboard see undercounting during - // bursts. The skip path now updates session metrics with - // hadCorrections=false at the top of afterSession. 
+ test("mutex skip path returns a skipped result without spawning a new cycle", async () => { + // Phase 1+2 collapsed `updateAfterSession` to a single call inside + // `runCycle`, after observation extraction supplies the real + // `hadCorrections` value. The previous Phase 0 M4 top-of-afterSession + // increment was load-bearing only while the mutex could permanently + // drop a session; the persistent queue replaces that drop-on-floor + // path so an enqueued session eventually reaches `runCycle` and the + // counter increments exactly once. This test pins the new shape: the + // active cycle still holds the mutex, the second call sees the guard + // and returns a skipped result with zero applied changes, and no + // extra runCycle is launched on top of the hanging one. type Releaser = (reason: unknown) => void; const releaseHangRef: { fn: Releaser | null } = { fn: null }; const hang = new Promise((_resolve, reject) => { @@ -376,15 +382,12 @@ describe("Phase 0 mutex guard", () => { const callA = engine.afterSession(makeSession({ session_id: "active" })); await Promise.resolve(); await Promise.resolve(); - await engine.afterSession(makeSession({ session_id: "skip-1" })); - await engine.afterSession(makeSession({ session_id: "skip-2" })); - - const metricsPath = `${TEST_DIR}/phantom-config/meta/metrics.json`; - const metrics = JSON.parse(readFileSync(metricsPath, "utf-8")); - // Three afterSession calls, three counter increments from the top-of- - // afterSession update. The active call is still hanging so its inner - // updateAfterSession has not run yet, so we assert exactly 3. 
- expect(metrics.session_count).toBe(3); + const skipResult1 = await engine.afterSession(makeSession({ session_id: "skip-1" })); + const skipResult2 = await engine.afterSession(makeSession({ session_id: "skip-2" })); + + expect(skipResult1.changes_applied).toHaveLength(0); + expect(skipResult2.changes_applied).toHaveLength(0); + expect((engine as unknown as { activeCycleSkipCount: number }).activeCycleSkipCount).toBe(2); releaseHangRef.fn?.(new Error("released by test")); try { diff --git a/src/evolution/__tests__/phase-1-2-integration.test.ts b/src/evolution/__tests__/phase-1-2-integration.test.ts new file mode 100644 index 0000000..9e92782 --- /dev/null +++ b/src/evolution/__tests__/phase-1-2-integration.test.ts @@ -0,0 +1,283 @@ +import { Database } from "bun:sqlite"; +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdirSync, rmSync, writeFileSync } from "node:fs"; +import type { AgentRuntime } from "../../agent/runtime.ts"; +import { MIGRATIONS } from "../../db/schema.ts"; +import { EvolutionCadence } from "../cadence.ts"; +import { EvolutionEngine } from "../engine.ts"; +import { EvolutionQueue } from "../queue.ts"; +import type { SessionSummary } from "../types.ts"; + +// Phase 1+2 end-to-end integration. Wires the REAL EvolutionEngine, REAL +// EvolutionQueue over an in-memory SQLite, and REAL EvolutionCadence with a +// short cadence window. Only the AgentRuntime is stubbed, and only enough to +// answer `judgeQuery` (the gate Haiku call) and `getPhantomConfig` (the judge +// mode resolver). Every coordination bug in the Phase 1+2 fix punch list is +// catchable here: +// +// - C1: runtime.setEvolvedConfig is called after the drain applies changes +// - C2: a session whose pipeline throws stays in the queue +// - M1: metrics.session_count is 1 per drained session, not 3 +// - M3: dedup-by-session-key on the enqueue side +// +// One test per assertion keeps the failure message specific. 
They share a +// fixture builder so the surface area stays small. + +const TEST_DIR = "/tmp/phantom-test-phase-1-2-integration"; +const CONFIG_PATH = `${TEST_DIR}/config/evolution.yaml`; + +function setupTestEnvironment(): void { + mkdirSync(`${TEST_DIR}/config`, { recursive: true }); + mkdirSync(`${TEST_DIR}/phantom-config/meta`, { recursive: true }); + mkdirSync(`${TEST_DIR}/phantom-config/strategies`, { recursive: true }); + mkdirSync(`${TEST_DIR}/phantom-config/memory`, { recursive: true }); + + writeFileSync( + CONFIG_PATH, + [ + "cadence:", + " reflection_interval: 1", + " consolidation_interval: 10", + "gates:", + " drift_threshold: 0.7", + " max_file_lines: 200", + " auto_rollback_threshold: 0.1", + " auto_rollback_window: 5", + "reflection:", + ' model: "claude-sonnet-4-20250514"', + "judges:", + // `never` so the engine takes the heuristic observation path. We + // only need a stub runtime for the gate; the rest of the pipeline + // runs deterministic TypeScript code. + ' enabled: "never"', + "paths:", + ` config_dir: "${TEST_DIR}/phantom-config"`, + ` constitution: "${TEST_DIR}/phantom-config/constitution.md"`, + ` version_file: "${TEST_DIR}/phantom-config/meta/version.json"`, + ` metrics_file: "${TEST_DIR}/phantom-config/meta/metrics.json"`, + ` evolution_log: "${TEST_DIR}/phantom-config/meta/evolution-log.jsonl"`, + ` golden_suite: "${TEST_DIR}/phantom-config/meta/golden-suite.jsonl"`, + ` session_log: "${TEST_DIR}/phantom-config/memory/session-log.jsonl"`, + ].join("\n"), + "utf-8", + ); + + writeFileSync( + `${TEST_DIR}/phantom-config/constitution.md`, + [ + "# Phantom Constitution", + "", + "1. Honesty: Never deceive the user.", + "2. Safety: Never execute harmful commands.", + "3. Privacy: Never share user data.", + "4. Transparency: No hidden changes.", + "5. Boundaries: You are not a person.", + "6. Accountability: Every change is logged.", + "7. Consent: Do not modify the constitution.", + "8. 
Proportionality: Minimal changes.", + ].join("\n"), + "utf-8", + ); + writeFileSync(`${TEST_DIR}/phantom-config/persona.md`, "# Persona\n\n- Be direct.\n", "utf-8"); + writeFileSync( + `${TEST_DIR}/phantom-config/user-profile.md`, + "# User Profile\n\nPreferences learned from interactions.\n", + "utf-8", + ); + writeFileSync(`${TEST_DIR}/phantom-config/domain-knowledge.md`, "# Domain Knowledge\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/strategies/task-patterns.md`, "# Task Patterns\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/strategies/tool-preferences.md`, "# Tool Preferences\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/strategies/error-recovery.md`, "# Error Recovery\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/memory/session-log.jsonl`, "", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/memory/principles.md`, "# Principles\n", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/memory/corrections.md`, "# Corrections\n", "utf-8"); + + writeFileSync( + `${TEST_DIR}/phantom-config/meta/version.json`, + JSON.stringify({ + version: 0, + parent: null, + timestamp: "2026-03-25T00:00:00Z", + changes: [], + metrics_at_change: { session_count: 0, success_rate_7d: 0, correction_rate_7d: 0 }, + }), + "utf-8", + ); + writeFileSync( + `${TEST_DIR}/phantom-config/meta/metrics.json`, + JSON.stringify({ + session_count: 0, + success_count: 0, + failure_count: 0, + correction_count: 0, + evolution_count: 0, + rollback_count: 0, + last_session_at: null, + last_evolution_at: null, + success_rate_7d: 0, + correction_rate_7d: 0, + sessions_since_consolidation: 0, + }), + "utf-8", + ); + writeFileSync(`${TEST_DIR}/phantom-config/meta/evolution-log.jsonl`, "", "utf-8"); + writeFileSync(`${TEST_DIR}/phantom-config/meta/golden-suite.jsonl`, "", "utf-8"); +} + +function newDb(): Database { + const db = new Database(":memory:"); + db.run("PRAGMA journal_mode = WAL"); + for (const stmt of MIGRATIONS) db.run(stmt); + return db; 
+} + +type StubRuntime = { + judgeQuery: (options: { + systemPrompt: string; + userMessage: string; + schema: unknown; + model?: string; + maxTokens?: number; + omitPreset?: boolean; + }) => Promise; +}; + +function fireGateRuntime(): StubRuntime { + return { + judgeQuery: async () => ({ + verdict: "pass" as const, + confidence: 0.9, + reasoning: "", + data: { evolve: true, reason: "user taught a workflow" }, + model: "claude-haiku-4-5", + inputTokens: 320, + outputTokens: 28, + costUsd: 0.0006, + durationMs: 800, + }), + }; +} + +function fireWorthySession(overrides: Partial = {}): SessionSummary { + // The heuristic observation extractor in `reflection.ts` looks for + // correction-shaped phrases in user messages ("no, use X not Y"). Using + // such a message guarantees the engine actually applies a change to + // `user-profile.md`, which is what makes the C1 callback fire and what + // lets the integration test assert against real applied state. + return { + session_id: "integ-1", + session_key: "slack:Cint:Tint", + user_id: "user-int", + user_messages: ["No, always use TypeScript not JavaScript"], + assistant_messages: ["Got it"], + tools_used: [], + files_tracked: [], + outcome: "success", + cost_usd: 0.05, + started_at: "2026-04-14T10:00:00Z", + ended_at: "2026-04-14T10:01:00Z", + ...overrides, + }; +} + +describe("phase 1+2 integration", () => { + beforeEach(() => setupTestEnvironment()); + afterEach(() => rmSync(TEST_DIR, { recursive: true, force: true })); + + test("end-to-end drain wires every coordination concern", async () => { + const db = newDb(); + const runtime = fireGateRuntime(); + const engine = new EvolutionEngine(CONFIG_PATH, runtime as unknown as AgentRuntime); + const queue = new EvolutionQueue(db); + const cadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), { + cadenceMinutes: 1_000_000, + demandTriggerDepth: 999, + }); + + // C1 sentinel: count callback invocations and capture each version we + // see at refresh time. 
The production wiring in `src/index.ts` uses + // this same setter to refresh `runtime.evolvedConfig` from disk. + const refreshes: number[] = []; + engine.setOnConfigApplied(() => { + refreshes.push(engine.getCurrentVersion()); + }); + engine.setQueueWiring(queue, () => cadence.onEnqueue()); + + cadence.start(); + try { + const result = await engine.enqueueIfWorthy(fireWorthySession()); + expect(result.enqueued).toBe(true); + expect(result.decision.fire).toBe(true); + expect(queue.depth()).toBe(1); + + const drainResult = await cadence.triggerNow(); + expect(drainResult).not.toBeNull(); + expect(drainResult?.processed).toBe(1); + expect(drainResult?.successCount).toBe(1); + + // C1: the drain produced an applied change, so the refresh + // callback fired exactly once with the new version. + expect(refreshes.length).toBeGreaterThanOrEqual(1); + expect(refreshes.at(-1)).toBe(engine.getCurrentVersion()); + expect(engine.getCurrentVersion()).toBeGreaterThan(0); + + // M1: a single fired session bumps `session_count` exactly once. + // Pre-fix this was 3 because `updateAfterSession` was called from + // `enqueueIfWorthy`, the top of `afterSessionInternal`, and inside + // `runCycle`. Post-fix only the runCycle call survives. + const metrics = engine.getMetrics(); + expect(metrics.session_count).toBe(1); + expect(metrics.success_count).toBe(1); + + // Queue is empty after a successful drain. + expect(queue.depth()).toBe(0); + } finally { + cadence.stop(); + } + }); + + test("a pipeline failure leaves the row in the queue for the next drain (C2)", async () => { + const db = newDb(); + const runtime = fireGateRuntime(); + const engine = new EvolutionEngine(CONFIG_PATH, runtime as unknown as AgentRuntime); + const queue = new EvolutionQueue(db); + const cadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), { + cadenceMinutes: 1_000_000, + demandTriggerDepth: 999, + }); + + // Force the engine pipeline to throw on the first drain. 
We patch + // `runSingleSessionPipeline` directly on the instance so the cadence's + // processBatch loop sees the failure shape it would see from a real + // JudgeSubprocessError or a `CycleAborted` rethrow that was not caught. + let throwOnce = true; + const original = engine.runSingleSessionPipeline.bind(engine); + engine.runSingleSessionPipeline = async (session: SessionSummary) => { + if (throwOnce) { + throwOnce = false; + throw new Error("simulated transient pipeline failure"); + } + return original(session); + }; + + engine.setQueueWiring(queue, () => cadence.onEnqueue()); + cadence.start(); + try { + await engine.enqueueIfWorthy(fireWorthySession({ session_id: "fail-then-ok" })); + expect(queue.depth()).toBe(1); + + const firstDrain = await cadence.triggerNow(); + expect(firstDrain?.failureCount).toBe(1); + // The failed row stays in the queue for the next drain instead of + // being silently deleted by markProcessed. + expect(queue.depth()).toBe(1); + + // Second drain succeeds and clears the row. 
+ const secondDrain = await cadence.triggerNow(); + expect(secondDrain?.successCount).toBe(1); + expect(queue.depth()).toBe(0); + } finally { + cadence.stop(); + } + }); +}); diff --git a/src/evolution/__tests__/queue.test.ts b/src/evolution/__tests__/queue.test.ts index 10042e3..f6e031a 100644 --- a/src/evolution/__tests__/queue.test.ts +++ b/src/evolution/__tests__/queue.test.ts @@ -67,9 +67,9 @@ describe("EvolutionQueue", () => { test("drainAll returns rows oldest-first", () => { const queue = new EvolutionQueue(db); - queue.enqueue(makeSummary({ session_id: "a" }), DECISION); - queue.enqueue(makeSummary({ session_id: "b" }), DECISION); - queue.enqueue(makeSummary({ session_id: "c" }), DECISION); + queue.enqueue(makeSummary({ session_id: "a", session_key: "slack:Ca:Ta" }), DECISION); + queue.enqueue(makeSummary({ session_id: "b", session_key: "slack:Cb:Tb" }), DECISION); + queue.enqueue(makeSummary({ session_id: "c", session_key: "slack:Cc:Tc" }), DECISION); const drained = queue.drainAll(); expect(drained).toHaveLength(3); expect(drained[0].session_id).toBe("a"); @@ -84,9 +84,9 @@ describe("EvolutionQueue", () => { test("markProcessed deletes only the specified ids", () => { const queue = new EvolutionQueue(db); - queue.enqueue(makeSummary({ session_id: "a" }), DECISION); - queue.enqueue(makeSummary({ session_id: "b" }), DECISION); - queue.enqueue(makeSummary({ session_id: "c" }), DECISION); + queue.enqueue(makeSummary({ session_id: "a", session_key: "slack:Ca:Ta" }), DECISION); + queue.enqueue(makeSummary({ session_id: "b", session_key: "slack:Cb:Tb" }), DECISION); + queue.enqueue(makeSummary({ session_id: "c", session_key: "slack:Cc:Tc" }), DECISION); const drained = queue.drainAll(); queue.markProcessed([drained[0].id, drained[2].id]); const remaining = queue.drainAll(); @@ -130,6 +130,50 @@ describe("EvolutionQueue", () => { } }); + test("enqueueing the same session_key twice keeps only the latest summary", () => { + const queue = new EvolutionQueue(db); + 
queue.enqueue( + makeSummary({ + session_id: "turn-3", + session_key: "slack:C1:T1", + user_messages: ["short turn 3"], + }), + DECISION, + ); + queue.enqueue( + makeSummary({ + session_id: "turn-15", + session_key: "slack:C1:T1", + user_messages: ["full conversation up through turn 15"], + cost_usd: 0.42, + }), + DECISION, + ); + expect(queue.depth()).toBe(1); + const drained = queue.drainAll(); + expect(drained).toHaveLength(1); + // The most recent enqueue wins. Without dedup, a busy multi-turn + // session would burn the full Sonnet judge pipeline once per turn that + // crossed the gate, against progressively shorter snapshots of the + // same conversation. + expect(drained[0].session_id).toBe("turn-15"); + expect(drained[0].session_summary.user_messages).toEqual(["full conversation up through turn 15"]); + expect(drained[0].session_summary.cost_usd).toBeCloseTo(0.42, 5); + }); + + test("dedup is scoped per session_key, not per session_id", () => { + const queue = new EvolutionQueue(db); + queue.enqueue(makeSummary({ session_id: "a", session_key: "slack:C1:T1" }), DECISION); + queue.enqueue(makeSummary({ session_id: "b", session_key: "slack:C2:T2" }), DECISION); + queue.enqueue(makeSummary({ session_id: "c", session_key: "slack:C1:T1" }), DECISION); + expect(queue.depth()).toBe(2); + const drained = queue.drainAll(); + const keys = drained.map((d) => d.session_key).sort(); + expect(keys).toEqual(["slack:C1:T1", "slack:C2:T2"]); + const c1Row = drained.find((d) => d.session_key === "slack:C1:T1"); + expect(c1Row?.session_id).toBe("c"); + }); + test("clear truncates the queue", () => { const queue = new EvolutionQueue(db); queue.enqueue(makeSummary(), DECISION); diff --git a/src/evolution/cadence.ts b/src/evolution/cadence.ts index 01d4598..b0999ba 100644 --- a/src/evolution/cadence.ts +++ b/src/evolution/cadence.ts @@ -165,7 +165,24 @@ export class EvolutionCadence { `[evolution] draining batch of ${queued.length} sessions (trigger=${trigger}, 
cadence=${this.cadenceConfig.cadenceMinutes}min)`, ); const result = await processBatch(queued, this.engine); - this.queue.markProcessed(queued.map((q) => q.id)); + + // Only delete rows whose pipeline succeeded. Failed rows stay in the + // queue so the next drain retries them; deleting on failure would + // silently drop the exact sessions the safety floor exists to protect + // (transient judge subprocess errors, network blips, CycleAborted from + // the Phase 0 failure ceiling). A future PR will add a `failure_count` + // column and a poison-pill ceiling so a single bad row cannot loop + // forever, but the dedup-by-session_key on enqueue already bounds the + // loss for repeated sessions. + const okIds: number[] = []; + for (const entry of result.results) { + if (entry.ok) { + okIds.push(entry.id); + } else { + console.warn(`[evolution] queue row id=${entry.id} pipeline failed, leaving in queue: ${entry.error}`); + } + } + this.queue.markProcessed(okIds); const appliedCount = result.results.reduce((sum, r) => { if (r.ok) return sum + r.result.changes_applied.length; diff --git a/src/evolution/engine.ts b/src/evolution/engine.ts index dc36ce6..09adf26 100644 --- a/src/evolution/engine.ts +++ b/src/evolution/engine.ts @@ -66,6 +66,13 @@ export class EvolutionEngine { // not care about batching can still construct a bare engine. private queue: EvolutionQueue | null = null; private onEnqueue: (() => void) | null = null; + // Fires after a cycle (including a partial-apply CycleAborted recovery) + // applies at least one change to disk. Wired in `src/index.ts` so the + // AgentRuntime in-memory `evolvedConfig` snapshot refreshes whenever the + // queue drain produces new state. Without this, the queued path would + // rewrite `phantom-config/` files but the live agent would keep using its + // boot-time snapshot until the process restarts. 
+ private onConfigApplied: (() => void) | null = null; // `runtime` is optional so existing tests and heuristic-only deployments can // construct an engine without wiring a full AgentRuntime. When the engine @@ -97,6 +104,17 @@ export class EvolutionEngine { this.onEnqueue = onEnqueue; } + /** + * Register a callback that fires whenever a cycle applies at least one + * change to disk. Wired from `src/index.ts` to refresh the AgentRuntime's + * in-memory evolved config snapshot. Matches the setter shape used by + * `setQueueWiring` so the engine still owns the lifecycle and the cadence + * stays ignorant of the runtime. + */ + setOnConfigApplied(callback: () => void): void { + this.onConfigApplied = callback; + } + private resolveJudgeMode(): boolean { const setting = this.config.judges?.enabled ?? "auto"; if (setting === "never") return false; @@ -164,7 +182,6 @@ export class EvolutionEngine { * serializing gate calls through the mutex would defeat the purpose. */ async enqueueIfWorthy(session: SessionSummary): Promise { - updateAfterSession(this.config, session.outcome, false); const decision = await decideGate(session, this.runtime); appendGateLog(this.config, session, decision); recordGateDecision(this.config, decision); @@ -213,17 +230,12 @@ export class EvolutionEngine { } private async afterSessionInternal(session: SessionSummary): Promise { - // Always bump the session counter so the dashboard's `session_count` - // reflects every turn that arrived, including skipped ones. On the skip - // path we pass `hadCorrections=false` because no observation extraction - // has run yet. The normal path also calls `updateAfterSession` inside - // `runCycle` with the real `hadCorrections` value after observations - // are extracted, so the correction signal remains accurate. The small - // double-count on `session_count` during a running cycle is accepted: - // Phase 2 replaces the drop-on-floor model with a real cadence queue - // and this bookkeeping goes away. 
- updateAfterSession(this.config, session.outcome, false); - + // Phase 2 replaced the drop-on-floor mutex with a persistent queue, so + // every session that crosses the gate eventually reaches `runCycle`, + // which is the single place `session_count` is incremented (with the + // real `hadCorrections` value after observation extraction). The Phase + // 0 M4 increment that used to live here was load-bearing only while + // the mutex could permanently drop a session. if (this.activeCycle !== null) { this.activeCycleSkipCount += 1; const activeId = this.activeCycleSessionId ?? "unknown"; @@ -341,6 +353,7 @@ export class EvolutionEngine { `[evolution] Partial apply: ${partialApplied.length} changes applied ` + `(v${this.getCurrentVersion()}) after cycle abort`, ); + this.notifyConfigApplied(); } if (partialRejected.length > 0) { console.log(`[evolution] Partial apply: ${partialRejected.length} changes rejected after cycle abort`); @@ -371,6 +384,7 @@ export class EvolutionEngine { console.log( `[evolution] Applied ${applied.length} changes (v${this.getCurrentVersion()}) in ${Date.now() - startTime}ms`, ); + this.notifyConfigApplied(); // Promote successful corrections to golden suite if (session.outcome === "success" && hadCorrections) { @@ -529,6 +543,19 @@ export class EvolutionEngine { } } + private notifyConfigApplied(): void { + if (!this.onConfigApplied) return; + try { + this.onConfigApplied(); + } catch (err: unknown) { + // A telemetry/refresh failure must not wedge the evolution pipeline. + // The next applied change will retry, and the disk-side state is + // already correct regardless of whether the callback succeeded. + const msg = err instanceof Error ? 
err.message : String(err); + console.warn(`[evolution] onConfigApplied callback threw: ${msg}`); + } + } + private recordJudgeCosts(costs: JudgeCosts): void { const metricsPath = this.config.paths.metrics_file; try { diff --git a/src/evolution/gate.ts b/src/evolution/gate.ts index 481fd3e..b5c1d59 100644 --- a/src/evolution/gate.ts +++ b/src/evolution/gate.ts @@ -95,6 +95,12 @@ export async function decideGate(session: SessionSummary, runtime: AgentRuntime schema: GateJudgeResult, model: JUDGE_MODEL_HAIKU, maxTokens: 200, + // The gate is a pure pass/skip evaluation. It never reads files, + // runs commands, or calls tools. Skipping the `claude_code` preset + // drops the per-call input token count from thousands to a few + // hundred, which is what the research cost target assumed and what + // observed fleet spend (20-180x target) exposed as broken. + omitPreset: true, }); const evolve = result.data.evolve === true; return { diff --git a/src/evolution/queue.ts b/src/evolution/queue.ts index a34b6f1..646122d 100644 --- a/src/evolution/queue.ts +++ b/src/evolution/queue.ts @@ -61,18 +61,26 @@ export class EvolutionQueue { constructor(private db: Database) {} /** - * Insert one row per gate-approved session. Called from - * `engine.enqueueIfWorthy` after `decideGate` returns `fire=true`. - * Silent on duplicates: a busy multi-turn session can fire multiple - * times and each turn legitimately wants its own row because the - * batch processor ingests the latest summary when the row is drained. + * Insert one row per gate-approved session, latest summary wins for any + * given `session_key`. A busy multi-turn session that crosses the gate + * threshold more than once between drains would otherwise enqueue several + * stale rows for the same conversation, each of which would run the full + * judge pipeline against an older snapshot of the same dialog. 
Wrapping + * the delete and insert in a single transaction keeps the dedup atomic + * with respect to a concurrent `drainAll`. */ enqueue(summary: SessionSummary, decision: GateDecision): void { - const stmt = this.db.query( - `INSERT INTO evolution_queue (session_id, session_key, gate_decision_json, session_summary_json) - VALUES (?, ?, ?, ?)`, + const tx = this.db.transaction( + (sessionKey: string, sessionId: string, decisionJson: string, summaryJson: string) => { + this.db.run("DELETE FROM evolution_queue WHERE session_key = ?", [sessionKey]); + this.db.run( + `INSERT INTO evolution_queue (session_id, session_key, gate_decision_json, session_summary_json) + VALUES (?, ?, ?, ?)`, + [sessionId, sessionKey, decisionJson, summaryJson], + ); + }, ); - stmt.run(summary.session_id, summary.session_key, JSON.stringify(decision), JSON.stringify(summary)); + tx(summary.session_key, summary.session_id, JSON.stringify(decision), JSON.stringify(summary)); } depth(): number { diff --git a/src/index.ts b/src/index.ts index 89c4fef..4792723 100644 --- a/src/index.ts +++ b/src/index.ts @@ -112,9 +112,10 @@ async function main(): Promise { let evolution: EvolutionEngine | null = null; let evolutionCadence: EvolutionCadence | null = null; try { - evolution = new EvolutionEngine(undefined, runtime); - const currentVersion = evolution.getCurrentVersion(); - const judgeMode = evolution.usesLLMJudges() ? "LLM judges" : "heuristic"; + const engine = new EvolutionEngine(undefined, runtime); + evolution = engine; + const currentVersion = engine.getCurrentVersion(); + const judgeMode = engine.usesLLMJudges() ? "LLM judges" : "heuristic"; console.log(`[evolution] Engine initialized (v${currentVersion}, ${judgeMode})`); setEvolutionVersionProvider(() => evolution?.getCurrentVersion() ?? 0); @@ -123,9 +124,17 @@ async function main(): Promise { // route through `onEnqueue` which fires a drain whenever the queue // depth crosses `demandTriggerDepth`. 
const queue = new EvolutionQueue(db); - const cadenceConfig = loadCadenceConfig(evolution.getEvolutionConfig()); - evolutionCadence = new EvolutionCadence(evolution, queue, evolution.getEvolutionConfig(), cadenceConfig); - evolution.setQueueWiring(queue, () => evolutionCadence?.onEnqueue()); + const cadenceConfig = loadCadenceConfig(engine.getEvolutionConfig()); + evolutionCadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), cadenceConfig); + engine.setQueueWiring(queue, () => evolutionCadence?.onEnqueue()); + // The cadence drains the queue out-of-band, so the runtime's in-memory + // evolved config snapshot must be refreshed from disk after each + // applied change. Without this callback the queued path would rewrite + // `phantom-config/` files but the live agent would keep prompting with + // the boot-time snapshot until the process restarts. + engine.setOnConfigApplied(() => { + runtime.setEvolvedConfig(engine.getConfig()); + }); evolutionCadence.start(); console.log( `[evolution] Cadence started (cadence=${cadenceConfig.cadenceMinutes}min, demand_trigger=${cadenceConfig.demandTriggerDepth})`, From 4587e18b73d4ab4ec3f106c03edf2a9865bef58c Mon Sep 17 00:00:00 2001 From: Muhammad Ahmed Cheema Date: Tue, 14 Apr 2026 19:32:53 -0700 Subject: [PATCH 2/2] fix: idempotent session counting and runtime refresh on rollback Two follow-up fixes for findings Codex raised against the PR #63 fix pass. - engine.ts: dedup updateAfterSession by session.session_key so the M1 single-count contract holds under the C2 retry path. A failed row that retries on the next drain re-enters runCycle under the same key; the new countedSessionKeys Set guarantees one increment per unique conversation for the engine's lifetime. Memory is bounded by unique session_keys; process restart clears the set, which is an acceptable per-restart blip. 
- engine.ts: call notifyConfigApplied() from rollback() so the auto-rollback branch in runCycle no longer leaves the runtime serving a snapshot newer than the disk state. getConfig() re-reads from disk on every call, so the callback observes the post-rollback content with no extra in-memory work. Tests: +2 (retry dedup in phase-1-2-integration.test.ts, rollback callback in engine.test.ts). 1387 pass / 10 skip / 0 fail. Lint and typecheck clean. --- src/evolution/__tests__/engine.test.ts | 28 ++++++++ .../__tests__/phase-1-2-integration.test.ts | 64 +++++++++++++++++++ src/evolution/engine.ts | 24 ++++++- 3 files changed, 114 insertions(+), 2 deletions(-) diff --git a/src/evolution/__tests__/engine.test.ts b/src/evolution/__tests__/engine.test.ts index f24c46f..ec39dcb 100644 --- a/src/evolution/__tests__/engine.test.ts +++ b/src/evolution/__tests__/engine.test.ts @@ -246,6 +246,34 @@ describe("EvolutionEngine", () => { expect(metrics.rollback_count).toBe(1); }); + test("rollback fires the runtime refresh callback with the rolled-back state", async () => { + // Codex finding on PR #63: the apply path refreshes the runtime via + // `notifyConfigApplied` but the auto-rollback branch was reverting disk + // state without firing the callback, leaving the runtime serving the + // rolled-forward snapshot. The fix wires the same callback into + // `rollback()` so any path that mutates disk state also refreshes the + // runtime. `getConfig()` re-reads from disk on every call, so the + // callback observes the post-rollback content. 
+ const engine = new EvolutionEngine(CONFIG_PATH); + const refreshes: Array<{ version: number; userProfile: string }> = []; + engine.setOnConfigApplied(() => { + const config = engine.getConfig(); + refreshes.push({ version: engine.getCurrentVersion(), userProfile: config.userProfile }); + }); + + await engine.afterSession(makeSession({ user_messages: ["No, use TypeScript not JavaScript"] })); + expect(refreshes.length).toBe(1); + expect(refreshes[0].version).toBeGreaterThan(0); + expect(refreshes[0].userProfile).toContain("TypeScript"); + + engine.rollback(0); + // Two callback invocations now: one from the original apply, one from + // the rollback. The second sees the version-0 state on disk. + expect(refreshes.length).toBe(2); + expect(refreshes[1].version).toBe(0); + expect(refreshes[1].userProfile).not.toContain("TypeScript"); + }); + test("preference is detected and applied", async () => { const engine = new EvolutionEngine(CONFIG_PATH); const session = makeSession({ diff --git a/src/evolution/__tests__/phase-1-2-integration.test.ts b/src/evolution/__tests__/phase-1-2-integration.test.ts index 9e92782..6d31f04 100644 --- a/src/evolution/__tests__/phase-1-2-integration.test.ts +++ b/src/evolution/__tests__/phase-1-2-integration.test.ts @@ -280,4 +280,68 @@ describe("phase 1+2 integration", () => { cadence.stop(); } }); + + test("retried session_key is counted exactly once across drains", async () => { + // Codex finding on PR #63: the C2 fix preserves a failed row in the + // queue, so a transient pipeline failure followed by a retry would + // re-enter `runCycle` under the same `session_key` and double-count + // `session_count`. The dedup guard in `runCycle` caps the increment at + // one per unique `session_key` for the engine's lifetime. 
+ const db = newDb(); + const runtime = fireGateRuntime(); + const engine = new EvolutionEngine(CONFIG_PATH, runtime as unknown as AgentRuntime); + const queue = new EvolutionQueue(db); + const cadence = new EvolutionCadence(engine, queue, engine.getEvolutionConfig(), { + cadenceMinutes: 1_000_000, + demandTriggerDepth: 999, + }); + + let throwOnce = true; + const original = engine.runSingleSessionPipeline.bind(engine); + engine.runSingleSessionPipeline = async (session: SessionSummary) => { + if (throwOnce) { + throwOnce = false; + throw new Error("simulated transient pipeline failure"); + } + return original(session); + }; + + engine.setQueueWiring(queue, () => cadence.onEnqueue()); + cadence.start(); + try { + await engine.enqueueIfWorthy(fireWorthySession({ session_id: "retry-once" })); + + const firstDrain = await cadence.triggerNow(); + expect(firstDrain?.failureCount).toBe(1); + // The first drain threw before reaching `updateAfterSession`, so + // `session_count` is still 0. The row is preserved for retry. + expect(engine.getMetrics().session_count).toBe(0); + expect(queue.depth()).toBe(1); + + const secondDrain = await cadence.triggerNow(); + expect(secondDrain?.successCount).toBe(1); + // The retry is the first call that reaches `updateAfterSession`, + // so it claims the dedup slot for this `session_key` and increments + // exactly once. + expect(engine.getMetrics().session_count).toBe(1); + + // A subsequent enqueue under the SAME `session_key` must not + // increment again. Use a different `session_id` (turns within a + // conversation carry distinct ids, while the queue dedup keys on + // `session_key` alone) so the row is fresh but the dedup key is the + // same. + await engine.enqueueIfWorthy(fireWorthySession({ session_id: "retry-once-followup" })); + await cadence.triggerNow(); + expect(engine.getMetrics().session_count).toBe(1); + + // A new `session_key` is counted normally, proving the guard does
+ await engine.enqueueIfWorthy( + fireWorthySession({ session_id: "other-session", session_key: "slack:Cint:Tother" }), + ); + await cadence.triggerNow(); + expect(engine.getMetrics().session_count).toBe(2); + } finally { + cadence.stop(); + } + }); }); diff --git a/src/evolution/engine.ts b/src/evolution/engine.ts index 09adf26..baa81ad 100644 --- a/src/evolution/engine.ts +++ b/src/evolution/engine.ts @@ -62,6 +62,15 @@ export class EvolutionEngine { private activeCycleSessionId: string | null = null; private activeCycleSkipCount = 0; + // Dedup set for the M1 single-count contract under the C2 retry path. + // A row that fails mid-cycle stays in the queue and re-enters runCycle + // under the same `session_key`; without this guard, `session_count` would + // be incremented once per retry. Bounded by unique session_keys ever seen + // by this engine instance (low thousands per year). Process restart clears + // it, which means a row pending retry across a restart double-counts on + // its first post-restart drain. Acceptable per-restart blip. + private countedSessionKeys = new Set(); + // Phase 1 + 2 wiring. The queue is optional so engine unit tests that do // not care about batching can still construct a bare engine. private queue: EvolutionQueue | null = null; @@ -280,9 +289,14 @@ export class EvolutionEngine { observations = extractObservations(session); } - // Step 0: Update session metrics (after extraction so hadCorrections uses observation results) + // Step 0: Update session metrics (after extraction so hadCorrections uses observation results). + // Guarded by `countedSessionKeys` so a C2 retry of the same session_key + // does not double-count. 
const hadCorrections = observations.some((o) => o.type === "correction"); - updateAfterSession(this.config, session.outcome, hadCorrections); + if (!this.countedSessionKeys.has(session.session_key)) { + this.countedSessionKeys.add(session.session_key); + updateAfterSession(this.config, session.outcome, hadCorrections); + } if (observations.length === 0) { return { version: this.getCurrentVersion(), changes_applied: [], changes_rejected: [] }; @@ -508,6 +522,12 @@ export class EvolutionEngine { versionRollback(this.config, toVersion); updateAfterRollback(this.config); console.log(`[evolution] Rolled back to version ${toVersion}`); + // The auto-rollback branch in `runCycle` reverts disk state without + // otherwise touching the runtime; without this refresh the agent keeps + // serving the now-reverted version's snapshot until process restart. + // `getConfig()` re-reads from disk on every call, so the callback picks + // up the rolled-back content. + this.notifyConfigApplied(); } private resetDailyCostIfNewDay(): void {