Problem
Currently ChannelManager creates a new asyncio.Task for every inbound message without checking whether the same session already has a turn in progress. This leads to:
- Race conditions on tape read/write when multiple turns run concurrently for the same session
- Wasted work when a user sends a correction while a previous turn is still running
- Conflicts with stateful external harnesses (e.g. Codex threads cannot be safely accessed concurrently)
Related: #165 (BufferedMessageHandler breaks per-session ordering)
Proposal
Add an admit_message hookspec that lets plugins declare per-session admission policy. The framework exposes primitives, plugins express intent, and core executes the runtime effects.
from dataclasses import dataclass
from enum import Enum

class AdmitAction(Enum):
    PROCESS = "process"  # start immediately (backward-compatible default)
    WAIT = "wait"        # queue until current turn finishes
    DROP = "drop"        # discard message
    INJECT = "inject"    # append to running turn's steering buffer

@dataclass(frozen=True)
class TurnSnapshot:
    session_id: str
    is_running: bool
    running_count: int
    pending_count: int

@dataclass(frozen=True)
class AdmitDecision:
    action: AdmitAction
    reason: str | None = None
    fallback: AdmitAction | None = None  # used when INJECT not supported

@hookspec(firstresult=True)
def admit_message(message: Envelope, session_id: str, turn: TurnSnapshot) -> AdmitDecision | None:
    """Decide how an inbound message enters the session runtime."""
Core Execution Semantics
Introduce a per-session SessionTurnController that maintains:
- current_task: asyncio.Task | None
- pending_queue: deque[Envelope] (for WAIT)
- steering_buffer: deque[Envelope] (for INJECT)
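To make the runtime effects concrete, here is a rough sketch of how a controller holding this state might apply an already-decided action. It is illustrative only, not the fork's implementation. Assumptions: `Envelope` is stood in for by `str`, `run_turn` is a hypothetical coroutine that executes one turn, running turns are tracked in a set (so PROCESS can stay concurrent and `running_count` is derivable) rather than in a single `current_task`, and pending messages are drained one at a time in FIFO order once the session is idle.

```python
from __future__ import annotations

import asyncio
from collections import deque
from enum import Enum
from typing import Any, Callable, Coroutine

Envelope = str  # stand-in for the real Envelope type (assumption)


class AdmitAction(Enum):
    PROCESS = "process"
    WAIT = "wait"
    DROP = "drop"
    INJECT = "inject"


class SessionTurnController:
    """Illustrative per-session controller; names beyond the proposal are hypothetical."""

    def __init__(self, run_turn: Callable[[Envelope], Coroutine[Any, Any, None]]) -> None:
        self._run_turn = run_turn  # hypothetical coroutine that runs one turn
        self._running: set[asyncio.Task] = set()
        self.pending_queue: deque[Envelope] = deque()    # for WAIT
        self.steering_buffer: deque[Envelope] = deque()  # for INJECT

    @property
    def is_running(self) -> bool:
        return bool(self._running)

    def admit(self, message: Envelope, action: AdmitAction) -> None:
        """Apply an already-resolved action; call from inside the event loop."""
        if action is AdmitAction.DROP:
            return
        if self.is_running and action is AdmitAction.WAIT:
            self.pending_queue.append(message)
        elif self.is_running and action is AdmitAction.INJECT:
            self.steering_buffer.append(message)
        else:
            # PROCESS always starts a turn; WAIT and INJECT start one when idle
            # (the idle-INJECT behaviour is an assumption of this sketch).
            self._start(message)

    def _start(self, message: Envelope) -> None:
        task = asyncio.create_task(self._run_turn(message))
        self._running.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task) -> None:
        self._running.discard(task)
        # Once the session is idle, start the oldest pending message.
        if not self._running and self.pending_queue:
            self._start(self.pending_queue.popleft())
```

Draining a single message per finished turn is only one possible strategy; batching all pending messages into the next turn would need a different `_on_done`.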
When no plugin responds (returns None), default to PROCESS for full backward compatibility.
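A possible way for core to turn a hook result into a final action is sketched below. The `resolve_action` helper and its `inject_supported` flag are hypothetical names, and falling back to PROCESS when a plugin asks for INJECT without giving a usable `fallback` is an assumption of this sketch, not something the proposal settles.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class AdmitAction(Enum):
    PROCESS = "process"
    WAIT = "wait"
    DROP = "drop"
    INJECT = "inject"


@dataclass(frozen=True)
class AdmitDecision:
    action: AdmitAction
    reason: str | None = None
    fallback: AdmitAction | None = None


def resolve_action(decision: AdmitDecision | None, *, inject_supported: bool) -> AdmitAction:
    """Hypothetical helper: map a hook result to the action core will execute."""
    if decision is None:
        # No plugin responded: keep today's behaviour.
        return AdmitAction.PROCESS
    if decision.action is AdmitAction.INJECT and not inject_supported:
        fallback = decision.fallback
        # Assumption: a missing (or INJECT) fallback degrades to PROCESS.
        if fallback is None or fallback is AdmitAction.INJECT:
            return AdmitAction.PROCESS
        return fallback
    return decision.action
```

Keeping this resolution in one place would mean the controller only ever sees a concrete action, never a raw hook result.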
Use Cases
- Serial execution (WAIT): Plugins managing stateful backends (Codex, external agents) return WAIT to prevent concurrent access
- Mid-turn correction (INJECT): User sends a correction while agent is running; message is injected into the running turn's context at the next safe checkpoint
- Rate limiting / spam (DROP): Discard messages that exceed policy
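As an illustration of how the WAIT and DROP use cases might combine in one policy, the sketch below queues messages while a turn is running and discards them once the backlog is too deep. `MAX_PENDING` is a hypothetical threshold, `Envelope` is stood in for by `object`, and the `@hookimpl` decorator is omitted so the snippet runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum

Envelope = object  # stand-in for the real Envelope type (assumption)
MAX_PENDING = 3    # hypothetical per-session backlog limit


class AdmitAction(Enum):
    PROCESS = "process"
    WAIT = "wait"
    DROP = "drop"
    INJECT = "inject"


@dataclass(frozen=True)
class TurnSnapshot:
    session_id: str
    is_running: bool
    running_count: int
    pending_count: int


@dataclass(frozen=True)
class AdmitDecision:
    action: AdmitAction
    reason: str | None = None
    fallback: AdmitAction | None = None


# In a real plugin this function would carry the @hookimpl decorator.
def admit_message(message: Envelope, session_id: str, turn: TurnSnapshot) -> AdmitDecision | None:
    if not turn.is_running:
        return None  # defer to other plugins / the PROCESS default
    if turn.pending_count >= MAX_PENDING:
        return AdmitDecision(AdmitAction.DROP, reason="pending queue full")
    return AdmitDecision(AdmitAction.WAIT, reason="turn in progress")
```

Because the policy only reads the snapshot, it stays free of runtime side effects, which matches the split between plugin intent and core execution described above.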
Reference Implementation
I've implemented a minimal version in my fork:
The bub-codex plugin hook is just 5 lines:
@hookimpl
def admit_message(session_id: str, message: Envelope, turn: TurnSnapshot) -> AdmitDecision | None:
    if turn.is_running:
        return AdmitDecision(action=AdmitAction.WAIT, reason="codex turn in progress")
    return None
Suggested Implementation Order
- Introduce SessionTurnController (upgrade _ongoing_tasks dict)
- Add admit_message hookspec + types
- Implement PROCESS / WAIT / DROP
- Add INJECT with steering buffer + consumer API
- Reserve CANCEL_AND_PROCESS for future
Open Questions
- Hook timeout / exception handling: degrade to PROCESS?
- Pending queue bounds (count + bytes)?
- Drain strategy when turn finishes and multiple messages are pending?
- Should INJECT-eligible messages bypass BufferedMessageHandler debounce?
Happy to contribute a PR if the direction looks good.