From e89c79e731a6f85cd525502383d7dc56e2ba30d5 Mon Sep 17 00:00:00 2001 From: Daniel Ellison Date: Fri, 15 May 2026 08:54:21 -0400 Subject: [PATCH 1/2] feat(triage): codex branch for one-shot issue triage Phase 3 of the codex backend epic (#480). Adds the codex branch to `run_triage` so GitHub webhook-driven issue triage runs through codex when AGENT_BACKEND=codex. src/kai/triage.py: - New `if agent_backend == "codex":` branch invoking `codex exec --json --model <model>` with the model resolved via get_model_for(ModelRole.ISSUE_TRIAGE, "codex", override=...). The override env var is ISSUE_TRIAGE_MODEL_CODEX, read at the call site to match the existing claude/goose pattern. - New `_extract_codex_text` and `_recover_text_from_event` helpers that walk codex's NDJSON event stream and recover the final agent message text. Defensive about field names: tries top-level "text", "delta.text", "content" as string or list-of-blocks, and "item.content" with the list-of-blocks shape. The actual codex CLI schema is not yet pinned by smoke test, so the parser accepts multiple field paths; whichever fires on the pinned version wins, the others stay as fallbacks for schema drift. - No sudo wrap (codex on subscription auth uses the service user's own ~/.codex/auth.json); no --max-budget-usd (subscription auth has no per-call billing, runaway protection comes from _TRIAGE_TIMEOUT). tests/test_triage.py: - TestRunTriageCodex (7 tests): argv asserts "codex" + "exec" + --json (not claude / sudo / --max-budget-usd); registry model selection; ISSUE_TRIAGE_MODEL_CODEX env override; claude_user ignored on codex (no sudo); subprocess failure raises with stderr; NDJSON extraction returns final text not raw stdout. 
- TestExtractCodexText (10 tests): each defensive field path locked independently (top-level text, delta.text, content as string, content as list-of-blocks, item.content, unknown events skipped, multi-event accumulation, whitespace stripped, non-JSON lines tolerated, empty input returns empty string). Refs #480 --- src/kai/triage.py | 138 +++++++++++++++++++++++++++++ tests/test_triage.py | 202 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 340 insertions(+) diff --git a/src/kai/triage.py b/src/kai/triage.py index 6c37969..135baef 100644 --- a/src/kai/triage.py +++ b/src/kai/triage.py @@ -413,6 +413,58 @@ async def run_triage( Raises: RuntimeError: If the subprocess fails or times out. """ + if agent_backend == "codex": + # Codex one-shot mode: --json emits NDJSON events on stdout + # (thread.started, turn.started, item.*, turn.completed, error). + # The final agent message text is recovered by walking the + # events and accumulating any text content; see + # _extract_codex_text() below for the schema-defensive parser. + # No sudo: codex on subscription auth uses the service user's + # own ~/.codex/auth.json; per-user OAuth isolation is the + # os_user lever in users.yaml (post-#353), not a sudo wrap. + # No --max-budget-usd: codex on subscription auth has no + # per-call billing; runaway protection comes from + # _TRIAGE_TIMEOUT at the asyncio.wait_for below. 
+ triage_model = get_model_for( + ModelRole.ISSUE_TRIAGE, + agent_backend, + override=os.environ.get("ISSUE_TRIAGE_MODEL_CODEX", ""), + ) + cmd = [ + "codex", + "exec", + "--json", + "--model", + triage_model, + ] + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(input=prompt.encode()), + timeout=_TRIAGE_TIMEOUT, + ) + except TimeoutError: + proc.kill() + await proc.wait() + raise RuntimeError(f"Triage subprocess timed out after {_TRIAGE_TIMEOUT}s") from None + + if proc.returncode != 0: + error = stderr.decode().strip() + raise RuntimeError(f"Triage subprocess failed (exit {proc.returncode}): {error}") + + # Codex emits NDJSON; extract the final agent message text and + # return it (mirrors the contract the claude / goose branches + # already satisfy: a single string the caller hands to + # _parse_triage_json). + return _extract_codex_text(stdout.decode()) + if agent_backend == "goose": if not provider: raise ValueError( @@ -537,6 +589,92 @@ async def run_triage( return stdout.decode().strip() +def _extract_codex_text(stdout: str) -> str: + """ + Walk codex's NDJSON event stream and return the agent message text. + + `codex exec --json` emits one JSON event per line. The relevant + payload (the agent's final response) lives inside an item event; + the exact event name and field path are not yet pinned by smoke + test, so this parser is defensive about field names: + + - Each line is attempted as JSON; non-JSON lines are skipped. + - Within each parsed object, text content is recovered from any + of: top-level "text"; "content" as a string; "content" as a + list of {"type": "text", "text": ...} blocks (the JSON-RPC + convention used by goose ACP); "delta.text"; "item.content" + with the same list shape. 
+ - All recovered text is concatenated in order; partial recoveries + are returned rather than raising, on the assumption that the + caller's JSON parser will surface a clearer error if the + result is malformed. + + The smoke test against a real codex CLI will reveal which of + these field paths actually fire for the pinned version; the + others remain as fallbacks for schema drift. + + Args: + stdout: The full stdout from `codex exec --json`. + + Returns: + The accumulated agent message text. Empty string if no + recognizable text content was found in any event. + """ + chunks: list[str] = [] + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + chunks.extend(_recover_text_from_event(obj)) + return "".join(chunks).strip() + + +def _recover_text_from_event(obj: dict) -> list[str]: + """ + Pull text content out of one codex NDJSON event. + + Defensive: tries multiple field paths the codex CLI is observed + or documented to use, and returns whatever text it finds. An + event with no recognizable text payload contributes nothing. 
+ """ + out: list[str] = [] + # Top-level text field (common shape for "delta" events) + text = obj.get("text") + if isinstance(text, str): + out.append(text) + # delta.text (alternate streaming shape) + delta = obj.get("delta") + if isinstance(delta, dict): + d_text = delta.get("text") + if isinstance(d_text, str): + out.append(d_text) + # content as a string OR as a list of {type: text, text: ...} blocks + content = obj.get("content") + if isinstance(content, str): + out.append(content) + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + b_text = block.get("text") + if isinstance(b_text, str): + out.append(b_text) + # item.content for events wrapped in an "item" object + item = obj.get("item") + if isinstance(item, dict): + i_content = item.get("content") + if isinstance(i_content, list): + for block in i_content: + if isinstance(block, dict) and block.get("type") == "text": + b_text = block.get("text") + if isinstance(b_text, str): + out.append(b_text) + return out + + def _parse_triage_json(raw: str) -> dict: """ Parse Claude's triage response, stripping markdown fencing if present. diff --git a/tests/test_triage.py b/tests/test_triage.py index 5ecaca1..914979b 100644 --- a/tests/test_triage.py +++ b/tests/test_triage.py @@ -12,6 +12,7 @@ from kai.triage import ( _GOOSE_AGENT_MODELS, IssueMetadata, + _extract_codex_text, _parse_triage_json, _resolve_goose_model, _sanitize_search_query, @@ -651,6 +652,207 @@ async def test_empty_provider_raises(self): await run_triage("prompt", agent_backend="goose", provider="") +class TestRunTriageCodex: + """ + Tests for the codex branch of run_triage. + + The codex branch invokes `codex exec --json` and parses NDJSON + output via _extract_codex_text. No sudo wrap; subscription auth + uses the service user's own ~/.codex/auth.json. 
+ """ + + @staticmethod + def _codex_ndjson(text: str) -> str: + """ + Build a minimal NDJSON stream that _extract_codex_text resolves + to the given final text. Uses the "content" list-of-blocks + shape (the JSON-RPC convention goose ACP uses) since the + actual codex schema is not yet pinned. + """ + events = [ + {"event": "thread.started"}, + {"event": "turn.started"}, + { + "event": "item.message.completed", + "content": [{"type": "text", "text": text}], + }, + {"event": "turn.completed"}, + ] + return "\n".join(json.dumps(e) for e in events) + "\n" + + @pytest.mark.asyncio + async def test_codex_argv_uses_codex_exec(self): + """ + Argv is `codex exec --json --model `, never claude or + goose. Locks the "no overlap" guarantee at the subprocess + boundary: the codex triage branch never spawns claude. + """ + mock_proc = _mock_subprocess(stdout=self._codex_ndjson('{"labels": []}')) + with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_triage("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + assert cmd[0] == "codex" + assert cmd[1] == "exec" + assert "--json" in cmd + assert "--print" not in cmd # No claude flag + + @pytest.mark.asyncio + async def test_codex_argv_uses_registry_model(self): + """ + With agent_backend=codex and no env override, the --model argv + slot matches the registry's (codex, ISSUE_TRIAGE) row + ("gpt-5.4-mini"). Locks the registry as the source of truth + for the codex side. 
+ """ + mock_proc = _mock_subprocess(stdout=self._codex_ndjson("{}")) + with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_triage("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + i = cmd.index("--model") + assert cmd[i + 1] == "gpt-5.4-mini" + + @pytest.mark.asyncio + async def test_codex_env_override_honored_at_call_site(self, monkeypatch): + """ + ISSUE_TRIAGE_MODEL_CODEX in the environment overrides the + registry value at the call site. Same end-to-end env-override + wiring as the claude path. + """ + monkeypatch.setenv("ISSUE_TRIAGE_MODEL_CODEX", "gpt-5.4") + mock_proc = _mock_subprocess(stdout=self._codex_ndjson("{}")) + with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_triage("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + i = cmd.index("--model") + assert cmd[i + 1] == "gpt-5.4" + + @pytest.mark.asyncio + async def test_codex_no_sudo_even_with_claude_user(self): + """ + claude_user is a claude-specific sudo argument. The codex + branch ignores it; argv must NOT contain "sudo". + """ + mock_proc = _mock_subprocess(stdout=self._codex_ndjson("{}")) + with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_triage("prompt", agent_backend="codex", claude_user="some-user") + cmd = mock_exec.call_args[0] + assert "sudo" not in cmd + + @pytest.mark.asyncio + async def test_codex_no_max_budget_flag(self): + """ + --max-budget-usd is not emitted on the codex branch. Codex on + subscription auth has no per-call billing; runaway protection + comes from the asyncio.wait_for timeout. Mirror of the existing + claude-side absence assertion. 
+ """ + mock_proc = _mock_subprocess(stdout=self._codex_ndjson("{}")) + with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec: + await run_triage("prompt", agent_backend="codex") + cmd = mock_exec.call_args[0] + assert "--max-budget-usd" not in cmd + + @pytest.mark.asyncio + async def test_codex_extracts_final_text_from_ndjson(self): + """ + Return value is the agent message text extracted from the + NDJSON event stream, not the raw stdout. The downstream + _parse_triage_json receives a single JSON object, not + multi-line NDJSON. + """ + expected_json = '{"labels": ["bug"], "summary": "A bug."}' + mock_proc = _mock_subprocess(stdout=self._codex_ndjson(expected_json)) + with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc): + result = await run_triage("prompt", agent_backend="codex") + # _extract_codex_text strips, so the result is the expected + # JSON string with no leading/trailing whitespace. + assert result == expected_json + + @pytest.mark.asyncio + async def test_codex_subprocess_failure_raises(self): + """Non-zero exit from codex raises RuntimeError with stderr.""" + mock_proc = _mock_subprocess(returncode=1, stderr="auth failed") + with ( + patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc), + pytest.raises(RuntimeError, match="auth failed"), + ): + await run_triage("prompt", agent_backend="codex") + + +class TestExtractCodexText: + """ + Unit tests for the _extract_codex_text NDJSON parser. + + The codex CLI schema is not yet pinned by smoke test; the parser + accepts multiple field paths (top-level "text"; "content" as + string or list-of-blocks; "delta.text"; "item.content") so a + schema variant the docs do not describe still yields the agent + message. These tests lock each path independently. 
+ """ + + def test_empty_input(self): + """An empty stream returns the empty string.""" + assert _extract_codex_text("") == "" + + def test_skips_non_json_lines(self): + """Non-JSON lines are silently skipped.""" + stream = "not-json\n" + json.dumps({"text": "hello"}) + "\n" + assert _extract_codex_text(stream) == "hello" + + def test_extracts_top_level_text(self): + """Events with a top-level 'text' field contribute that string.""" + stream = json.dumps({"event": "delta", "text": "abc"}) + "\n" + assert _extract_codex_text(stream) == "abc" + + def test_extracts_delta_text(self): + """Events with 'delta.text' (alternate streaming shape) work.""" + stream = json.dumps({"event": "delta", "delta": {"text": "abc"}}) + "\n" + assert _extract_codex_text(stream) == "abc" + + def test_extracts_content_string(self): + """Events with 'content' as a string contribute that string.""" + stream = json.dumps({"event": "msg", "content": "hello"}) + "\n" + assert _extract_codex_text(stream) == "hello" + + def test_extracts_content_list_of_text_blocks(self): + """Events with 'content' as a list of {type:text, text:...} blocks work.""" + event = { + "event": "msg", + "content": [ + {"type": "text", "text": "part-A"}, + {"type": "text", "text": " part-B"}, + ], + } + stream = json.dumps(event) + "\n" + assert _extract_codex_text(stream) == "part-A part-B" + + def test_extracts_item_content(self): + """Events with text inside 'item.content[...]' work.""" + event = {"event": "item.message.completed", "item": {"content": [{"type": "text", "text": "from item"}]}} + stream = json.dumps(event) + "\n" + assert _extract_codex_text(stream) == "from item" + + def test_ignores_unknown_event_types(self): + """An event with no recognizable text payload contributes nothing.""" + stream = json.dumps({"event": "thread.started", "metadata": {"foo": "bar"}}) + "\n" + assert _extract_codex_text(stream) == "" + + def test_accumulates_across_events(self): + """Text from multiple events accumulates in 
order.""" + events = [ + {"event": "delta", "text": "Hello"}, + {"event": "delta", "text": " "}, + {"event": "delta", "text": "world"}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert _extract_codex_text(stream) == "Hello world" + + def test_strips_outer_whitespace(self): + """The accumulated result has leading/trailing whitespace stripped.""" + stream = json.dumps({"text": " hello "}) + "\n" + assert _extract_codex_text(stream) == "hello" + + class TestResolveGooseModelTriage: """Tests for _resolve_goose_model in triage.py.""" From 26d6e7555523ba773edb7e2e63b0966224cb9dbc Mon Sep 17 00:00:00 2001 From: Daniel Ellison Date: Fri, 15 May 2026 09:18:46 -0400 Subject: [PATCH 2/2] review: terminal-text wins over delta accumulation in codex NDJSON parser PR review M1: _extract_codex_text accumulated text from every recognized field path across every event. On the standard streaming shape that codex emits (delta chunks followed by a terminal consolidated message), this returned the message twice concatenated. For triage where the agent message is a JSON object, the doubled output is `{"labels":[]}{"labels":[]}`-shaped and _parse_triage_json cannot parse it, breaking the triage path on every codex run that streams. Fix: split recovery into terminal and chunk paths. - _recover_terminal_text recognizes complete-message field paths (top-level "content" as string or list-of-blocks, "item.content" as list-of-blocks). When any terminal event appears in the stream, the most recent one wins; delta chunks are discarded. - _recover_chunk_text recognizes streaming-delta paths ("delta.text", top-level "text" as a string). Chunks accumulate only when no terminal event was emitted in the stream. The existing 10 TestExtractCodexText cases continue to pass under the new design (the single-path streams in those tests are classified consistently). Three new regression tests: - test_terminal_text_wins_over_accumulated_deltas: worked example from the review. 
Asserts deltas + terminal returns the terminal text once. - test_last_terminal_wins_on_multiple_terminals: multiple terminal events resolve to the most recent. - test_deltas_only_when_no_terminal: no-terminal stream accumulates deltas as before. Locks the fallback the new logic must not short-circuit. Plus an integration test in TestRunTriageCodex: - test_codex_handles_streaming_deltas_plus_terminal: end-to-end through run_triage on a delta+terminal stream returns the terminal JSON exactly once, parseable downstream. M2 (unverified stdin contract for `codex exec`) and L1 (smoke checklist needs explicit `codex exec` triage path) folded into the PR body's smoke checklist rather than code changes; both are post-merge verification items. --- src/kai/triage.py | 141 ++++++++++++++++++++++++++++--------------- tests/test_triage.py | 78 ++++++++++++++++++++++++ 2 files changed, 171 insertions(+), 48 deletions(-) diff --git a/src/kai/triage.py b/src/kai/triage.py index 135baef..cde283a 100644 --- a/src/kai/triage.py +++ b/src/kai/triage.py @@ -593,21 +593,31 @@ def _extract_codex_text(stdout: str) -> str: """ Walk codex's NDJSON event stream and return the agent message text. - `codex exec --json` emits one JSON event per line. The relevant - payload (the agent's final response) lives inside an item event; - the exact event name and field path are not yet pinned by smoke + `codex exec --json` emits one JSON event per line. A streaming run + can produce two representations of the same message: incremental + chunks (delta events) and a single terminal/complete event that + carries the full consolidated text. Accumulating across both + representations would double the message; this parser prefers + the terminal text and only falls back to delta accumulation when + no terminal event appears in the stream. 
+ + The exact event name and field path are not yet pinned by smoke test, so this parser is defensive about field names: - Each line is attempted as JSON; non-JSON lines are skipped. - - Within each parsed object, text content is recovered from any - of: top-level "text"; "content" as a string; "content" as a - list of {"type": "text", "text": ...} blocks (the JSON-RPC - convention used by goose ACP); "delta.text"; "item.content" - with the same list shape. - - All recovered text is concatenated in order; partial recoveries - are returned rather than raising, on the assumption that the - caller's JSON parser will surface a clearer error if the - result is malformed. + - Terminal/complete events are recognized by their FIELD PATH + rather than by an event-name string. Any of these signals a + complete message: top-level "content" as a string, top-level + "content" as a list of {"type":"text", "text":...} blocks, or + "item.content" with the same list shape. When multiple + terminal events appear, the last one wins (matches the + streaming convention that "completed" supersedes prior partials). + - Chunk/delta events use "delta.text" or a top-level "text" + field. Chunks accumulate in order. Used only when the stream + contains no terminal event. + - On any genuine schema mismatch the result is the empty string; + `_parse_triage_json` then raises a clearer error than a + doubled-up partial would. The smoke test against a real codex CLI will reveal which of these field paths actually fire for the pinned version; the @@ -617,10 +627,12 @@ def _extract_codex_text(stdout: str) -> str: stdout: The full stdout from `codex exec --json`. Returns: - The accumulated agent message text. Empty string if no - recognizable text content was found in any event. + The agent message text. Terminal text if any terminal event + was found; otherwise the accumulated delta/chunk text. Empty + string if no recognizable text was found in any event. 
""" - chunks: list[str] = [] + terminal_text: str | None = None + accumulated_chunks: list[str] = [] for line in stdout.splitlines(): line = line.strip() if not line: @@ -629,50 +641,83 @@ def _extract_codex_text(stdout: str) -> str: obj = json.loads(line) except json.JSONDecodeError: continue - chunks.extend(_recover_text_from_event(obj)) - return "".join(chunks).strip() + # Terminal event wins; the most recent terminal event + # supersedes earlier ones (a streaming run that emits + # interim consolidated events before the final one ends + # up with the latest version). + terminal = _recover_terminal_text(obj) + if terminal is not None: + terminal_text = terminal + continue + # Delta/chunk events accumulate, but only matter when no + # terminal event ever fires. + chunk = _recover_chunk_text(obj) + if chunk is not None: + accumulated_chunks.append(chunk) + if terminal_text is not None: + return terminal_text.strip() + return "".join(accumulated_chunks).strip() -def _recover_text_from_event(obj: dict) -> list[str]: +def _recover_terminal_text(obj: dict) -> str | None: """ - Pull text content out of one codex NDJSON event. + Pull a complete/terminal agent message from one codex event. + + Recognized field paths for a complete message: + - Top-level "content" as a string (single-block shape). + - Top-level "content" as a list of {"type":"text", "text":...} blocks. + - "item.content" with the same list-of-blocks shape (events + wrapped in an "item" object). - Defensive: tries multiple field paths the codex CLI is observed - or documented to use, and returns whatever text it finds. An - event with no recognizable text payload contributes nothing. + Returns the consolidated text string, or None if the event has + none of these paths (and is therefore either a chunk/delta or + pure metadata). 
""" - out: list[str] = [] - # Top-level text field (common shape for "delta" events) - text = obj.get("text") - if isinstance(text, str): - out.append(text) - # delta.text (alternate streaming shape) - delta = obj.get("delta") - if isinstance(delta, dict): - d_text = delta.get("text") - if isinstance(d_text, str): - out.append(d_text) - # content as a string OR as a list of {type: text, text: ...} blocks content = obj.get("content") if isinstance(content, str): - out.append(content) - elif isinstance(content, list): - for block in content: - if isinstance(block, dict) and block.get("type") == "text": - b_text = block.get("text") - if isinstance(b_text, str): - out.append(b_text) - # item.content for events wrapped in an "item" object + return content + if isinstance(content, list): + parts = [ + block["text"] + for block in content + if isinstance(block, dict) and block.get("type") == "text" and isinstance(block.get("text"), str) + ] + if parts: + return "".join(parts) item = obj.get("item") if isinstance(item, dict): i_content = item.get("content") if isinstance(i_content, list): - for block in i_content: - if isinstance(block, dict) and block.get("type") == "text": - b_text = block.get("text") - if isinstance(b_text, str): - out.append(b_text) - return out + parts = [ + block["text"] + for block in i_content + if isinstance(block, dict) and block.get("type") == "text" and isinstance(block.get("text"), str) + ] + if parts: + return "".join(parts) + return None + + +def _recover_chunk_text(obj: dict) -> str | None: + """ + Pull a streaming chunk/delta text fragment from one codex event. + + Recognized field paths for an incremental chunk: + - "delta.text" (the conventional streaming-delta shape). + - Top-level "text" string (an alternate shape some CLI versions emit). + + Returns the fragment text, or None if the event has no chunk-style + text field. 
+ """ + delta = obj.get("delta") + if isinstance(delta, dict): + d_text = delta.get("text") + if isinstance(d_text, str): + return d_text + text = obj.get("text") + if isinstance(text, str): + return text + return None def _parse_triage_json(raw: str) -> dict: diff --git a/tests/test_triage.py b/tests/test_triage.py index 914979b..5022350 100644 --- a/tests/test_triage.py +++ b/tests/test_triage.py @@ -778,6 +778,35 @@ async def test_codex_subprocess_failure_raises(self): ): await run_triage("prompt", agent_backend="codex") + @pytest.mark.asyncio + async def test_codex_handles_streaming_deltas_plus_terminal(self): + """ + End-to-end through run_triage: a stream that contains both + delta chunks AND a terminal consolidated message returns the + terminal JSON exactly once, parseable by _parse_triage_json. + + This is the integration counterpart to + TestExtractCodexText::test_terminal_text_wins_over_accumulated_deltas. + Without the terminal-wins rule, the triage path would return + a doubled JSON string (`{"labels":[]}{"labels":[]}`) and the + downstream parser would fail on every codex run that streams. + """ + expected_json = '{"labels": ["bug"], "summary": "ok"}' + events = [ + {"event": "delta", "delta": {"text": '{"labels":'}}, + {"event": "delta", "delta": {"text": ' ["bug"], "summary": "ok"}'}}, + { + "event": "item.message.completed", + "item": {"content": [{"type": "text", "text": expected_json}]}, + }, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + mock_proc = _mock_subprocess(stdout=stream) + with patch("kai.triage.asyncio.create_subprocess_exec", return_value=mock_proc): + result = await run_triage("prompt", agent_backend="codex") + # Exactly the terminal JSON, not deltas + terminal concatenated. 
+ assert result == expected_json + class TestExtractCodexText: """ @@ -852,6 +881,55 @@ def test_strips_outer_whitespace(self): stream = json.dumps({"text": " hello "}) + "\n" assert _extract_codex_text(stream) == "hello" + def test_terminal_text_wins_over_accumulated_deltas(self): + """ + A stream with both delta chunks and a terminal consolidated + message returns the terminal text exactly once, NOT the + deltas concatenated with the terminal. + + Without the terminal-wins rule, the parser would yield + `"HelloHello"` for the worked example below: triage's JSON + parser then fails because `{"labels":[]}{"labels":[]}` is + not a single JSON object, breaking the triage path on + every codex run that streams. This regression guard + protects against a future refactor that re-introduces + accumulate-across-representations. + """ + events = [ + {"event": "delta", "delta": {"text": "Hel"}}, + {"event": "delta", "delta": {"text": "lo"}}, + {"event": "item.message.completed", "item": {"content": [{"type": "text", "text": "Hello"}]}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert _extract_codex_text(stream) == "Hello" + + def test_last_terminal_wins_on_multiple_terminals(self): + """ + When a stream emits more than one terminal/complete event + (e.g. an interim consolidated text followed by a final one), + the most recent terminal text wins. Mirrors the streaming + convention that "completed" supersedes prior partials. + """ + events = [ + {"item": {"content": [{"type": "text", "text": "first"}]}}, + {"item": {"content": [{"type": "text", "text": "final"}]}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert _extract_codex_text(stream) == "final" + + def test_deltas_only_when_no_terminal(self): + """ + With no terminal event, delta chunks accumulate as the result. + Locks the fallback behavior the terminal-wins rule does not + short-circuit when no terminal text was emitted. 
+ """ + events = [ + {"event": "delta", "delta": {"text": "Hel"}}, + {"event": "delta", "delta": {"text": "lo"}}, + ] + stream = "\n".join(json.dumps(e) for e in events) + "\n" + assert _extract_codex_text(stream) == "Hello" + class TestResolveGooseModelTriage: """Tests for _resolve_goose_model in triage.py."""