Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions src/kai/triage.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,58 @@ async def run_triage(
Raises:
RuntimeError: If the subprocess fails or times out.
"""
if agent_backend == "codex":
# Codex one-shot mode: --json emits NDJSON events on stdout
# (thread.started, turn.started, item.*, turn.completed, error).
# The final agent message text is recovered by walking the
# events and accumulating any text content; see
# _extract_codex_text() below for the schema-defensive parser.
# No sudo: codex on subscription auth uses the service user's
# own ~/.codex/auth.json; per-user OAuth isolation is the
# os_user lever in users.yaml (post-#353), not a sudo wrap.
# No --max-budget-usd: codex on subscription auth has no
# per-call billing; runaway protection comes from
# _TRIAGE_TIMEOUT at the asyncio.wait_for below.
triage_model = get_model_for(
ModelRole.ISSUE_TRIAGE,
agent_backend,
override=os.environ.get("ISSUE_TRIAGE_MODEL_CODEX", ""),
)
cmd = [
"codex",
"exec",
"--json",
"--model",
triage_model,
]

proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=prompt.encode()),
timeout=_TRIAGE_TIMEOUT,
)
except TimeoutError:
proc.kill()
await proc.wait()
raise RuntimeError(f"Triage subprocess timed out after {_TRIAGE_TIMEOUT}s") from None

if proc.returncode != 0:
error = stderr.decode().strip()
raise RuntimeError(f"Triage subprocess failed (exit {proc.returncode}): {error}")

# Codex emits NDJSON; extract the final agent message text and
# return it (mirrors the contract the claude / goose branches
# already satisfy: a single string the caller hands to
# _parse_triage_json).
return _extract_codex_text(stdout.decode())

if agent_backend == "goose":
if not provider:
raise ValueError(
Expand Down Expand Up @@ -537,6 +589,137 @@ async def run_triage(
return stdout.decode().strip()


def _extract_codex_text(stdout: str) -> str:
"""
Walk codex's NDJSON event stream and return the agent message text.

`codex exec --json` emits one JSON event per line. A streaming run
can produce two representations of the same message: incremental
chunks (delta events) and a single terminal/complete event that
carries the full consolidated text. Accumulating across both
representations would double the message; this parser prefers
the terminal text and only falls back to delta accumulation when
no terminal event appears in the stream.

The exact event name and field path are not yet pinned by smoke
test, so this parser is defensive about field names:

- Each line is attempted as JSON; non-JSON lines are skipped.
- Terminal/complete events are recognized by their FIELD PATH
rather than by an event-name string. Any of these signals a
complete message: top-level "content" as a string, top-level
"content" as a list of {"type":"text", "text":...} blocks, or
"item.content" with the same list shape. When multiple
terminal events appear, the last one wins (matches the
streaming convention that "completed" supersedes prior partials).
- Chunk/delta events use "delta.text" or a top-level "text"
field. Chunks accumulate in order. Used only when the stream
contains no terminal event.
- On any genuine schema mismatch the result is the empty string;
`_parse_triage_json` then raises a clearer error than a
doubled-up partial would.

The smoke test against a real codex CLI will reveal which of
these field paths actually fire for the pinned version; the
others remain as fallbacks for schema drift.

Args:
stdout: The full stdout from `codex exec --json`.

Returns:
The agent message text. Terminal text if any terminal event
was found; otherwise the accumulated delta/chunk text. Empty
string if no recognizable text was found in any event.
"""
terminal_text: str | None = None
accumulated_chunks: list[str] = []
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
continue
# Terminal event wins; the most recent terminal event
# supersedes earlier ones (a streaming run that emits
# interim consolidated events before the final one ends
# up with the latest version).
terminal = _recover_terminal_text(obj)
if terminal is not None:
terminal_text = terminal
continue
# Delta/chunk events accumulate, but only matter when no
# terminal event ever fires.
chunk = _recover_chunk_text(obj)
if chunk is not None:
accumulated_chunks.append(chunk)
if terminal_text is not None:
return terminal_text.strip()
return "".join(accumulated_chunks).strip()


def _recover_terminal_text(obj: dict) -> str | None:
"""
Pull a complete/terminal agent message from one codex event.

Recognized field paths for a complete message:
- Top-level "content" as a string (single-block shape).
- Top-level "content" as a list of {"type":"text", "text":...} blocks.
- "item.content" with the same list-of-blocks shape (events
wrapped in an "item" object).

Returns the consolidated text string, or None if the event has
none of these paths (and is therefore either a chunk/delta or
pure metadata).
"""
content = obj.get("content")
if isinstance(content, str):
return content
if isinstance(content, list):
parts = [
block["text"]
for block in content
if isinstance(block, dict) and block.get("type") == "text" and isinstance(block.get("text"), str)
]
if parts:
return "".join(parts)
item = obj.get("item")
if isinstance(item, dict):
i_content = item.get("content")
if isinstance(i_content, list):
parts = [
block["text"]
for block in i_content
if isinstance(block, dict) and block.get("type") == "text" and isinstance(block.get("text"), str)
]
if parts:
return "".join(parts)
return None


def _recover_chunk_text(obj: dict) -> str | None:
"""
Pull a streaming chunk/delta text fragment from one codex event.

Recognized field paths for an incremental chunk:
- "delta.text" (the conventional streaming-delta shape).
- Top-level "text" string (an alternate shape some CLI versions emit).

Returns the fragment text, or None if the event has no chunk-style
text field.
"""
delta = obj.get("delta")
if isinstance(delta, dict):
d_text = delta.get("text")
if isinstance(d_text, str):
return d_text
text = obj.get("text")
if isinstance(text, str):
return text
return None


def _parse_triage_json(raw: str) -> dict:
"""
Parse Claude's triage response, stripping markdown fencing if present.
Expand Down
Loading
Loading