feat(orchestrator): persist agent stream events in WebSocket handler #1172
geoffjay merged 2 commits into issue-1159
Conversation
ShawnSunClio
left a comment
Review — feat(orchestrator): persist agent stream events in WebSocket handler
Stack note: This PR bases on issue-1159 (PR #1167, still open) — sibling of #1168 and #1170. All three need #1167 to merge first. Diff is conflict-free relative to the parent.
Blocking issues (2)
1. Race condition at reconnect: events get session_number=0 during async lookup window
register() sets the in-memory session counter to 0 immediately, then spawns an async task to read the real value from storage:
```rust
// Sets to 0 right now
self.session_numbers.write().await.entry(agent_id).or_insert(0);

// Correct value arrives asynchronously — could be seconds later under load
tokio::spawn(async move {
    match storage.get_usage_stats(&agent_id).await {
        Ok(stats) => {
            session_numbers.write().await.insert(agent_id, stats.session_count as i64);
        }
        ...
    }
});
```

Any `assistant`, `result`, or user-message event that arrives before the async task completes will be tagged with `session_number: 0`, regardless of the agent's actual session history. For a reconnecting agent (restart, network blip, crash-recovery) that has already had two or three sessions, every event during this window lands in session 0 — permanently misattributed with no way to correct it after the fact.
The window is typically short, but during Sidekiq load or SQLite contention it could be seconds. Reconnection is the exact scenario where correct session attribution matters most (distinguishing the new run from prior ones).
Options:
- Read the session number synchronously before completing registration (accept the small latency hit at connect time)
- Defer event persistence until the session counter is confirmed (buffer events in a `VecDeque`, flush when the lookup resolves; a sketch follows this list)
- Query the max `session_number` from `conversation_events` directly in the async task (no round-trip through `agent_sessions`) as a faster path
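If the buffering option is chosen, this is a minimal sketch of the state machine it implies; `SessionState`, `ConversationEvent`, and the method names are illustrative, not taken from the PR:

```rust
use std::collections::VecDeque;

/// Illustrative per-agent state: either the session number is confirmed,
/// or events queue up until the async storage lookup resolves.
enum SessionState {
    Pending(VecDeque<ConversationEvent>), // buffered until resolve()
    Ready(i64),                           // confirmed session number
}

/// Simplified stand-in for the PR's event row.
struct ConversationEvent {
    session_number: i64,
    payload: String,
}

impl SessionState {
    /// Tag and return the event if the session is known; otherwise buffer it.
    fn record(&mut self, mut event: ConversationEvent) -> Option<ConversationEvent> {
        match self {
            SessionState::Ready(n) => {
                event.session_number = *n;
                Some(event) // caller persists immediately
            }
            SessionState::Pending(queue) => {
                queue.push_back(event); // held until the lookup resolves
                None
            }
        }
    }

    /// Called when the storage lookup completes: drain the buffer with the
    /// confirmed session number and switch to Ready for all later events.
    fn resolve(&mut self, session_number: i64) -> Vec<ConversationEvent> {
        let drained = match self {
            SessionState::Pending(queue) => queue
                .drain(..)
                .map(|mut e| {
                    e.session_number = session_number;
                    e
                })
                .collect(),
            SessionState::Ready(_) => Vec::new(),
        };
        *self = SessionState::Ready(session_number);
        drained
    }
}
```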
2. No tests for any of the new persistence logic
The PR adds:
- `persist_event()` — fire-and-forget spawn, error logging
- `get_session_number()` — async `RwLock` read
- `persist_context_cleared()` — counter write + persist + stream broadcast
- `register()` — session init + async lookup race
- `unregister()` — session cleanup
- `send_user_message()` — two new persist calls
- `handle_incoming_message()` — six new `persist_event` call sites across three match arms
None of these paths have tests. The existing websocket test suite (mod tests at line 1227) covers connection lifecycle and message routing but has no coverage for the persistence layer.
At minimum the following should be tested before merge:
- `register()` initialises session counter to 0 when storage is absent
- `unregister()` removes the session counter entry
- `persist_context_cleared()` updates the counter and calls `persist_event`
- `send_user_message()` calls `persist_event` twice (`prompt_sent` + `activity_changed`)
- An event persisted in `handle_incoming_message` for each of the three `assistant`/`result` arms
These are all testable with an in-memory SQLite storage instance (the same pattern already used in `storage.rs` tests via `create_test_storage()`).
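A minimal shape for one of those tests, assuming the helper named above (`create_test_storage()` from `storage.rs`) and simplifying the `register()` and `persist_context_cleared()` signatures, which take more arguments in the real handler:

```rust
#[tokio::test]
async fn session_counter_lifecycle() {
    // In-memory SQLite instance, same pattern as the storage.rs tests.
    let storage = create_test_storage().await;
    let registry = ConnectionRegistry::new().with_storage(storage);
    let agent_id = Uuid::new_v4();

    // register() must provide the safe default immediately, before the
    // async storage lookup resolves.
    registry.register(agent_id).await;
    assert_eq!(registry.get_session_number(agent_id).await, 0);

    // persist_context_cleared() advances the in-memory counter.
    registry.persist_context_cleared(agent_id, 1).await;
    assert_eq!(registry.get_session_number(agent_id).await, 1);

    // unregister() removes the entry (no unbounded growth).
    registry.unregister(agent_id).await;
}
```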
Non-blocking observations
3. ActivityChanged events double the per-agent event volume
Every interaction cycle writes two `ActivityChanged` events (busy on `send_user_message`, idle on `result`) on top of the content events. For an active agent producing 20 assistant messages per hour, `ActivityChanged` pairs alone add ~40 rows/hour — roughly 40% of event volume for a typical run. Given the 50k default cap from PR #1168, this reduces effective conversation history retention. Consider whether `ActivityChanged` events need to be persisted at all (vs. just broadcast on the stream), or document the trade-off explicitly.
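If the broadcast-only route were taken, the change is small; in this sketch, `broadcast_stream_event` is a placeholder for whatever the registry's real stream-send helper is called:

```rust
// Sketch: keep activity transitions live on the stream but out of SQLite.
// broadcast_stream_event is hypothetical, standing in for the real helper.
async fn on_activity_changed(registry: &ConnectionRegistry, agent_id: Uuid, busy: bool) {
    registry
        .broadcast_stream_event(agent_id, "agent:activity_changed", busy)
        .await;
    // Intentionally no persist_event() call: the two ActivityChanged rows per
    // interaction cycle would otherwise eat into the 50k event cap (#1168).
}
```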
4. Session semantics at initial connect rely on agent_sessions row count
`register()` initialises the counter to `stats.session_count`, assuming that the count of session rows equals the current session number. This holds as long as the first session is numbered 0 and `clear_context()` is the only thing that creates new session rows. If that invariant ever changes (e.g., seeding, import, a new workflow phase that rotates sessions), the session tagging silently diverges. A more robust approach would be to read `MAX(session_number)` directly from `conversation_events` for the agent, which is always self-consistent.
5. Minor: `(*storage).clone()` in `main.rs`
`AgentStorage` derives `Clone` and wraps a `DatabaseConnection` (SeaORM's Arc-backed pool). The clone shares the underlying connection pool, which is correct. The `(*storage)` deref is needed because `storage` is `Arc<AgentStorage>` — this is fine, just worth a comment that it's intentionally cloning the pool handle, not duplicating the DB connection.
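If a comment is added, something like this would capture the intent (the comment wording and the surrounding construction are suggestions, not the PR's exact code):

```rust
// (*storage) derefs the Arc<AgentStorage>; .clone() then copies the pool
// handle (AgentStorage wraps SeaORM's Arc-backed DatabaseConnection), so
// this shares the connection pool rather than opening a second one.
let registry = ConnectionRegistry::new().with_storage((*storage).clone());
```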
geoffjay
left a comment
Review: feat(orchestrator): persist agent stream events in WebSocket handler
Stack note: This PR is stacked on issue-1159 (PR #1167, needs-rework). It cannot merge until #1167 is fixed. The code review below is independent of that dependency.
🔴 Fix required — session number initialization is off-by-one
websocket.rs, register():
```rust
match storage.get_usage_stats(&agent_id).await {
    Ok(stats) => {
        session_numbers.write().await.insert(agent_id, stats.session_count as i64);
    }
```

`AgentUsageStats::session_count` is documented as "Total number of sessions (including the current one, if any)" — confirmed by the storage tests:
```rust
// After start_new_session:
assert_eq!(stats.session_count, 2); // session 0 ended + session 1 active = 2

// After recording usage (no explicit end):
assert_eq!(stats.session_count, 1); // session 0 active = 1
```

The invariant holds: `session_count = session_number + 1` whenever a current usage session is active. Any agent that has processed at least one message will have an active session, so on reconnect:
| Context clears | Correct `session_number` | `stats.session_count` | Used value |
|---|---|---|---|
| 0 (but has usage) | 0 | 1 | 1 ← wrong |
| 1 | 1 | 2 | 2 ← wrong |
| N | N | N+1 | N+1 ← wrong |
The only case that accidentally works is a brand-new agent with no recorded usage at all (session_count = 0 = session_number).
Consequence: On reconnect, events in the current session get tagged as session_number = N+1 instead of N. The REST API (#1161) and UI (#1162) would show them as belonging to a non-existent session.
Fix — add a storage method that queries `conversation_events` directly:

```rust
// storage.rs — new method on AgentStorage
pub async fn get_max_conversation_session_number(&self, agent_id: Uuid) -> Result<i64> {
    use sea_orm::{ConnectionTrait, Statement, DbBackend};
    let row = self.db.query_one(Statement::from_sql_and_values(
        DbBackend::Sqlite,
        "SELECT COALESCE(MAX(session_number), 0) FROM conversation_events WHERE agent_id = ?",
        [agent_id.to_string().into()],
    )).await?;
    Ok(row.and_then(|r| r.try_get_by_index::<i64>(0).ok()).unwrap_or(0))
}
```

Then in `register()`:
```rust
tokio::spawn(async move {
    match storage.get_max_conversation_session_number(agent_id).await {
        Ok(session) => {
            session_numbers.write().await.insert(agent_id, session);
        }
        Err(e) => {
            warn!(%agent_id, error = %e,
                "Failed to initialize session number from storage; defaulting to 0");
        }
    }
});
```

This is always correct because it reads directly from the same table where session numbers are written — no dependency on usage session semantics.
Alternatively (simpler, no new method), adjust the current approach:

```rust
// session_count includes the current active session, so subtract 1 when active
let session_number = if stats.current_session.is_some() {
    (stats.session_count as i64).saturating_sub(1)
} else {
    stats.session_count as i64
};
session_numbers.write().await.insert(agent_id, session_number);
```

The `get_max_conversation_session_number` approach is preferred — it doesn't rely on the relationship between usage sessions and conversation sessions staying in sync.
✅ What's correct — solid implementation throughout
Fire-and-forget pattern (`persist_event`): spawns a task, logs warn on error, no-op when storage is `None` — exactly right. ✅
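As a point of reference, here is a self-contained sketch of that shape; the types and the `insert_conversation_event` method name are stand-ins, not the PR's actual signatures:

```rust
use tracing::warn;

// Stand-ins for the PR's real types; the method name is hypothetical.
#[derive(Clone)]
struct AgentStorage;
#[derive(Clone)]
struct ConversationEvent;

impl AgentStorage {
    async fn insert_conversation_event(&self, _e: &ConversationEvent) -> Result<(), String> {
        Ok(())
    }
}

fn persist_event(storage: &Option<AgentStorage>, event: ConversationEvent) {
    // No-op when storage is absent (the registry can run without persistence).
    let Some(storage) = storage.clone() else { return };

    // Fire-and-forget: spawn so the WebSocket read loop never blocks on
    // SQLite; errors are logged at warn level and never panic the handler.
    tokio::spawn(async move {
        if let Err(e) = storage.insert_conversation_event(&event).await {
            warn!(error = %e, "Failed to persist conversation event");
        }
    });
}
```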
Session counter lifecycle:
- `new()`: initialised to empty `HashMap` ✅
- `register()`: `or_insert(0)` provides an immediate safe default ✅
- `unregister()`: `remove(agent_id)` prevents unbounded growth ✅
- `persist_context_cleared()`: `insert(agent_id, new_session_number)` updates the counter before persisting the event ✅
`handle_incoming_message()` — session read strategy: the single `let session = registry.get_session_number(agent_id).await` at the top of the NDJSON loop means all events from one message batch share one session number and one lock acquisition. ✅
Event coverage vs acceptance criteria in #1160:
- `output` (per non-empty line) ✅
- `tool_use` (with full `tool_use` JSON as metadata) ✅
- `thinking` ✅
- `result` (with usage metadata including `is_error`) ✅
- `prompt_sent` + `activity_changed` (busy) from `send_user_message` ✅
- `context_cleared` via `persist_context_cleared` ✅
- `activity_changed` (idle) from result arm ✅
Fixed the `session_number: 0` hardcode in the `agent:usage_update` stream event (flagged in pre-implementation analysis) ✅

`main.rs`: `(*storage).clone()` correctly dereferences the `Arc<AgentStorage>` before cloning the inner value (which is `Clone` via `DatabaseConnection`). ✅

`manager.rs`: `persist_context_cleared` is called after `storage.end_session()` completes and before the API returns — no race possible since the user cannot send a message while `clear_context` is in-flight. ✅

New `agent:context_cleared` stream event in `persist_context_cleared`: additive change, existing UI clients ignore unknown event types safely. ✅
This change is part of the following stack. Change managed by git-spice.
Address two blocking issues from PR review:

1. Session number off-by-one on reconnect - replace `get_usage_stats().session_count` (which is N+1 when active) with a new `get_max_conversation_session_number()` storage method that queries `MAX(session_number)` directly from `conversation_events`. This is always correct regardless of usage-session semantics.
2. Add persistence tests - 9 new tests covering: session counter init without storage, session counter removed on unregister, storage-backed session init reads `MAX(session_number)`, `persist_context_cleared` updates counter and writes event, `send_user_message` persists `prompt_sent` + `activity_changed`, `handle_incoming_message` persists `output`/`tool_use`/`thinking`/`result` events.
3. Fix race condition in `register()`: async storage init now uses `max(storage, current)` to avoid overwriting a newer value set by `persist_context_cleared` between `register()` and the spawned lookup completing.
4. Add `#[allow(dead_code)]` to storage query helpers and the `ConversationQuery` struct that are used only in test code (clippy `-D warnings`).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
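Item 3 corresponds to a guard along these lines in the spawned lookup (a sketch against the registry fields this PR adds, not the verbatim diff):

```rust
tokio::spawn(async move {
    match storage.get_max_conversation_session_number(agent_id).await {
        Ok(from_storage) => {
            let mut numbers = session_numbers.write().await;
            // persist_context_cleared() may have advanced the counter while
            // this lookup was in flight; never move the counter backwards.
            let entry = numbers.entry(agent_id).or_insert(0);
            *entry = (*entry).max(from_storage);
        }
        Err(e) => {
            warn!(%agent_id, error = %e, "Failed to initialize session number");
        }
    }
});
```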
Wires `AgentStorage` into `ConnectionRegistry` so that every agent event flowing through the WebSocket handler is persisted to SQLite as a `conversation_event` row, without blocking the WebSocket read loop.

What changed

`websocket.rs`
- `storage: Option<AgentStorage>` and `session_numbers: Arc<RwLock<HashMap<Uuid, i64>>>` added to `ConnectionRegistry`
- `with_storage(storage)` — builder method to attach storage at startup
- `persist_event()` — fire-and-forget helper; spawns a task, logs warn on error, never panics
- `get_session_number()` — reads the in-memory per-agent session counter
- `persist_context_cleared()` — updates the session counter, persists the event, and broadcasts `agent:context_cleared` on the stream
- `register()` — initialises session counter to 0 then fires an async lookup to sync from storage
- `unregister()` — cleans up the session counter entry
- `send_user_message()` — persists `prompt_sent` + `activity_changed` (busy)
- `handle_incoming_message()` — persists `output` (per line), `tool_use`, `thinking`, `result` with usage metadata, `activity_changed` (idle); also fixes the hardcoded `session_number: 0` in the `usage_update` stream event

`manager.rs`
- `clear_context()` calls `registry.persist_context_cleared()` after advancing the storage session

`main.rs`
- `.with_storage((*storage).clone())`

Event types persisted
| Source | Events persisted |
|---|---|
| `assistant` | `output` (per line), `tool_use`, `thinking` |
| `result` | `result` (with usage metadata), `activity_changed` (idle) |
| `send_user_message` | `prompt_sent`, `activity_changed` (busy) |
| `clear_context` | `context_cleared` |
| `system`, `control_request`, `keep_alive` | (none) |

Closes #1160