diff --git a/.gitignore b/.gitignore index 9c45b85..3ecb9c1 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,6 @@ venv/ .idea/ .DS_Store -.ruff_cache/ \ No newline at end of file +.ruff_cache/ +# Blog/writing scratch produced by mining session jsonls — not committed +mining/ diff --git a/src/capabilities/slack.md b/src/capabilities/slack.md index 920e720..5404002 100644 --- a/src/capabilities/slack.md +++ b/src/capabilities/slack.md @@ -70,6 +70,29 @@ The ACK is a commitment to a direction, not a contract. If work surfaces somethi Why this rule exists: the gap between a `:hourglass:` reaction and Sam's final reply is anywhere from 30 seconds to 6 minutes. In that gap, the operator can't tell whether Sam understood the ask, is on the right track, or is about to ship the wrong thing. The ACK closes that gap. +## Pausing for operator input mid-work — `ask_operator` + +Sam runs as a single-event-at-a-time async daemon. There is no synchronous "stop and wait for input" primitive — the LLM cannot pause an in-flight session and resume it. But Sam CAN break out async-style: post the question, exit cleanly, and let the operator's reply (whenever it arrives) trigger a continuation session. + +The mechanism is the `ask_operator(question: str)` tool, available only to the main agent. When called, the tool posts the question to the originating thread and then immediately adds a `:speech_balloon:` reaction on the question post (two back-to-back Slack calls, not one atomic operation — if the reaction call fails, the question is still posted and the failure is only logged). The session then exits. **Slack reactions are the source of truth for the paused state** — no in-memory daemon map, no parallel state store. The `:speech_balloon:` on a bot post means "this question is awaiting input." Operator-visible at a glance. 
+ +When the operator replies in the thread, the daemon: (1) scans `thread_history` (already fetched on every inbound) for the most recent bot message carrying `:speech_balloon:` from this bot, (2) looks up the session_id of that pause from the per-session ledger (`sessions.jsonl` filtered by `thread_ts` + `ask_operator_called=True` + most-recent-before-the-question-post), (3) injects `paused_session_id` into the new IncomingMessage, (4) clears the `:speech_balloon:` (marks resolved) — only when step 2 found a session; if the lookup fails, the reply is handled as a standard session and the reaction stays in place. Sam's continuation prompt then instructs it to: read the prior audit-log slice to see what was already done, apply the operator's answer, continue from where the previous session stopped. + +Daemon restart is not a special case. The reaction lives on Slack, the ledger lives on GCS, both persist. Next inbound triggers detection naturally — no in-memory rebuild needed. + +**Use `ask_operator` ONLY for genuine unknown unknowns** — something Sam cannot resolve from: +- the current Slack thread +- the journal (`/data/journal/*.md`) +- the audit log (`/data/tool_calls/*.jsonl`) +- Sam's own source (`src/identity.md`, capabilities, skills) +- the operator's earlier messages in this same thread + +When the operator already gave the answer earlier in the thread, re-reading is the right move — calling `ask_operator` for something already answered burns operator attention, which is a worse failure than the extra time spent re-reading the thread. When Sam is 70% sure, the right move is the existing pattern: make the conservative call, disclose it in the final reply (*"Hit a fork between A and B — picked A because constraint K; flag if wrong"*). `ask_operator` is for the cases where conservative-call-and-disclose is genuinely unsafe. + +**Workers, pro_executor, and the mentor do NOT have `ask_operator`.** Narrow-scoped subagents must return their finding to the main agent, never escalate to the operator directly. The main agent decides whether to pause. 
+ +**Abandoned questions are reviewed by Sam itself during `daily-maintenance`.** The daily-maintenance skill's "abandoned questions" section enumerates `:speech_balloon:` reactions from this bot via `reactions.list`, filters to those older than 24h, and Sam decides per-question whether to: post a contextual reminder (keeps `:speech_balloon:`, resets the clock), mark abandoned (clears `:speech_balloon:`, posts a one-line close), or escalate. The daemon does NOT autonomously do this — daemon detects mechanically, Sam holds the judgment. No separate cron, no new state store; the existing daily routine handles it. + ## Live UX (streaming, plan, feedback, references) These are *patterns* with judgment calls, not always-on defaults. The detailed when/when-not rules are in `src/skills/slack-dynamic-messaging.md` — read it before deciding whether to stream a reply, attach feedback buttons, set a plan block, or cite sources. The shape of the decision is "would a human reader be glad I turned this on?", not "is this feature available?" diff --git a/src/runtime/adk_runner.py b/src/runtime/adk_runner.py index c1d7dd5..54fd17e 100644 --- a/src/runtime/adk_runner.py +++ b/src/runtime/adk_runner.py @@ -734,6 +734,126 @@ async def consult_opus( return consult_opus +def _make_ask_operator_tool( + *, + channel: Optional[str], + thread_ts: Optional[str], + event_ts: Optional[str], +): + """Return a coroutine that posts a question to the originating Slack + thread, adds a `:speech_balloon:` reaction on the question post + atomically (race-mitigation), and signals the LLM to end the session. + + The async-question pattern: Sam, mid-work, hits a genuine unknown + unknown — something it cannot resolve from existing context. Sam + calls `ask_operator(question=...)`. The tool posts the question, + reacts on its own post with `:speech_balloon:` so the operator can + glance and see "Sam paused, awaiting input", then returns a string + telling the LLM to exit. 
The session ends cleanly; the daemon's + in-memory `paused_threads` map records the pause; when the operator + replies in the thread, the daemon detects the continuation and + spawns a session with the previous session_id injected so Sam can + reconstruct prior state from the audit log. + + Only the MAIN agent gets this tool. Workers, pro_executor, and the + mentor cannot pause — they have narrow scopes and must return to + their caller, never escalate to the operator directly. + + Channel/thread_ts come from the IncomingMessage and are bound here + at closure-time, so the LLM doesn't need to pass them. If they're + missing (scheduled/retry/synthetic sessions), the tool errors — + `ask_operator` is meaningless without an originating thread. + """ + async def ask_operator(question: str) -> str: + """Pause this session mid-work and await operator input via Slack. + + Use ONLY for genuine unknown unknowns — something you cannot + resolve from the current Slack thread, the journal, the audit + log, or your own source files. NOT for "I'm 70% sure" (make the + conservative call + disclose). NOT for clarifications the + operator already gave (re-read the thread). NOT for things the + operator might know — verify the gap first. + + The pattern is async: this tool posts your question, the + session exits cleanly, and the operator's reply (whenever it + comes) will trigger a new continuation session with full + audit-log context so you can pick up where you left off. + + question: the question to post. Write it consequence-first — + "Should I do A or B?" beats "I'm thinking about A and B + and not sure". The operator should be able to reply in + one sentence. + + Returns a sentinel string telling the LLM to end the session. + """ + import aiohttp + from .config import SLACK_BOT_TOKEN + + if not channel: + return "(ask_operator unavailable: no originating channel — this session has no Slack thread to pause on. 
Make the call yourself and disclose in your final reply.)" + if not SLACK_BOT_TOKEN: + return "(ask_operator unavailable: SLACK_BOT_TOKEN not set in env. Make the call yourself and disclose in your final reply.)" + + target_thread_ts = thread_ts or event_ts + post_body = { + "channel": channel, + "text": question, + } + if target_thread_ts: + post_body["thread_ts"] = target_thread_ts + + headers = { + "Authorization": f"Bearer {SLACK_BOT_TOKEN}", + "Content-Type": "application/json; charset=utf-8", + } + + try: + async with aiohttp.ClientSession() as http: + # 1. Post the question. + async with http.post( + "https://slack.com/api/chat.postMessage", + headers=headers, + json=post_body, + ) as resp: + post_json = await resp.json() + if not post_json.get("ok"): + err = post_json.get("error", "unknown") + return f"(ask_operator failed at chat.postMessage: {err}. Make the call yourself and disclose in your final reply.)" + + posted_ts = post_json.get("ts") + # 2. Atomically add :speech_balloon: to the question post. + # If this fails, the question is still posted — the + # operator will see and reply, the daemon's continuation + # detection still works via the paused_threads map. So + # we don't fail the whole tool on reaction failure. + if posted_ts: + try: + async with http.post( + "https://slack.com/api/reactions.add", + headers=headers, + json={ + "channel": channel, + "timestamp": posted_ts, + "name": "speech_balloon", + }, + ) as react_resp: + await react_resp.json() + except Exception: + log.exception("ask_operator: reaction.add failed (non-fatal)") + except Exception as exc: + log.exception("ask_operator HTTP call failed") + return f"(ask_operator HTTP error: {type(exc).__name__}: {exc}. Make the call yourself and disclose in your final reply.)" + + return ( + "Question posted to the Slack thread; :speech_balloon: added " + "to the post. **End this session now** — do not call any " + "more tools. 
The operator's reply will trigger a continuation " + "session that picks up via the audit log." + ) + + return ask_operator + + # ─── ADK runner ─────────────────────────────────────────────────────────────── @@ -837,6 +957,20 @@ async def run(self, request: AgentRunRequest) -> AgentRunResult: tools=worker_tools, ) + # `ask_operator` — pause the session mid-work and post a question + # to the originating Slack thread, then exit cleanly. The + # operator's reply triggers a continuation session via the + # daemon's `paused_threads` map. ONLY the main agent gets this + # tool — workers / pro_executor / mentor must not escalate to + # the operator directly; they return to their caller. Tool + # closes over Slack origin coords so the LLM signature is just + # `ask_operator(question: str)`. + ask_operator_tool = _make_ask_operator_tool( + channel=request.slack_channel, + thread_ts=request.slack_thread_ts, + event_ts=request.slack_event_ts, + ) + # `consult_opus` — dispatch the mentor (Opus, read-only) to review # accumulated context. 
Sam DOES NOT decide to invoke autonomously — # only the `daily-maintenance` skill (review step) and the @@ -862,6 +996,7 @@ async def run(self, request: AgentRunRequest) -> AgentRunResult: FunctionTool(func=parallel_workers), AgentTool(agent=pro_executor_agent), FunctionTool(func=consult_opus_tool), + FunctionTool(func=ask_operator_tool), ], ) diff --git a/src/runtime/daemon.py b/src/runtime/daemon.py index 9d08924..7153fa0 100644 --- a/src/runtime/daemon.py +++ b/src/runtime/daemon.py @@ -51,6 +51,7 @@ SAM_CHANNEL, SAM_MAIN_MODEL, SAM_OPERATOR_USER_ID, + SESSIONS_PATH, SLACK_APP_TOKEN, SLACK_BOT_TOKEN, THREAD_CACHE_TTL_SECONDS, @@ -157,6 +158,39 @@ def _format_session_badges(result: SessionResult) -> str: WORKER_WEDGE_THRESHOLD_SECONDS = int(MAX_SESSION_SECONDS * 1.5) +def _find_active_paused_question( + thread_history: list[dict], + bot_user_id: str, +) -> Optional[dict]: + """Walk `thread_history` newest-first; return the first bot message + whose reactions include `:speech_balloon:` placed by this bot. + + This is the daemon's continuation-detection input. The `:speech_balloon:` + reaction is added by `ask_operator` when Sam pauses, and removed by the + daemon when a continuation is detected — so at any time there's at most + one active paused question per thread. Most-recent-wins because Sam may + have paused, been answered (reaction cleared), and paused again on a + follow-up question; the unresolved one is the most recent reaction. + + Returns the raw Slack message dict (has `ts`, `text`, etc.) or None + when there's no active pause in the thread. + + `thread_history` is the daemon's per-event fetched list (via + `conversations.replies`) which includes reactions inline per message. + No extra API call is needed for the scan. + """ + for msg in reversed(thread_history): + # Only bot-posted messages can be Sam's pause posts. 
+ if not msg.get("bot_id"): + continue + for reaction in msg.get("reactions") or []: + if reaction.get("name") != "speech_balloon": + continue + if bot_user_id in (reaction.get("users") or []): + return msg + return None + + def _is_too_old_for_recovery( event_ts: str, now: float, max_age_seconds: int = RECOVERY_MAX_AGE_SECONDS, ) -> bool: @@ -587,6 +621,68 @@ async def _append_session_badges( # so Sam can fetch on demand via `slack-files` if it really needs to. _IMAGE_FETCH_MAX_BYTES = 20 * 1024 * 1024 # 20 MiB + def _lookup_paused_session_id( + self, + thread_ts: str, + question_post_ts: str, + ) -> Optional[str]: + """Look up the session_id that called `ask_operator` in this thread. + + Reads `sessions.jsonl` (the per-session ledger), filters to entries + in this thread with `ask_operator_called=True`, and returns the + session_id whose `ts_start` is the latest one strictly before + `question_post_ts`. That's the session whose `ask_operator` call + produced the question post we're now continuing. + + Returns None when the ledger doesn't exist, the ledger has no + matching entry, or any parse error — caller falls back gracefully + to a standard session (Sam still has thread_history, just no + explicit previous_session_id link to the audit log). + """ + import json + if not SESSIONS_PATH.exists(): + return None + try: + target_ts = float(question_post_ts) + except (TypeError, ValueError): + return None + best: tuple[float, str] | None = None # (ts_start, session_id) + try: + for line in SESSIONS_PATH.read_text().splitlines(): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError as exc: + # Malformed line in the ledger — log at debug (not + # warn — a corrupt line shouldn't spam) and skip. 
+ log.debug( + "_lookup_paused_session_id: skipping malformed " + "sessions.jsonl line: %s", + exc, + ) + continue + if entry.get("thread_ts") != thread_ts: + continue + if not entry.get("ask_operator_called"): + continue + try: + entry_ts = float(entry.get("ts_start", 0)) + except (TypeError, ValueError): + continue + # The session must have started BEFORE the question post — + # the post is something the session DID, so its ts_start + # has to precede the post. + if entry_ts > target_ts: + continue + if best is None or entry_ts > best[0]: + best = (entry_ts, entry.get("session_id") or "") + except Exception: + log.exception("_lookup_paused_session_id failed for thread %s", thread_ts) + return None + return best[1] if (best and best[1]) else None + async def _fetch_image_attachments(self, files: list[dict]) -> list[dict]: """Pre-fetch image bytes from Slack so the runner can attach them as multimodal Parts on the user message. @@ -858,6 +954,53 @@ async def _handle_event(self, event: dict, *, recovered: bool = False) -> None: # handling — fetch failures are non-fatal; the user message still # mentions the file in text and Sam can fall back to the URL path. image_attachments = await self._fetch_image_attachments(files) + # Continuation detection — reaction-driven (Slack is the source of + # truth, no in-memory state). When a previous session in this + # thread called `ask_operator` and paused, the question post + # carries a `:speech_balloon:` reaction from this bot. We walk + # thread_history newest-first, find the most recent such post, + # look up the session_id that posted it from sessions.jsonl + # (filtered by thread_ts + ask_operator_called=True), inject + # paused_session_id, and clear the reaction (signals resolved). + # Top-level mentions (thread_ts=None) can't be continuations. 
+ paused_session_id: Optional[str] = None + if thread_ts and thread_history and self.bot_user_id: + question_msg = _find_active_paused_question( + thread_history=thread_history, + bot_user_id=self.bot_user_id, + ) + if question_msg is not None: + question_post_ts = question_msg.get("ts") or "" + paused_session_id = self._lookup_paused_session_id( + thread_ts=thread_ts, + question_post_ts=question_post_ts, + ) + if paused_session_id: + log.info( + "thread %s has paused session %s (question post ts=%s); " + "routing reply as continuation", + thread_ts, paused_session_id, question_post_ts, + ) + # Clear the :speech_balloon: from the question post — the + # reaction's purpose was to mark "pending input", and the + # operator's reply now resolves it. Idempotent: if remove + # fails (e.g. already cleared, or API hiccup), continuation + # still proceeds correctly — the only cost is the reaction + # lingering until daily-maintenance's abandoned-question + # scan ages it out. + try: + await self.app.client.reactions_remove( + channel=channel, + timestamp=question_post_ts, + name="speech_balloon", + ) + except SlackApiError as exc: + log.warning( + "failed to clear :speech_balloon: on %s/%s after " + "detecting continuation (non-fatal): %s", + channel, question_post_ts, exc.response.get("error"), + ) + message = IncomingMessage( channel=channel, user=user, @@ -871,6 +1014,7 @@ async def _handle_event(self, event: dict, *, recovered: bool = False) -> None: thread_history=thread_history, recovered=recovered, image_attachments=image_attachments, + paused_session_id=paused_session_id, ) # Pre-warm the thread-participation cache so subsequent replies in # this thread route without a Slack round-trip. 
@@ -1242,6 +1386,22 @@ async def _worker(self) -> None: silent_exit = first_result.silent_exit(message=message) if not first_result.failed and not silent_exit: + # When Sam called `ask_operator`, the pause is recorded + # in two places that survive a daemon restart: the + # `:speech_balloon:` reaction on the question post + # (operator-visible state), and `ask_operator_called=True` + # in the sessions ledger (machine-readable for the + # continuation lookup). The daemon doesn't track pauses + # in memory — when an inbound arrives, `_handle_event` + # detects the pause from thread_history reactions and + # looks up the paused_session_id from sessions.jsonl. + if first_result.ask_operator_called and not message.scheduled: + log.info( + "session %s paused via ask_operator on thread %s; " + "continuation will be routed via :speech_balloon: detection", + first_result.session_id, + message.thread_ts or message.event_ts, + ) await self._append_session_badges(message, first_result) await self._mark_lifecycle( message, remove="hourglass_flowing_sand", add="white_check_mark", diff --git a/src/runtime/ledger.py b/src/runtime/ledger.py index a2ea101..8dc8b59 100644 --- a/src/runtime/ledger.py +++ b/src/runtime/ledger.py @@ -47,6 +47,12 @@ class SessionLedgerEntry: thread_ts: Optional[str] trigger: str # "slack_mention" | "cron:" | "retry" | "catchup" operator_principal: bool + # True when this session called `ask_operator` and paused awaiting input. + # The daemon's continuation detection reads this when a reply arrives in + # the same thread: filter sessions.jsonl by thread_ts + ask_operator_called + # to find the paused_session_id to inject into the new IncomingMessage. + # See `_lookup_paused_session_id` in daemon.py. 
+ ask_operator_called: bool = False def _derive_exit_state(*, exit_code: Optional[int], stuck: bool, timed_out: bool) -> str: diff --git a/src/runtime/prompts.py b/src/runtime/prompts.py index 5a86e94..2a91d96 100644 --- a/src/runtime/prompts.py +++ b/src/runtime/prompts.py @@ -87,6 +87,43 @@ "headline. Consequence-first. Then exit." ) +CONTINUATION_INTRO = ( + "This is a CONTINUATION session. The previous session — id " + "`{previous_session_id}` — called `ask_operator(question=...)` to " + "pause mid-work and wait for the operator's input. The operator's " + "reply (the most recent Slack message in this thread, shown below) " + "is what unblocks you.\n" + "\n" + "**Your job in this session:**\n" + "\n" + "1. **Reconstruct what the previous session already did.** Read its " + "audit-log slice — `/data/tool_calls/.jsonl` filtered by " + "session_id `{previous_session_id}`. Note edits to `src/`, " + "`gh pr create / edit` invocations (and their PR numbers), Linear " + "issue saves, worker / `consult_opus` dispatches. The audit log is " + "ground truth — do not infer from journal text alone (the journal " + "could be wrong; the audit log is what actually happened).\n" + "2. **Read the operator's reply** (below + in the thread history) " + "and apply it. The reply might unblock the work, change the " + "direction, cancel the work, or contain another sub-question.\n" + "3. **Continue from where the previous session stopped.** Do not " + "redo work that already shipped (don't re-open a PR that exists; " + "don't re-edit files that were already edited). Build on what's " + "already done.\n" + "4. **Close the loop yourself before exiting.** Post a substantive " + "reply in this thread (channel={channel}, thread_ts={thread_target}) " + "summarizing what changed because of the operator's answer. If you " + "still need MORE input, call `ask_operator` again — that's fine, " + "but only if the new question is genuinely unanswerable from " + "current context." 
+) + +CONTINUATION_OUTRO = ( + "Remember: the prior session did real work. Read its audit log " + "first. Don't redo what's already shipped. Apply the operator's " + "answer. Then close the loop with a final reply in the thread." +) + SCHEDULED_SKILL_TEMPLATE = ( "This is a SCHEDULED SKILL invocation — not a Slack message. " "The daemon's scheduler triggered `{skill_name}`. " diff --git a/src/runtime/session.py b/src/runtime/session.py index 60bd940..ed23a2c 100644 --- a/src/runtime/session.py +++ b/src/runtime/session.py @@ -25,6 +25,8 @@ redact_secrets, ) from .prompts import ( + CONTINUATION_INTRO, + CONTINUATION_OUTRO, RETRY_SESSION_INTRO, RETRY_SESSION_OUTRO, SILENT_EXIT_INTRO, @@ -58,6 +60,12 @@ class IncomingMessage: display_name: Optional[str] = None # Slack display/real name, resolved by the daemon is_principal_operator: bool = False # True iff `user` == SAM_OPERATOR_USER_ID retry_context: Optional[dict] = None # Set on a one-shot retry session after a failed first attempt + # When the daemon detects this inbound is a reply in a thread where Sam + # previously called `ask_operator` and paused, this carries the session + # id of the paused session. The continuation prompt uses it to point + # Sam at the audit-log slice to reconstruct prior state. See + # `_format_continuation_message`. + paused_session_id: Optional[str] = None scheduled: bool = False # True when synthesised by the daemon's scheduler (not a real Slack message) recovered: bool = False # True when the daemon re-queued this on boot from `reactions.list` (see daemon._recover_from_reactions). Set so Sam knows the message is stale and the prior attempt was interrupted. 
raw_event: dict = field(repr=False, default_factory=dict) @@ -280,6 +288,8 @@ def to_initial_user_message(self) -> str: if self.retry_context.get("silent_exit"): return self._format_silent_exit_message() return self._format_retry_message() + if self.paused_session_id: + return self._format_continuation_message() if self.scheduled: return self._format_scheduled_message() @@ -321,6 +331,36 @@ def _format_scheduled_message(self) -> str: """ return self.text + def _format_continuation_message(self) -> str: + """Initial user message when this inbound is the operator's reply + to a session that paused via `ask_operator`. + + Distinct from silent-exit / retry retries: the previous session + SUCCEEDED at its work AND posted its question — the loop was + deliberately paused, not failed. The continuation's job is to + read the prior session's audit-log slice, see what was done, + apply the operator's answer (visible in thread_history), and + continue from where the previous session stopped. + """ + thread_target = self.thread_ts or self.event_ts + thread_part = f"thread_ts={self.thread_ts}" if self.thread_ts else "no thread" + history_block = self._format_thread_history() + preamble = f"{history_block}\n---\n\n" if history_block else "" + return ( + CONTINUATION_INTRO.format( + previous_session_id=self.paused_session_id, + channel=self.channel, + thread_target=thread_target, + ) + + "\n\n" + + preamble + + f"## The operator's reply (this Slack message in {thread_part})\n\n" + + f"From {self._sender_label()}:\n\n" + + self.text + + "\n\n" + + CONTINUATION_OUTRO + ) + def _format_silent_exit_message(self) -> str: """Initial user message for a retry triggered by silent-exit detection. @@ -470,6 +510,13 @@ class AgentRunRequest: # (cron-fired skills, scheduled wake-ups, retries, etc.). # Shape: [{"mime_type": "image/png", "data": , "name": str}]. 
image_attachments: list[dict] = field(default_factory=list) + # Slack origin coordinates — bound into the `ask_operator` tool's + # closure so the LLM doesn't need to pass them. None for + # scheduled / synthetic sessions; in those cases `ask_operator` + # errors gracefully (no thread to pause on). + slack_channel: Optional[str] = None + slack_thread_ts: Optional[str] = None + slack_event_ts: Optional[str] = None @dataclass @@ -544,6 +591,11 @@ class SessionResult: # taxonomy. Used by `silent_exit()` to decide whether to spawn a # narrating retry. closed_loop: bool = False + # True when the session called `ask_operator` to deliberately pause and + # await operator input. The daemon uses this to populate the + # `paused_threads` map so the operator's reply (when it comes) is + # routed as a continuation session with full audit-log context. + ask_operator_called: bool = False stderr_tail: list[str] = field(default_factory=list) synthetic_errors: list[str] = field(default_factory=list) @@ -621,6 +673,13 @@ async def run(self) -> SessionResult: agent_result.stuck, agent_result.timed_out, ) + # Classify tool use BEFORE writing the ledger so ask_operator_called + # is recorded — the daemon's reaction-based continuation detection + # looks up paused sessions via this ledger field. + worker_used, web_used, bash_used, edited_files, closed_loop, ask_operator_called = self._classify_tool_use( + agent_result.tool_use_records, + ) + # Layer 1b: per-session ledger line — best-effort, never raises (ledger swallows). 
try: from .ledger import SessionLedgerEntry, record_session @@ -641,14 +700,11 @@ async def run(self) -> SessionResult: thread_ts=self.message.thread_ts, trigger=self._classify_trigger(), operator_principal=self.message.is_principal_operator, + ask_operator_called=ask_operator_called, )) except Exception: log.exception("session-ledger write failed (continuing)") - worker_used, web_used, bash_used, edited_files, closed_loop = self._classify_tool_use( - agent_result.tool_use_records, - ) - result = SessionResult( session_id=self.session_id, started_at=agent_result.started_at, @@ -663,6 +719,7 @@ async def run(self) -> SessionResult: bash_used=bash_used, edited_files=edited_files, closed_loop=closed_loop, + ask_operator_called=ask_operator_called, stderr_tail=agent_result.stderr_tail, synthetic_errors=agent_result.synthetic_errors, ) @@ -684,6 +741,14 @@ def _build_run_request(self, initial_user_message: str) -> AgentRunRequest: runner can attach them as multimodal Parts. Text-only sessions (scheduled wake-ups, retries) carry an empty list. """ + # Slack coordinates are passed only for real Slack-triggered + # sessions (not scheduled / synthetic). The runner closes over + # them when constructing the `ask_operator` tool; if they're + # None, the tool errors gracefully (no thread to pause on). 
+ is_real_slack_inbound = ( + not getattr(self.message, "scheduled", False) + and not getattr(self.message, "retry_context", None) + ) return AgentRunRequest( system_prompt=assemble_system_prompt(), initial_user_message=initial_user_message, @@ -692,6 +757,9 @@ def _build_run_request(self, initial_user_message: str) -> AgentRunRequest: cwd=str(SAM_REPO), env=os.environ.copy(), image_attachments=self.message.image_attachments, + slack_channel=self.message.channel if is_real_slack_inbound else None, + slack_thread_ts=self.message.thread_ts if is_real_slack_inbound else None, + slack_event_ts=self.message.event_ts if is_real_slack_inbound else None, ) def _agent_graph_label(self) -> str: @@ -740,9 +808,9 @@ def _classify_trigger(self) -> str: @staticmethod def _classify_tool_use( records: list[ToolUseRecord], - ) -> tuple[bool, bool, bool, bool, bool]: - """Return (worker_used, web_used, bash_used, edited_files, closed_loop) for the - post-session badges and the daemon's silent-exit gate. + ) -> tuple[bool, bool, bool, bool, bool, bool]: + """Return (worker_used, web_used, bash_used, edited_files, closed_loop, ask_operator_called) + for the post-session badges, the silent-exit gate, and the pause-state gate. ADK tool names (function names from adk_runner.py): - worker_used = "worker" (single dispatch) or "parallel_workers" @@ -795,6 +863,7 @@ def _classify_tool_use( web_used = False bash_used = False edited_files = False + ask_operator_called = False last_outward_idx = -1 last_post_idx = -1 for i, record in enumerate(records): @@ -810,6 +879,14 @@ def _classify_tool_use( is_outward = True elif name == "consult_opus": is_outward = True + elif name == "ask_operator": + # ask_operator both posts a question to Slack AND signals + # an intentional pause. Counts as a post (closes the loop: + # the operator now sees the question) — and is NOT counted + # as additional outward work, since the post IS the work + # for this turn. 
+ is_post = True + ask_operator_called = True elif name == "bash": command = input_dict.get("command") or "" if "chat.postMessage" in command or "chat.update" in command: @@ -833,7 +910,7 @@ def _classify_tool_use( # last outward-facing call (or no outward-facing work happened # at all — Sam just answered a question without tools). closed_loop = last_post_idx >= 0 and last_post_idx > last_outward_idx - return worker_used, web_used, bash_used, edited_files, closed_loop + return worker_used, web_used, bash_used, edited_files, closed_loop, ask_operator_called def _safety_net_journal_entry(self, result: SessionResult) -> None: """Append a minimal journal entry when something went wrong. diff --git a/src/skills/daily-maintenance/skill.md b/src/skills/daily-maintenance/skill.md index b1a6ddb..c176006 100644 --- a/src/skills/daily-maintenance/skill.md +++ b/src/skills/daily-maintenance/skill.md @@ -217,7 +217,43 @@ Here are the key shifts and consequences for how we interact: Three failure modes in one post: greeting prefix (noise), "exceptionally high-velocity" (grandiose, judgement of self), and **missing the open PRs entirely**. The 2026-05-24 07:02 fire shipped exactly this shape; rewriting §5 was the response. -## 6. Skill hygiene +## 6. Review abandoned `ask_operator` questions + +When a session calls `ask_operator(question)` to pause for input, the tool adds a `:speech_balloon:` reaction to the question post. When the operator replies in the thread, the daemon detects the continuation via that reaction and clears it (marks resolved). But if the operator never replies, the `:speech_balloon:` lingers — the question is abandoned. Sam reviews these here. 
+ +**Enumerate aged-out paused questions.** Use the Slack bot token to query the bot's own reactions and filter to `:speech_balloon:` markers older than 24h: + +```bash +curl -s "https://slack.com/api/reactions.list?user=$SAM_BOT_USER_ID&count=100" \ + -H "Authorization: Bearer $SLACK_BOT_TOKEN" \ + | jq --argjson cutoff "$(date -u -d '24 hours ago' +%s)" ' + .items[] + | select(.message.reactions[]?.name == "speech_balloon") + | select((.message.ts | tonumber | floor) < $cutoff) + | {channel: .channel, ts: .message.ts, text: .message.text} + ' +``` + +(`SAM_BOT_USER_ID` is in the env. The `count=100` is a Slack pagination param, not a magic number — increase if Sam paused that much in one stretch, which it shouldn't.) + +**For each abandoned question, decide what to do.** This is Sam's judgment call, not mechanical: + +| Signal | Action | +|---|---| +| The question is still relevant; operator may have just been busy | Post a one-line contextual reminder in the same thread: *"still curious — \?"* The reaction stays; the 24h clock resets from the new post. | +| The work has moved on or been resolved differently (check thread + recent journal entries) | Mark abandoned. Clear the `:speech_balloon:` via `reactions.remove`. Post one line: *"closing this question — feel free to start a new thread when this comes up again."* | +| The question is genuinely urgent and the silence is itself a signal | Escalate — re-post the question with explicit framing about why the answer is needed, no reaction this time. Better than spamming reminders. 
| + +**What to read before deciding.** +- The original question's thread (read all messages) +- The audit log slice for the paused session (find session_id via `sessions.jsonl` filtered by `thread_ts` + `ask_operator_called=True`) +- The journal entries from around the paused session — sometimes context Sam wrote there clarifies whether the question still matters + +**Don't auto-remind more than once per question.** The 24h cutoff catches a fresh abandonment; if Sam already reminded once and the operator still didn't reply after another 24h, the right move is "mark abandoned," not a second reminder. Walk the thread before posting — if Sam's own message ≤24h ago in the thread is a prior reminder, mark abandoned instead. + +**The daemon does NOT do this autonomously.** Detection is mechanical, but the response is Sam's judgment. The daemon's role is exposing the data; Sam's role is deciding what to say. + +## 7. Skill hygiene Check the `src/skills/` directory for general hygiene. Run a quick check across `*/skill.md`: - Flag any `skill.md` over 500 lines. diff --git a/tests/runtime/test_ask_operator.py b/tests/runtime/test_ask_operator.py new file mode 100644 index 0000000..16474e4 --- /dev/null +++ b/tests/runtime/test_ask_operator.py @@ -0,0 +1,419 @@ +"""Tests for the async-question pattern (`ask_operator` tool + continuation +session shape + daemon reaction-based pause detection). + +The pattern (per `src/capabilities/slack.md`): +- Sam, mid-work, hits a genuine unknown unknown. +- Sam calls `ask_operator(question)`. The tool posts the question to the + originating thread + adds `:speech_balloon:` atomically + returns a + sentinel string telling the LLM to exit. +- The `:speech_balloon:` reaction on the question post marks the pause; + the session ledger records `ask_operator_called` (no in-memory map). +- Operator replies in the thread → daemon finds the pause, injects + `paused_session_id` into the new IncomingMessage. 
+- Runner dispatches the continuation prompt shape (read prior audit log + + apply operator's answer + continue from where the prior session + stopped). + +These tests defend: +1. `_classify_tool_use` correctly sets `ask_operator_called` and treats + the call as a post (closing the silent-exit loop). +2. `IncomingMessage.paused_session_id` triggers `_format_continuation_message` + instead of the default / retry / silent-exit shapes. +3. `_make_ask_operator_tool` errors gracefully when no channel is + bound (scheduled / synthetic sessions) instead of crashing. +""" +from __future__ import annotations + +from src.runtime.session import IncomingMessage, SamSession, ToolUseRecord + + +def _ask_op(question: str = "should I do A or B?") -> ToolUseRecord: + return ToolUseRecord(name="ask_operator", input={"question": question}) + + +def _bash(command: str) -> ToolUseRecord: + return ToolUseRecord(name="bash", input={"command": command}) + + +def _read(path: str = "/tmp/x.md") -> ToolUseRecord: + return ToolUseRecord(name="read_file", input={"file_path": path}) + + +# ─── _classify_tool_use — ask_operator integration ─────────────────────────── + + +def test_ask_operator_call_sets_ask_operator_called_flag(): + """The 6th classifier return value must surface ask_operator calls + so the session ledger can record the pause.""" + records = [_read(), _ask_op()] + *_, ask_operator_called = SamSession._classify_tool_use(records) + assert ask_operator_called is True + + +def test_ask_operator_call_closes_the_silent_exit_loop(): + """ask_operator counts as a post for the silent-exit gate — Sam + DID communicate (the question is the reply for this turn).""" + records = [_read(), _ask_op()] + *_, closed_loop, _ = SamSession._classify_tool_use(records) + assert closed_loop is True + + +def test_ask_operator_after_substantive_work_closes_loop(): + """The headline pattern: Sam does outward work (gh, edit_file) then + pauses with ask_operator. 
The pause IS the close-the-loop action + for this turn — the operator now has Sam's state + question.""" + records = [ + _bash("gh pr create --title 'wip'"), + _read("/tmp/sam/src/runtime/foo.py"), + _ask_op("which of these implementations do you want — X or Y?"), + ] + *_, closed_loop, ask_operator_called = SamSession._classify_tool_use(records) + assert closed_loop is True + assert ask_operator_called is True + + +def test_ask_operator_followed_by_more_work_DOES_NOT_close_loop(): + """If Sam pauses and then keeps doing tool work after the pause, + the loop isn't closed — Sam should have exited after asking. + This case shouldn't happen in practice (the tool returns a 'exit now' + sentinel), but the classifier defends against it anyway.""" + records = [ + _ask_op("which approach?"), + _bash("gh pr create"), # outward work AFTER the pause = bad + ] + *_, closed_loop, ask_operator_called = SamSession._classify_tool_use(records) + assert closed_loop is False + assert ask_operator_called is True + + +def test_no_ask_operator_means_ask_operator_called_is_false(): + """Sessions without ask_operator must have the flag set False so the + daemon doesn't accidentally register a phantom pause.""" + records = [_bash("gh pr create"), _bash("curl chat.postMessage")] + *_, ask_operator_called = SamSession._classify_tool_use(records) + assert ask_operator_called is False + + +# ─── IncomingMessage.paused_session_id → continuation prompt ───────────────── + + +def test_paused_session_id_dispatches_to_continuation_template(): + """When the daemon injects paused_session_id, to_initial_user_message + must produce the continuation-shape prompt, not the default.""" + msg = IncomingMessage( + channel="C_TEST", user="U_TEST", text="A — let's go with A", + thread_ts="100.0", event_ts="120.0", + paused_session_id="previousdead", + ) + body = msg.to_initial_user_message() + # Continuation-specific framing must appear: + assert "CONTINUATION session" in body + assert "previousdead" in body + 
assert "ask_operator" in body + # The operator's reply must be visible in the prompt: + assert "A — let's go with A" in body + # Must NOT use retry / silent-exit / scheduled framing: + assert "previous Sam session attempting to respond to a Slack message FAILED" not in body + assert "exited cleanly" not in body + assert "SCHEDULED SKILL" not in body + + +def test_paused_session_id_takes_precedence_over_default_shape(): + """When both paused_session_id and recovered are set, the continuation + path wins (it's more specific). Recovery is for daemon-crash-and-resume; + continuation is for deliberate-pause-and-resume.""" + msg = IncomingMessage( + channel="C_TEST", user="U_TEST", text="answer", + thread_ts="100.0", event_ts="120.0", + paused_session_id="prev123", + recovered=True, # would normally trigger the recovered preamble + ) + body = msg.to_initial_user_message() + # Continuation shape wins: + assert "CONTINUATION session" in body + assert "prev123" in body + + +def test_no_paused_session_id_means_standard_prompt_shape(): + """Sessions without paused_session_id behave exactly as before — no + accidental continuation framing leaks into normal sessions.""" + msg = IncomingMessage( + channel="C", user="U", text="hi", + thread_ts=None, event_ts="1.0", + ) + body = msg.to_initial_user_message() + assert "CONTINUATION session" not in body + assert "ask_operator" not in body + + +def test_retry_context_takes_precedence_over_paused_session_id(): + """If both are set (shouldn't happen, but defensive), retry/silent-exit + wins because failure-narration is more important than continuation + resumption. 
Defends against an edge case where a failed pause spawns + a retry; the retry should narrate the failure, not try to continue.""" + msg = IncomingMessage( + channel="C", user="U", text="ans", + thread_ts="100", event_ts="120", + retry_context={"exit_code": 1}, + paused_session_id="prev", + ) + body = msg.to_initial_user_message() + # Retry wins: + assert "previous Sam session attempting to respond to a Slack message FAILED" in body + assert "CONTINUATION session" not in body + + +# ─── _make_ask_operator_tool — graceful failures ───────────────────────────── + + +async def test_ask_operator_tool_errors_gracefully_without_channel(): + """Scheduled / synthetic sessions don't have a Slack origin. The tool + must return a graceful error string (not crash) so the LLM can + fall back to its normal flow.""" + from src.runtime.adk_runner import _make_ask_operator_tool + + tool = _make_ask_operator_tool(channel=None, thread_ts=None, event_ts=None) + result = await tool("any question") + assert "ask_operator unavailable" in result + assert "no originating channel" in result + + +async def test_ask_operator_tool_errors_gracefully_without_slack_token(monkeypatch): + """Missing SLACK_BOT_TOKEN must also produce a graceful error.""" + from src.runtime import adk_runner, config + + monkeypatch.setattr(config, "SLACK_BOT_TOKEN", "") + tool = adk_runner._make_ask_operator_tool( + channel="C", thread_ts="100", event_ts="120", + ) + result = await tool("any question") + assert "ask_operator unavailable" in result + assert "SLACK_BOT_TOKEN not set" in result + + +# ─── _find_active_paused_question — reaction-based detection ────────────────── + + +def test_find_active_paused_question_returns_none_for_empty_history(): + """No thread history → nothing to detect.""" + from src.runtime.daemon import _find_active_paused_question + assert _find_active_paused_question([], bot_user_id="U_BOT") is None + + +def test_find_active_paused_question_returns_none_when_no_bot_messages(): + """Only 
operator messages in thread → nothing to detect.""" + from src.runtime.daemon import _find_active_paused_question + history = [ + {"ts": "100", "text": "hi sam", "user": "U_OPERATOR"}, + {"ts": "110", "text": "another message", "user": "U_OPERATOR"}, + ] + assert _find_active_paused_question(history, bot_user_id="U_BOT") is None + + +def test_find_active_paused_question_returns_none_when_bot_has_no_speech_balloon(): + """Bot posted but didn't pause — no speech_balloon reaction.""" + from src.runtime.daemon import _find_active_paused_question + history = [ + {"ts": "100", "text": "hi sam", "user": "U_OPERATOR"}, + { + "ts": "110", + "text": "got it", + "bot_id": "B_SAM", + "reactions": [ + # only operator-added reactions, not speech_balloon + {"name": "thumbsup", "users": ["U_OPERATOR"], "count": 1}, + ], + }, + ] + assert _find_active_paused_question(history, bot_user_id="U_BOT") is None + + +def test_find_active_paused_question_detects_bot_speech_balloon(): + """Bot posted a question + added speech_balloon → that's the active pause.""" + from src.runtime.daemon import _find_active_paused_question + history = [ + {"ts": "100", "text": "hi sam", "user": "U_OPERATOR"}, + { + "ts": "110", + "text": "which approach do you want?", + "bot_id": "B_SAM", + "reactions": [ + {"name": "speech_balloon", "users": ["U_BOT"], "count": 1}, + ], + }, + ] + found = _find_active_paused_question(history, bot_user_id="U_BOT") + assert found is not None + assert found["ts"] == "110" + + +def test_find_active_paused_question_ignores_speech_balloon_from_other_user(): + """speech_balloon added by an operator (not the bot) doesn't count. 
+ Only the bot's own reactions on its own messages signal a pause.""" + from src.runtime.daemon import _find_active_paused_question + history = [ + { + "ts": "110", + "text": "discussing something", + "bot_id": "B_SAM", + "reactions": [ + # speech_balloon from operator, not bot — coincidence, not pause + {"name": "speech_balloon", "users": ["U_OPERATOR"], "count": 1}, + ], + }, + ] + assert _find_active_paused_question(history, bot_user_id="U_BOT") is None + + +def test_find_active_paused_question_returns_most_recent_when_multiple(): + """If Sam paused, was answered, paused again — the most recent + :speech_balloon: is the active one (the older one's reaction would + have been cleared by the daemon after the prior continuation).""" + from src.runtime.daemon import _find_active_paused_question + history = [ + { + "ts": "100", + "text": "first question", + "bot_id": "B_SAM", + "reactions": [ + {"name": "speech_balloon", "users": ["U_BOT"], "count": 1}, + ], + }, + {"ts": "120", "text": "operator answer 1", "user": "U_OPERATOR"}, + { + "ts": "150", + "text": "second question", + "bot_id": "B_SAM", + "reactions": [ + {"name": "speech_balloon", "users": ["U_BOT"], "count": 1}, + ], + }, + ] + found = _find_active_paused_question(history, bot_user_id="U_BOT") + assert found is not None + assert found["ts"] == "150" + + +def test_find_active_paused_question_skips_messages_without_reactions_field(): + """Slack omits the `reactions` field on messages with no reactions. 
+ Detection must not crash on its absence.""" + from src.runtime.daemon import _find_active_paused_question + history = [ + {"ts": "100", "text": "no reactions here", "bot_id": "B_SAM"}, + ] + assert _find_active_paused_question(history, bot_user_id="U_BOT") is None + + +# ─── _lookup_paused_session_id — sessions.jsonl correlation ────────────────── + + +def _write_ledger_lines(path, lines): + """Helper: write a list of dicts to a sessions.jsonl-shape file.""" + import json + path.write_text("\n".join(json.dumps(d) for d in lines) + "\n") + + +def test_lookup_paused_session_id_returns_none_when_ledger_missing(tmp_path, monkeypatch): + """If sessions.jsonl doesn't exist (fresh deploy, never written), the + lookup must return None, not crash.""" + from src.runtime import daemon as daemon_module + monkeypatch.setattr(daemon_module, "SESSIONS_PATH", tmp_path / "sessions.jsonl") + d = daemon_module.Daemon.__new__(daemon_module.Daemon) + assert d._lookup_paused_session_id(thread_ts="100", question_post_ts="120") is None + + +def test_lookup_paused_session_id_returns_matching_session(tmp_path, monkeypatch): + """The lookup correctly correlates thread_ts + ask_operator_called=True + + most-recent-before-question.""" + from src.runtime import daemon as daemon_module + ledger_path = tmp_path / "sessions.jsonl" + monkeypatch.setattr(daemon_module, "SESSIONS_PATH", ledger_path) + _write_ledger_lines(ledger_path, [ + # Sam ran a normal session in this thread — no ask_operator + {"session_id": "first", "thread_ts": "100", "ts_start": 105.0, + "ask_operator_called": False}, + # Sam ran ANOTHER session in this thread + paused + {"session_id": "second", "thread_ts": "100", "ts_start": 115.0, + "ask_operator_called": True}, + # Unrelated session in a different thread + {"session_id": "elsewhere", "thread_ts": "999", "ts_start": 117.0, + "ask_operator_called": True}, + ]) + d = daemon_module.Daemon.__new__(daemon_module.Daemon) + result = d._lookup_paused_session_id(thread_ts="100", 
question_post_ts="120.0") + assert result == "second" + + +def test_lookup_paused_session_id_skips_sessions_started_after_post(tmp_path, monkeypatch): + """A session that started AFTER the question post can't have authored + the post. The lookup must reject such entries.""" + from src.runtime import daemon as daemon_module + ledger_path = tmp_path / "sessions.jsonl" + monkeypatch.setattr(daemon_module, "SESSIONS_PATH", ledger_path) + _write_ledger_lines(ledger_path, [ + # Session started AFTER the question post → must be excluded + {"session_id": "future_session", "thread_ts": "100", "ts_start": 150.0, + "ask_operator_called": True}, + ]) + d = daemon_module.Daemon.__new__(daemon_module.Daemon) + result = d._lookup_paused_session_id(thread_ts="100", question_post_ts="120.0") + assert result is None + + +def test_lookup_paused_session_id_picks_most_recent_match(tmp_path, monkeypatch): + """When Sam paused multiple times in the same thread (older one + answered + cleared; newer one still pending), the lookup must + correlate with the question_post_ts — picking the most-recent + matching entry strictly before the post.""" + from src.runtime import daemon as daemon_module + ledger_path = tmp_path / "sessions.jsonl" + monkeypatch.setattr(daemon_module, "SESSIONS_PATH", ledger_path) + _write_ledger_lines(ledger_path, [ + {"session_id": "older_pause", "thread_ts": "100", "ts_start": 105.0, + "ask_operator_called": True}, + {"session_id": "newer_pause", "thread_ts": "100", "ts_start": 115.0, + "ask_operator_called": True}, + ]) + d = daemon_module.Daemon.__new__(daemon_module.Daemon) + # Question posted at 120 — newer_pause (115) is the right answer. 
+ result = d._lookup_paused_session_id(thread_ts="100", question_post_ts="120.0") + assert result == "newer_pause" + + +# ─── Ledger field plumbing ──────────────────────────────────────────────────── + + +def test_session_ledger_entry_includes_ask_operator_called(): + """SessionLedgerEntry's new field must be settable + serialise. + The daemon's lookup depends on this field being readable from + sessions.jsonl.""" + from src.runtime.ledger import SessionLedgerEntry + entry = SessionLedgerEntry( + session_id="abc", + ts_start=100.0, ts_end=101.0, duration_s=1.0, + model="gemini-3.5-flash", + agent_graph="main:gemini-3.5-flash|...", + prompt_tokens=0, output_tokens=0, cache_hit_tokens=0, + exit_code=0, stuck=False, timed_out=False, + channel="C", thread_ts="100", trigger="slack_mention", + operator_principal=True, + ask_operator_called=True, + ) + assert entry.ask_operator_called is True + + +def test_session_ledger_entry_defaults_ask_operator_called_to_false(): + """Backwards compat: existing call sites that don't pass the new + field must still construct cleanly with the default of False.""" + from src.runtime.ledger import SessionLedgerEntry + entry = SessionLedgerEntry( + session_id="abc", + ts_start=100.0, ts_end=101.0, duration_s=1.0, + model="gemini-3.5-flash", + agent_graph="main:gemini-3.5-flash|...", + prompt_tokens=0, output_tokens=0, cache_hit_tokens=0, + exit_code=0, stuck=False, timed_out=False, + channel="C", thread_ts="100", trigger="slack_mention", + operator_principal=True, + ) + assert entry.ask_operator_called is False diff --git a/tests/runtime/test_silent_exit.py b/tests/runtime/test_silent_exit.py index ef4f1b9..292e34a 100644 --- a/tests/runtime/test_silent_exit.py +++ b/tests/runtime/test_silent_exit.py @@ -232,13 +232,15 @@ def test_journal_only_session_does_NOT_close_loop(): assert _closed_loop(records) is False -def test_classify_tool_use_returns_5_values(): - """Defends the tuple shape callers depend on.""" +def 
test_classify_tool_use_returns_6_values(): + """Defends the tuple shape callers depend on. Added ask_operator_called + in the async-question pattern PR — callers that unpack must use 6 names.""" result = SamSession._classify_tool_use([]) - assert len(result) == 5 - worker_used, web_used, bash_used, edited_files, closed_loop = result + assert len(result) == 6 + worker_used, web_used, bash_used, edited_files, closed_loop, ask_operator_called = result assert worker_used is False assert closed_loop is False + assert ask_operator_called is False # ─── SessionResult.silent_exit() — gate-condition logic ──────────────────────