Merged
4 changes: 3 additions & 1 deletion .gitignore
@@ -18,4 +18,6 @@ venv/
.idea/
.DS_Store

.ruff_cache/
.ruff_cache/
# Blog/writing scratch produced by mining session jsonls — not committed
mining/
23 changes: 23 additions & 0 deletions src/capabilities/slack.md
@@ -70,6 +70,29 @@ The ACK is a commitment to a direction, not a contract. If work surfaces somethi

Why this rule exists: the gap between a `:hourglass:` reaction and Sam's final reply is anywhere from 30 seconds to 6 minutes. In that gap, the operator can't tell whether Sam understood the ask, is on the right track, or is about to ship the wrong thing. The ACK closes that gap.

## Pausing for operator input mid-work — `ask_operator`

Sam runs as a single-event-at-a-time async daemon. There is no synchronous "stop and wait for input" primitive — the LLM cannot pause an in-flight session and resume it. But Sam CAN break out async-style: post the question, exit cleanly, and let the operator's reply (whenever it arrives) trigger a continuation session.

The mechanism is the `ask_operator(question: str)` tool, available only to the main agent. When called, the tool posts the question to the originating thread and immediately adds a `:speech_balloon:` reaction to that post. These are two back-to-back Slack API calls (`chat.postMessage`, then `reactions.add`), not a single atomic operation; issuing them together keeps the race window small. The session then exits. **Slack reactions are the source of truth for the paused state**: no in-memory daemon map, no parallel state store. A `:speech_balloon:` on a bot post means "this question is awaiting input," and it is visible to the operator at a glance.

When the operator replies in the thread, the daemon: (1) scans `thread_history` (already fetched on every inbound) for the most recent bot message carrying `:speech_balloon:` from this bot, (2) looks up the session_id of that pause from the per-session ledger (`sessions.jsonl` filtered by `thread_ts` + `ask_operator_called=True` + most-recent-before-the-question-post), (3) injects `paused_session_id` into the new IncomingMessage, (4) clears the `:speech_balloon:` (marks resolved). Sam's continuation prompt then instructs it to: read the prior audit-log slice to see what was already done, apply the operator's answer, continue from where the previous session stopped.
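
The detection steps above can be sketched roughly as follows. This is an illustrative sketch, not the daemon's actual code: it assumes `thread_history` is a list of Slack message dicts in the shape returned by `conversations.replies` (with `ts`, `user`, and an optional `reactions` list), that ledger rows are already parsed from `sessions.jsonl`, and that each row carries a `started_ts` field. The function names, `bot_user_id`, and `started_ts` are hypothetical.

```python
from typing import Optional

PAUSE_REACTION = "speech_balloon"


def find_paused_question(thread_history: list[dict], bot_user_id: str) -> Optional[dict]:
    """Return the most recent bot message carrying :speech_balloon: from this bot."""
    newest_first = sorted(thread_history, key=lambda m: float(m["ts"]), reverse=True)
    for message in newest_first:
        if message.get("user") != bot_user_id:
            continue  # only the bot's own posts can be paused questions
        for reaction in message.get("reactions", []):
            if reaction.get("name") == PAUSE_REACTION and bot_user_id in reaction.get("users", []):
                return message
    return None


def find_paused_session_id(
    ledger_rows: list[dict], thread_ts: str, question_ts: str
) -> Optional[str]:
    """Pick the latest ask_operator session in this thread that began before the question post.

    `started_ts` is an assumed field name; the real ledger schema may differ.
    """
    candidates = [
        row
        for row in ledger_rows
        if row.get("thread_ts") == thread_ts
        and row.get("ask_operator_called") is True
        and float(row["started_ts"]) < float(question_ts)
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda row: float(row["started_ts"]))["session_id"]
```

Keeping both lookups as pure functions over data that is already fetched fits the "no parallel state store" design: nothing here needs to survive a daemon restart.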

Daemon restart is not a special case. The reaction lives on Slack, the ledger lives on GCS, both persist. Next inbound triggers detection naturally — no in-memory rebuild needed.

**Use `ask_operator` ONLY for genuine unknown unknowns** — something Sam cannot resolve from:
- the current Slack thread
- the journal (`/data/journal/*.md`)
- the audit log (`/data/tool_calls/<today>.jsonl`)
- Sam's own source (`src/identity.md`, capabilities, skills)
- the operator's earlier messages in this same thread

When the operator already gave the answer earlier in the thread, re-reading is the right move — `ask_operator` for something already-answered burns operator attention and is a worse failure than just paying attention. When Sam is 70% sure, the right move is the existing pattern: make the conservative call, disclose it in the final reply (*"Hit a fork between A and B — picked A because constraint K; flag if wrong"*). `ask_operator` is for the cases where conservative-call-and-disclose is genuinely unsafe.

**Workers, pro_executor, and the mentor do NOT have `ask_operator`.** Narrow-scoped subagents must return their finding to the main agent, never escalate to the operator directly. The main agent decides whether to pause.

**Abandoned questions are reviewed by Sam itself during `daily-maintenance`.** The daily-maintenance skill's "abandoned questions" section enumerates `:speech_balloon:` reactions from this bot via `reactions.list`, filters to those older than 24h, and Sam decides per-question whether to: post a contextual reminder (keeps `:speech_balloon:`, resets the clock), mark abandoned (clears `:speech_balloon:`, posts a one-line close), or escalate. The daemon does NOT autonomously do this — daemon detects mechanically, Sam holds the judgment. No separate cron, no new state store; the existing daily routine handles it.
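
A minimal sketch of the mechanical "older than 24h" filter, assuming the input is the `items` array from a `reactions.list` response (each item having `type`, `channel`, and a `message` with `ts` and `reactions`). The function name and the returned dict shape are hypothetical. Age is measured from the question post's `ts`; how a reminder "resets the clock" is not specified here, so the sketch does not model it.

```python
import time
from typing import Optional

ABANDON_AFTER_SECONDS = 24 * 60 * 60  # the 24h threshold from the text above


def stale_questions(items: list[dict], now: Optional[float] = None) -> list[dict]:
    """Return paused questions whose post is older than 24 hours.

    Only detects candidates; deciding to remind, close, or escalate is left to Sam.
    """
    now = time.time() if now is None else now
    stale = []
    for item in items:
        if item.get("type") != "message":
            continue  # reactions.list can also return files
        message = item.get("message", {})
        has_pause = any(
            reaction.get("name") == "speech_balloon"
            for reaction in message.get("reactions", [])
        )
        if not has_pause or "ts" not in message:
            continue
        age_seconds = now - float(message["ts"])
        if age_seconds > ABANDON_AFTER_SECONDS:
            stale.append(
                {
                    "channel": item.get("channel"),
                    "ts": message["ts"],
                    "age_hours": age_seconds / 3600,
                }
            )
    return stale
```

The function only enumerates candidates, which matches the split described above: detection is mechanical, and the per-question judgment stays with Sam.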

## Live UX (streaming, plan, feedback, references)

These are *patterns* with judgment calls, not always-on defaults. The detailed when/when-not rules are in `src/skills/slack-dynamic-messaging.md` — read it before deciding whether to stream a reply, attach feedback buttons, set a plan block, or cite sources. The shape of the decision is "would a human reader be glad I turned this on?", not "is this feature available?"
135 changes: 135 additions & 0 deletions src/runtime/adk_runner.py
@@ -734,6 +734,126 @@ async def consult_opus(
return consult_opus


def _make_ask_operator_tool(
    *,
    channel: Optional[str],
    thread_ts: Optional[str],
    event_ts: Optional[str],
):
    """Return a coroutine that posts a question to the originating Slack
    thread, immediately adds a `:speech_balloon:` reaction to the question
    post (race mitigation), and signals the LLM to end the session.

    The async-question pattern: Sam, mid-work, hits a genuine unknown
    unknown, something it cannot resolve from existing context. Sam
    calls `ask_operator(question=...)`. The tool posts the question,
    reacts on its own post with `:speech_balloon:` so the operator can
    glance and see "Sam paused, awaiting input", then returns a string
    telling the LLM to exit. The session ends cleanly; the
    `:speech_balloon:` reaction on Slack and the per-session ledger
    record the pause (no in-memory daemon state); when the operator
    replies in the thread, the daemon detects the continuation and
    spawns a session with the previous session_id injected so Sam can
    reconstruct prior state from the audit log.

    Only the MAIN agent gets this tool. Workers, pro_executor, and the
    mentor cannot pause: they have narrow scopes and must return to
    their caller, never escalate to the operator directly.

    Channel/thread_ts come from the IncomingMessage and are bound here
    at closure-time, so the LLM doesn't need to pass them. If the
    channel is missing (scheduled/retry/synthetic sessions), the tool
    returns an "unavailable" message instead of posting, because
    `ask_operator` is meaningless without an originating thread.
    """
    async def ask_operator(question: str) -> str:
        """Pause this session mid-work and await operator input via Slack.

        Use ONLY for genuine unknown unknowns: something you cannot
        resolve from the current Slack thread, the journal, the audit
        log, or your own source files. NOT for "I'm 70% sure" (make the
        conservative call + disclose). NOT for clarifications the
        operator already gave (re-read the thread). NOT for things the
        operator might know; verify the gap first.

        The pattern is async: this tool posts your question, the
        session exits cleanly, and the operator's reply (whenever it
        comes) will trigger a new continuation session with full
        audit-log context so you can pick up where you left off.

        question: the question to post. Write it consequence-first:
            "Should I do A or B?" beats "I'm thinking about A and B
            and not sure". The operator should be able to reply in
            one sentence.

        Returns a sentinel string telling the LLM to end the session.
        """
        import aiohttp
        from .config import SLACK_BOT_TOKEN

        # Without a channel or a token there is nothing to post to, so
        # tell the LLM to decide on its own instead of raising.
        if not channel:
            return "(ask_operator unavailable: no originating channel, so this session has no Slack thread to pause on. Make the call yourself and disclose in your final reply.)"
        if not SLACK_BOT_TOKEN:
            return "(ask_operator unavailable: SLACK_BOT_TOKEN not set in env. Make the call yourself and disclose in your final reply.)"

        # Reply in the existing thread, or start one on the triggering event.
        target_thread_ts = thread_ts or event_ts
        post_body = {
            "channel": channel,
            "text": question,
        }
        if target_thread_ts:
            post_body["thread_ts"] = target_thread_ts

        headers = {
            "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
            "Content-Type": "application/json; charset=utf-8",
        }

        try:
            async with aiohttp.ClientSession() as http:
                # 1. Post the question.
                async with http.post(
                    "https://slack.com/api/chat.postMessage",
                    headers=headers,
                    json=post_body,
                ) as resp:
                    post_json = await resp.json()
                if not post_json.get("ok"):
                    err = post_json.get("error", "unknown")
                    return f"(ask_operator failed at chat.postMessage: {err}. Make the call yourself and disclose in your final reply.)"

                posted_ts = post_json.get("ts")
                # 2. Add :speech_balloon: to the question post right away.
                #    This is a second API call, not an atomic operation. If
                #    it fails, the question is still posted and the operator
                #    can still see and reply to it, so we don't fail the
                #    whole tool on reaction failure. The reaction marks the
                #    paused state, so every failure is logged.
                if posted_ts:
                    try:
                        async with http.post(
                            "https://slack.com/api/reactions.add",
                            headers=headers,
                            json={
                                "channel": channel,
                                "timestamp": posted_ts,
                                "name": "speech_balloon",
                            },
                        ) as react_resp:
                            react_json = await react_resp.json()
                        # Slack reports API errors as HTTP 200 with ok=false.
                        if not react_json.get("ok"):
                            log.error(
                                "ask_operator: reactions.add returned error %s (non-fatal)",
                                react_json.get("error", "unknown"),
                            )
                    except Exception:
                        log.exception("ask_operator: reactions.add failed (non-fatal)")
        except Exception as exc:
            log.exception("ask_operator HTTP call failed")
            return f"(ask_operator HTTP error: {type(exc).__name__}: {exc}. Make the call yourself and disclose in your final reply.)"

        return (
            "Question posted to the Slack thread; :speech_balloon: added "
            "to the post. **End this session now**: do not call any "
            "more tools. The operator's reply will trigger a continuation "
            "session that picks up via the audit log."
        )

    return ask_operator


# ─── ADK runner ───────────────────────────────────────────────────────────────


@@ -837,6 +957,20 @@ async def run(self, request: AgentRunRequest) -> AgentRunResult:
tools=worker_tools,
)

# `ask_operator`: pause the session mid-work and post a question
# to the originating Slack thread, then exit cleanly. The
# operator's reply triggers a continuation session, which the
# daemon detects from the `:speech_balloon:` reaction and the
# per-session ledger. ONLY the main agent gets this
# tool; workers / pro_executor / mentor must not escalate to
# the operator directly and instead return to their caller. The tool
# closes over Slack origin coords so the LLM signature is just
# `ask_operator(question: str)`.
ask_operator_tool = _make_ask_operator_tool(
channel=request.slack_channel,
thread_ts=request.slack_thread_ts,
event_ts=request.slack_event_ts,
)

# `consult_opus` — dispatch the mentor (Opus, read-only) to review
# accumulated context. Sam DOES NOT decide to invoke autonomously —
# only the `daily-maintenance` skill (review step) and the
@@ -862,6 +996,7 @@ async def run(self, request: AgentRunRequest) -> AgentRunResult:
FunctionTool(func=parallel_workers),
AgentTool(agent=pro_executor_agent),
FunctionTool(func=consult_opus_tool),
FunctionTool(func=ask_operator_tool),
],
)
