Drive a single persistent claude per worker via bidirectional stream-json (closes #455)#456
Conversation
Introduces ClaudeSession to kennel/claude.py — a long-lived claude process that stays open across turns via --input-format stream-json --output-format stream-json. Provides send(), iter_events(), and stop() with full idle-timeout detection, _active_children registration for graceful shutdown, and dependency injection on popen/selector for testability. 22 new tests, all at 100% coverage.
4c8fed2 to
b8cc8d0
Compare
Sends /model <model> as a user message and drains any response events so the turn boundary is clean before the next send/iter_events cycle. Three tests cover: correct command content, response draining, and graceful handling when the command produces no output.
|
It would be cool if kennel status could show (uptime, events/min ↓, commands/min ↑) avg last 10min moving window? |
|
Ooh that's a fun idea — rolling 10min window of uptime, events/min, and commands/min would make But it's out of scope for this PR — I want to keep this one focused on the persistent claude stream work. I've opened #457 to track it: #457 |
Exhausts iter_events() for one turn and returns the result field from the type=result event, or "" when the turn ends without one (EOF, type=error, or non-string result). Five tests cover each exit path.
|
It would be nice to surface when compaction is running in |
|
Ooh that's a great idea — surfacing |
|
You're going to need to determine how to interrupt Claude with a streaming event to preempt for things like ci failures first or rescopes |
|
Woof, good catch — want to make sure I sniff in the right direction here. A few ways I could go:
My gut says (1) — it's the cleanest and matches what stream-json is designed for. But do you have a preference, or a specific failure mode you're worried about (e.g. tool calls in flight, partial edits)? Want to make sure I chase the right ball. 🐾 |
1 |
|
Woof, going with option (1) then! tail wag I'll send a For tool calls in flight — the interrupt will cancel the current turn, and the follow-up user message will give Claude the new situation to react to. Partial edits stay on disk; the next turn can sniff around and reconcile if needed. Off to fetch! 🐾 |
|
Woof! Just a heads up — the task from your comment got a little polish on its title ("Implement" is now capitalized). Same scope, same plan. tail wag Back to sniffing out the implementation! |
Adds create_session(fido_dir) and stop_session() methods to Worker. Worker.run() calls create_session right after setup_hooks (persona.md as system prompt) and calls stop_session in the finally block alongside teardown_hooks, so the session is always cleaned up even on exception. A module-level autouse fixture in test_worker.py patches ClaudeSession so no real subprocess is spawned by the 30 existing worker.run() tests. 10 new tests cover session creation, stop, no-op stop, and run lifecycle.
ClaudeSession gains _spawn() (extracted subprocess creation), is_alive() (poll() is None), and restart() (unregister old proc, kill, spawn fresh, re-register). Restart swallows OSError/ProcessLookupError/TimeoutExpired so it never raises when the process is already gone. Worker gets _ensure_session_alive(fido_dir): if the session process has died unexpectedly it calls session.restart() and logs a warning. This is called in Worker.run() after seed_tasks_from_pr_body, just before the first handler (handle_ci / handle_threads / execute_task), so a dead session is always replaced before any work starts — tasks are never lost due to session process death between iterations. 14 new tests cover is_alive, restart lifecycle, error handling, and the _ensure_session_alive / run() integration.
|
Woof! Just a heads up — the task from your comment got updated to match new requirements. Back to sniffing! tail wag |
Replaces the magic numbers 10.0 (select poll interval) and 200 (log line truncation) with named module-level constants _SELECT_POLL_INTERVAL and _LOG_LINE_TRUNCATE, applied consistently across _run_streaming, iter_events, and all log sites in the file.
Each bare 'pass' in ClaudeSession.stop() is replaced with a log.debug that includes the exception message, so cleanup failures are visible in debug logs rather than disappearing without a trace. Tests updated to assert the log messages are present.
A non-JSON line from the claude subprocess is a protocol violation — silently skipping it with a warning hides bugs. Removed the try/except around json.loads() in iter_events() so JSONDecodeError propagates to the caller. Updated docstring and replaced the "skips unparseable lines" test with one that asserts the exception is raised.
|
Claude sessions should persist across worked death but not home/kennel death |
Also reraise the OSError from stdin close in stop(), consistent with the log+raise pattern applied to the other exception handlers.
Guard the kill/wait block with poll() is None so that restart() spawns a fresh process even when the old one was already killed (e.g. by the idle-timeout handler in iter_events()).
After killing the idle process, call restart() to spawn a replacement so the session always has a live subprocess when ClaudeStreamError propagates. restart() sees poll() != None (process already dead) and skips the kill, going straight to spawn.
When Worker picks up a different issue than the one the session was last
working on, restart the session before starting work. This clears the
accumulated conversation transcript so context doesn't grow unboundedly
across issues.
WorkerThread tracks _session_issue alongside _session and passes both to
each Worker iteration. Worker.run() detects the boundary (issue !=
session_issue) and calls restart() + switch_model("claude-opus-4-6"),
then sets session_fresh=True so the normal Sonnet switch fires after
setup — matching the full model sequence of a brand-new session.
Track which thread holds the ClaudeSession lock and surface it in the status display. ClaudeSession.owner records threading.current_thread().name in __enter__ and clears it in __exit__. WorkerThread.session_owner delegates to the session. WorkerRegistry.get_session_owner delegates to the thread. The /status JSON endpoint includes session_owner, which flows through _fetch_activities → repo_status → RepoStatus → _format_repo_body where it appears as "held by <thread>" on the claude pid line. Example output: └─ claude pid 12345 (running 3m, held by worker-home)
…e restarts Add explicit lifetime/persistence model notes to ClaudeSession and WorkerThread docstrings, and a matching paragraph in CLAUDE.md. Session survives: individual Worker crashes — WorkerThread._session is passed into each new Worker and handed back after run() returns, so a watchdog-restarted thread inherits the existing session. Session does not survive: kennel/home restarts — os.execvp replaces the process, all in-memory state is lost, and the new WorkerThread starts with _session = None.
…events() Clearing the cancel event in __enter__() created a window where a signal that arrived after the previous holder's __exit__ but before the next holder's __enter__() would be silently erased. Moving the clear to the top of iter_events() narrows the window to essentially zero — between clear() and the first loop check — while still starting each turn with a clean slate. Adds a concurrency test verifying cancel survives the __exit__→__enter__ handoff, updates the "cancel before iter_events" test to assert the new cleared-at-start behavior, and adds a "cancel set during turn" test for the mid-polling abort path.
The outer finally in WorkerThread.run() was unconditionally stopping the session even when the thread crashed, so the registry's watchdog-restart always started fresh — the 'survives crashes' contract existed in docs but not in code. Now the outer finally only stops the session on orderly shutdown (_stop is True). A crashed thread leaves the session alive; WorkerRegistry.start() detects the dead-but-not-stopped thread, rescues its live session, and passes it to the replacement WorkerThread via the new session/session_issue constructor params. WorkerThread.__init__ and _make_thread are updated to thread these params through. Adds: test_session_preserved_when_worker_raises, test_session_accepted_via_ constructor, test_start_rescues_session_from_crashed_thread, and test_start_does_not_rescue_session_from_orderly_shutdown_thread.
If stdin.close() raised an OSError, stop() re-raised before reaching the finally that calls _unregister_child(), leaving a stale proc in _active_children and contradicting the docstring. Wrap the entire stop() body in an outer try/finally so _unregister_child is guaranteed to run regardless of which inner operation fails. Add test_unregisters_even_when_stdin_close_raises to lock this in.
Previously interrupt() only set a local _cancel event and sent follow-up
content after acquiring the lock. The Claude subprocess kept producing the
current turn uninterrupted, leaving unread old-turn output on stdout for
the next iter_events() caller to inherit.
Now interrupt() is one owner-controlled sequence under the lock:
1. Set _cancel so any concurrent iter_events() exits on its next poll.
2. Acquire the lock.
3. Send {"type":"control_request","request_id":"...","request":{"subtype":
"interrupt"}} so the subprocess aborts the current turn at the protocol
level.
4. Drain events via consume_until_result() to reach a clean turn boundary.
5. Send the follow-up user message.
Adds TestClaudeSessionSendControlInterrupt covering the new helper, and
rewrites TestClaudeSessionInterrupt to verify the control_request/drain/
follow-up sequence and cancel-before-lock ordering.
ClaudeSession.preempt() / take_queued_content() were never wired into the abort path (events.py calls registry.abort_task() → WorkerThread sets _abort_task → checked only after claude_run() returns), so preempt.json became dead state that survived process restarts for no reason. Remove PreemptQueue from state.py, the preempt_queue constructor param and both methods from ClaudeSession, the import and usage from create_session() in WorkerThread, and all related tests. create_session no longer needs the fido_dir arg, so that is removed from the signature and call sites too.
#477) (#478) Fixes the \"setup produced no tasks\" regression from #456. ## Problem When the persistent \`ClaudeSession\` was introduced in #456, \`claude_start\` / \`claude_run\` started sending only \`fido_dir/prompt\` (context) to the session. The sub-skill instructions — \`setup.md\`, \`task.md\`, \`ci.md\`, \`comments.md\`, \`resume.md\` — were built into \`fido_dir/system\` by \`build_prompt\` but **never delivered** to the session. Claude saw only the bare \"Request: ... Repo: ... Branch: ...\" with no instructions, produced empty output, and \`find_or_create_pr\` raised \`setup produced no tasks\`. Observed live after #475 landed: fido stuck on #465, watchdog kept restarting, three abandoned branches. ## Fix The persistent session already has \`persona.md\` as its system prompt, so prepending the full \`fido_dir/system\` (persona + sub-skill) would duplicate the persona. Instead: - \`build_prompt\` now writes a third file \`fido_dir/skill\` containing just the sub-skill content (\`setup.md\`, \`task.md\`, …) - New \`_session_turn_prompt\` helper reads \`fido_dir/skill\` + \`fido_dir/prompt\` and joins them with \`\\n\\n---\\n\\n\` - Both \`claude_start\` and \`claude_run\` persistent-session paths now send \`_session_turn_prompt(fido_dir)\` Tests updated: \`_setup_fido_dir\` fixtures now write \`skill\`, and the \`session_path_sends_prompt_content\` tests expect the combined \`<skill>\\n\\n---\\n\\n<prompt>\` body. 100% coverage, suite passes. Closes #477. --------- Co-authored-by: Fido Can Code <190991155+FidoCanCode@users.noreply.github.com>
Introduces a persistent ClaudeSession that keeps a single stream-json subprocess alive across a worker's setup, task, nudge, thread, and CI phases, switching between Opus for planning/triage and Sonnet for implementation, and bounds context growth by restarting on issue boundaries. Remaining work fixes a session teardown bug so cleanup only runs on orderly shutdown (with a crash-recovery test and an outer try/finally around
stop()to always unregister the child), resolves a lost-interrupt race from clearing_cancelon every lock acquisition, sends a stream-jsoncontrol_requestinterrupt inClaudeSession.interruptbefore follow-up content, and removes the durable preempt queue since it has no production path.Fixes #455.
Work queue
Completed (40)
_cancelon every lock acquisition creates a lost-interrupt