Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions nerve/agent/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,11 @@ async def run(
self.sessions.mark_running(session_id)
if channel is not None:
self._active_channel[session_id] = channel
# Mark the turn as in flight so the finally below can
# detect "ended without sending done/stopped/error" and
# ship a synthetic done. Clearing happens automatically
# when a terminal event is broadcast.
broadcaster.mark_turn_open(session_id)
# Notify all connected clients that this session started running
await broadcaster.broadcast("__global__", {
"type": "session_running",
Expand All @@ -1442,6 +1447,28 @@ async def run(
finally:
self.sessions.mark_not_running(session_id)
self._active_channel.pop(session_id, None)
# Backstop: if _run_inner exited without broadcasting
# done/stopped/error (post-stream DB exception, hung
# CLI cancelled by an outer mechanism, etc.), the
# frontend never learned the turn ended and is still
# showing "thinking..." even though the server has
# cleared is_running. Ship a synthetic done so the
# streaming UI exits cleanly.
if broadcaster.is_turn_open(session_id):
logger.warning(
"Session %s ended without a terminal event "
"(done/stopped/error); sending synthetic done "
"so the frontend exits streaming state",
session_id,
)
try:
await broadcaster.broadcast_done(session_id)
except Exception as e:
logger.warning(
"Synthetic done broadcast failed for %s: %s",
session_id, e,
)
broadcaster.clear_turn_open(session_id)
broadcaster.stop_buffering(session_id)
# Notify all connected clients that this session stopped
await broadcaster.broadcast("__global__", {
Expand Down
32 changes: 32 additions & 0 deletions nerve/agent/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ def __init__(self, max_buffer_size: int = MAX_BUFFER_SIZE):
# Per-session event buffer for reconnect replay
self._session_buffers: dict[str, list[dict[str, Any]]] = {}
self._max_buffer_size = max_buffer_size
# Per-session "turn is in flight" flag. Set by mark_turn_open()
# at the start of a run, cleared automatically when broadcast()
# ships a terminal event ("done", "stopped", "error"). Used by
# the engine's run() finally as a backstop: if the flag is still
# set after _run_inner exits, no terminal event was sent (post-
# stream exception, hung CLI cancelled externally, etc.) and we
# need to ship a synthetic "done" so the frontend exits its
# streaming UI state. Without this, the chat detail stays on
# "thinking..." forever even though the server has cleared
# is_running and the session card has dropped out of the
# "Running" sidebar group.
self._open_turns: set[str] = set()

async def register(self, session_id: str, callback_id: str, callback: BroadcastCallback) -> None:
"""Register a broadcast listener for a session."""
Expand All @@ -72,6 +84,11 @@ async def unregister(self, session_id: str, callback_id: str) -> None:

async def broadcast(self, session_id: str, message: dict[str, Any]) -> None:
"""Send a message to all listeners of a session. Also buffers if active."""
# Terminal events close the open-turn flag so the engine's
# backstop in run() knows no synthetic "done" is needed.
if message.get("type") in ("done", "stopped", "error"):
self._open_turns.discard(session_id)

# Buffer events during active streaming
if session_id in self._session_buffers:
buf = self._session_buffers[session_id]
Expand All @@ -89,6 +106,21 @@ async def broadcast(self, session_id: str, message: dict[str, Any]) -> None:
except Exception as e:
logger.warning("Broadcast to %s failed: %s", callback_id, e)

# --- Turn tracking (engine backstop) ---

def mark_turn_open(self, session_id: str) -> None:
"""Mark a turn as in flight. Cleared when a terminal event is
broadcast (done/stopped/error) or via clear_turn_open()."""
self._open_turns.add(session_id)

def is_turn_open(self, session_id: str) -> bool:
"""Whether a turn is in flight (no terminal event sent yet)."""
return session_id in self._open_turns

def clear_turn_open(self, session_id: str) -> None:
"""Force-clear the open-turn flag without broadcasting."""
self._open_turns.discard(session_id)

# --- Buffering for reconnect replay ---

def start_buffering(self, session_id: str) -> None:
Expand Down
66 changes: 66 additions & 0 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,72 @@ async def test_no_buffering_when_not_started(self):
assert bc.get_buffer("s1") == []


@pytest.mark.asyncio
class TestOpenTurnTracking:
"""The engine's run() finally relies on open-turn tracking to ship a
synthetic ``done`` when ``_run_inner`` exits without sending a
terminal event. Without it, the chat detail stays on
"thinking..." forever even though the server has cleared
is_running and dropped the session out of the sidebar's "Running"
group.
"""

async def test_mark_open_and_terminal_clears(self):
bc = StreamBroadcaster()
assert not bc.is_turn_open("s1")

bc.mark_turn_open("s1")
assert bc.is_turn_open("s1")

# Each terminal type should clear the flag
for terminal in ("done", "stopped", "error"):
bc.mark_turn_open("s1")
await bc.broadcast("s1", {"type": terminal})
assert not bc.is_turn_open("s1"), (
f"{terminal!r} should clear the open-turn flag"
)

async def test_non_terminal_does_not_clear(self):
bc = StreamBroadcaster()
bc.mark_turn_open("s1")

# Streaming events must not close the turn
for nonterminal in (
{"type": "token", "content": "hi"},
{"type": "thinking", "content": "..."},
{"type": "tool_use", "tool": "Bash"},
{"type": "tool_result", "tool_use_id": "x"},
):
await bc.broadcast("s1", nonterminal)
assert bc.is_turn_open("s1"), (
f"{nonterminal['type']} should not clear the open-turn flag"
)

async def test_clear_turn_open_explicit(self):
bc = StreamBroadcaster()
bc.mark_turn_open("s1")
assert bc.is_turn_open("s1")
bc.clear_turn_open("s1")
assert not bc.is_turn_open("s1")

async def test_open_turns_isolated_per_session(self):
bc = StreamBroadcaster()
bc.mark_turn_open("s1")
bc.mark_turn_open("s2")
await bc.broadcast("s1", {"type": "done"})
assert not bc.is_turn_open("s1")
assert bc.is_turn_open("s2")

async def test_broadcast_done_helper_clears_open_turn(self):
# The engine backstop calls broadcaster.broadcast_done() to ship
# the synthetic terminal event. Verify the helper closes the
# turn so a second call wouldn't double-broadcast.
bc = StreamBroadcaster()
bc.mark_turn_open("s1")
await bc.broadcast_done("s1", usage={"input_tokens": 1})
assert not bc.is_turn_open("s1")


@pytest.mark.asyncio
class TestBoundedBuffers:
"""Test that buffers are bounded to max_buffer_size."""
Expand Down
26 changes: 26 additions & 0 deletions web/src/stores/handlers/sessionHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,32 @@ export function handleSessionRunning(
updates.streamingBlocks = [];
updates.agentStatus = { state: 'thinking' };
}
// Defensive: server says the run ended but the frontend is still in
// streaming mode. This happens when done/stopped/error never made
// it to the client (lost WS message during reconnect, post-stream
// exception on the server before broadcast_done fired, etc.).
// Without this branch the chat detail stays on "thinking..." while
// the sidebar entry has already dropped out of the "Running" group,
// which looks like the chat is stuck between steps. The server's
// backstop in engine.run() ships a synthetic done in most of those
// cases; this is belt-and-suspenders for when even that signal is
// missed.
if (msg.session_id === s.activeSession && !msg.is_running && s.isStreaming) {
const finalBlocks = s.streamingBlocks.map(b =>
b.type === 'tool_call' && b.status === 'running'
? { ...b, status: 'complete' as const }
: b,
);
if (finalBlocks.length > 0) {
updates.messages = [
...s.messages,
{ role: 'assistant' as const, blocks: finalBlocks },
];
}
updates.streamingBlocks = [];
updates.isStreaming = false;
updates.agentStatus = { state: 'idle' };
}
return updates;
});
}
Expand Down