Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 93 additions & 26 deletions kennel/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,7 @@ def __init__(
popen: Callable[..., subprocess.Popen[str]] = subprocess.Popen,
selector: Callable[..., tuple[list, list, list]] = select.select,
repo_name: str | None = None,
model: str = "claude-opus-4-6",
) -> None:
self._idle_timeout = idle_timeout
self._selector = selector
Expand All @@ -759,6 +760,13 @@ def __init__(
self._lock = threading.Lock()
self._cancel = threading.Event()
self._repo_name = repo_name
self._model = model
# Latest session_id seen in a stream-json event. Updated inside
# :meth:`iter_events` so a subsequent :meth:`switch_model` can
# restart with ``--resume <sid>`` and keep conversation context
# across the model swap. Empty until the first claude event with
# a session_id arrives.
self._session_id = ""
# True when the most recent :meth:`iter_events` call exited early
# because :attr:`_cancel` was set (i.e. another thread preempted the
# turn via :meth:`prompt`). Cleared at the start of each turn.
Expand Down Expand Up @@ -788,7 +796,15 @@ def last_turn_cancelled(self) -> bool:
return self._last_turn_cancelled

def _spawn(self) -> subprocess.Popen[str]:
"""Spawn the claude subprocess with bidirectional stream-json I/O."""
"""Spawn the claude subprocess with bidirectional stream-json I/O.

Model is set via ``--model`` at spawn time — the runtime ``/model``
slash command isn't honored in stream-json mode (claude echoes
"Unknown command: /model" and hangs without producing a turn
boundary). When :attr:`_session_id` is non-empty the new process
resumes the prior conversation via ``--resume`` so context
survives a model swap.
"""
cmd = [
"claude",
"--input-format",
Expand All @@ -797,9 +813,13 @@ def _spawn(self) -> subprocess.Popen[str]:
"stream-json",
"--verbose",
"--dangerously-skip-permissions",
"--model",
self._model,
"--system-prompt-file",
str(self._system_file),
]
if self._session_id:
cmd += ["--resume", self._session_id]
return self._popen_fn(
cmd,
stdin=subprocess.PIPE,
Expand All @@ -825,24 +845,38 @@ def pid(self) -> int:
return self._proc.pid

def restart(self) -> None:
"""Stop the current subprocess and start a fresh one.

Unregisters the dead process from ``_active_children``, kills it if
still running, then spawns a replacement and registers it. The
conversation transcript is lost — callers are responsible for
re-sending any context the new process needs.
"""Stop the current subprocess and start a fresh one with a **fresh
conversation** — the tracked ``session_id`` is cleared so ``_spawn``
does not pass ``--resume``.

Used by :class:`~kennel.worker.Worker` on issue boundaries to bound
context growth (one issue's conversation should not bleed into the
next). Contrast with :meth:`switch_model`, which deliberately
preserves ``session_id`` across the subprocess swap so the new
model picks up where the old one left off.

Callers are responsible for re-sending any context the new
process needs (e.g. the sub-skill prompt for the current phase).
"""
log.warning("ClaudeSession: restarting after unexpected process death")
_unregister_child(self._proc)
if self._proc.poll() is None:
try:
self._proc.kill()
self._proc.wait(timeout=1.0)
except (OSError, ProcessLookupError, subprocess.TimeoutExpired) as exc:
log.warning("ClaudeSession.restart: kill/wait failed: %s", exc)
raise
self._proc = self._spawn()
_register_child(self._proc)
log.info(
"ClaudeSession: restarting (clear conversation, model=%s)", self._model
)
with self._lock:
_unregister_child(self._proc)
if self._proc.poll() is None:
try:
self._proc.kill()
self._proc.wait(timeout=1.0)
except (OSError, ProcessLookupError, subprocess.TimeoutExpired) as exc:
log.warning("ClaudeSession.restart: kill/wait failed: %s", exc)
raise
# Fresh conversation — drop the tracked session_id so _spawn omits
# --resume. Any thread waiting on ``with session:`` blocks on
# :attr:`_lock` until the new subprocess is listening.
self._session_id = ""
self._proc = self._spawn()
_register_child(self._proc)
log.info("ClaudeSession: restart complete, new pid %d", self._proc.pid)

@property
def owner(self) -> str | None:
Expand Down Expand Up @@ -992,15 +1026,42 @@ def prompt(
return self.consume_until_result()

def switch_model(self, model: str) -> None:
"""Switch the active model by sending a /model slash command.

Sends ``/model <model>`` as a user message and drains any response
events so the turn boundary is clean before the next call to
:meth:`send` + :meth:`iter_events`.
"""Switch the active model. Restart-based — stream-json does not
accept ``/model`` or any slash command (claude echoes "Unknown
command" and never emits a turn boundary, hanging the reader).

Holds :attr:`_lock` for the full swap so callers waiting on
:meth:`__enter__` block gracefully until the new subprocess is
listening. When a prior ``session_id`` is known we pass
``--resume`` to the new subprocess so the conversation transcript
carries over across the swap — no context loss when phase
transitions flip opus → sonnet or vice versa.

No-op when *model* equals the current model.
"""
self.send(f"/model {model}")
for _ in self.iter_events():
pass
if model == self._model:
return
log.info(
"switch_model: %s → %s (restart-based, resume=%s)",
self._model,
model,
self._session_id or "—",
)
with self._lock:
_unregister_child(self._proc)
if self._proc.poll() is None:
try:
self._proc.kill()
self._proc.wait(timeout=1.0)
except (OSError, ProcessLookupError, subprocess.TimeoutExpired) as exc:
log.warning(
"switch_model: kill/wait of old subprocess failed: %s", exc
)
raise
self._model = model
self._proc = self._spawn()
_register_child(self._proc)
log.info("switch_model: new pid %d ready (model=%s)", self._proc.pid, model)

def iter_events(self) -> Iterator[dict]:
"""Yield parsed stream-json events for the current turn.
Expand Down Expand Up @@ -1045,6 +1106,12 @@ def iter_events(self) -> Iterator[dict]:
obj = json.loads(line)
log.debug("ClaudeSession event: %s", _Trunc(line))
last_activity = time.monotonic()
# Track the latest session_id so :meth:`switch_model` can
# restart with ``--resume <sid>`` and keep conversation
# context across the swap.
sid = obj.get("session_id")
if isinstance(sid, str) and sid:
self._session_id = sid
yield obj
if obj.get("type") in ("result", "error"):
break
Expand Down
136 changes: 105 additions & 31 deletions tests/test_claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,33 +1482,86 @@ def test_skips_close_when_stdin_already_closed(self, tmp_path: Path) -> None:


class TestClaudeSessionSwitchModel:
def test_sends_model_slash_command(self, tmp_path: Path) -> None:
import json as _json

result_line = _json.dumps({"type": "result", "result": ""}) + "\n"
proc = _make_session_proc([result_line])
session = _make_session(tmp_path, proc)
def test_same_model_is_noop(self, tmp_path: Path) -> None:
"""When the target matches the current model, nothing happens."""
proc = _make_session_proc([])
session = _make_session(tmp_path, proc) # default model claude-opus-4-6
current_proc = session._proc
session.switch_model("claude-opus-4-6")
written = proc.stdin.write.call_args.args[0]
obj = _json.loads(written.strip())
assert obj["message"]["content"] == "/model claude-opus-4-6"

def test_drains_response_events(self, tmp_path: Path) -> None:
import json as _json
assert session._proc is current_proc
# stdin.write should NOT have been called for a /model slash command.
assert proc.stdin.write.call_count == 0

lines = [
_json.dumps({"type": "assistant", "text": "Switching..."}) + "\n",
_json.dumps({"type": "result", "result": ""}) + "\n",
]
proc = _make_session_proc(lines)
session = _make_session(tmp_path, proc)
# Must not raise or leave unread events blocking future reads
def test_different_model_respawns_with_new_flag(self, tmp_path: Path) -> None:
"""Switching model kills the old proc and spawns a new one with
--model <new>, passing --resume when session_id is known."""
system_file = tmp_path / "system.md"
system_file.write_text("sys")
old_proc = _make_session_proc([])
old_proc.pid = 1001
new_proc = _make_session_proc([])
new_proc.pid = 1002
fake_popen = MagicMock(side_effect=[old_proc, new_proc])
session = ClaudeSession(
system_file,
work_dir=tmp_path,
popen=fake_popen,
selector=MagicMock(return_value=([], [], [])),
model="claude-opus-4-6",
repo_name="owner/repo",
)
# Prior turn established a session_id — switch_model must preserve
# conversation by passing --resume to the new subprocess.
session._session_id = "sid-123"
session.switch_model("claude-sonnet-4-6")
old_proc.kill.assert_called_once()
assert session._proc is new_proc
assert session._model == "claude-sonnet-4-6"
# Second spawn call had --model claude-sonnet-4-6 and --resume sid-123.
second_cmd = fake_popen.call_args_list[1].args[0]
assert "--model" in second_cmd
assert second_cmd[second_cmd.index("--model") + 1] == "claude-sonnet-4-6"
assert "--resume" in second_cmd
assert second_cmd[second_cmd.index("--resume") + 1] == "sid-123"

def test_switch_raises_when_kill_fails(self, tmp_path: Path) -> None:
"""kill/wait failure during switch_model re-raises so the caller
can decide how to recover."""
import subprocess

def test_works_when_command_produces_no_output(self, tmp_path: Path) -> None:
proc = _make_session_proc([]) # immediate EOF
session = _make_session(tmp_path, proc)
session.switch_model("claude-haiku-4-5-20251001") # must not raise
system_file = tmp_path / "system.md"
system_file.write_text("sys")
old_proc = _make_session_proc([], poll_returns=None)
old_proc.kill = MagicMock()
old_proc.wait = MagicMock(side_effect=subprocess.TimeoutExpired("claude", 1.0))
fake_popen = MagicMock(side_effect=[old_proc, _make_session_proc([])])
session = ClaudeSession(
system_file,
work_dir=tmp_path,
popen=fake_popen,
selector=MagicMock(return_value=([], [], [])),
model="claude-opus-4-6",
)
with pytest.raises(subprocess.TimeoutExpired):
session.switch_model("claude-sonnet-4-6")

def test_switch_with_no_prior_session_id_omits_resume(self, tmp_path: Path) -> None:
system_file = tmp_path / "system.md"
system_file.write_text("sys")
old_proc = _make_session_proc([])
new_proc = _make_session_proc([])
fake_popen = MagicMock(side_effect=[old_proc, new_proc])
session = ClaudeSession(
system_file,
work_dir=tmp_path,
popen=fake_popen,
selector=MagicMock(return_value=([], [], [])),
model="claude-opus-4-6",
)
# No prior session_id (fresh session) — no --resume flag.
session.switch_model("claude-sonnet-4-6")
second_cmd = fake_popen.call_args_list[1].args[0]
assert "--resume" not in second_cmd


class TestClaudeSessionConsumeUntilResult:
Expand Down Expand Up @@ -1618,7 +1671,7 @@ def test_restart_unregisters_old_proc(self, tmp_path: Path) -> None:
# cleanup
session.stop()

def test_restart_logs_warning(self, tmp_path: Path, caplog) -> None:
def test_restart_logs_info(self, tmp_path: Path, caplog) -> None:
import logging as _logging

system_file = tmp_path / "system.md"
Expand All @@ -1630,11 +1683,32 @@ def test_restart_logs_warning(self, tmp_path: Path, caplog) -> None:
session = ClaudeSession(
system_file, work_dir=tmp_path, popen=fake_popen, selector=fake_selector
)
with caplog.at_level(_logging.WARNING, logger="kennel.claude"):
with caplog.at_level(_logging.INFO, logger="kennel.claude"):
session.restart()
assert any("restart" in r.message.lower() for r in caplog.records)
session.stop()

def test_restart_clears_session_id(self, tmp_path: Path) -> None:
"""restart drops session_id so the new spawn starts a fresh
conversation (opposite of switch_model which preserves it)."""
system_file = tmp_path / "system.md"
system_file.write_text("sys")
old_proc = _make_session_proc([])
new_proc = _make_session_proc([])
fake_popen = MagicMock(side_effect=[old_proc, new_proc])
session = ClaudeSession(
system_file,
work_dir=tmp_path,
popen=fake_popen,
selector=MagicMock(return_value=([], [], [])),
)
session._session_id = "sid-123"
session.restart()
assert session._session_id == ""
# Second spawn call had no --resume.
second_cmd = fake_popen.call_args_list[1].args[0]
assert "--resume" not in second_cmd

def test_restart_skips_kill_when_process_already_dead(self, tmp_path: Path) -> None:
system_file = tmp_path / "system.md"
system_file.write_text("sys")
Expand Down Expand Up @@ -1797,7 +1871,6 @@ def test_prompt_routes_through_session(self, tmp_path: Path) -> None:
proc = _make_session_proc(
[
'{"type":"result","result":""}\n', # drain after interrupt
'{"type":"result","result":""}\n', # /model ack
'{"type":"result","result":"hello world"}\n', # actual turn
]
)
Expand All @@ -1810,15 +1883,16 @@ def test_prompt_routes_through_session(self, tmp_path: Path) -> None:
popen=fake_popen,
selector=fake_selector,
repo_name="owner/repo",
model="claude-opus-4-6",
)
try:
result = session.prompt("hi there", model="claude-opus-4-6")
assert result == "hello world"
# send() was called: /model line + main content.
sent = [call.args[0] for call in proc.stdin.write.call_args_list]
combined = "".join(sent)
assert "/model claude-opus-4-6" in combined
assert "hi there" in combined
# model param matches current → switch_model is a no-op;
# stdin only carries the user message body.
sent = "".join(call.args[0] for call in proc.stdin.write.call_args_list)
assert "hi there" in sent
assert "/model" not in sent
finally:
session.stop()

Expand Down
Loading