feat(terminal): wire CDC session-state streaming through /mux WebSocket#105

Closed
Pritom14 wants to merge 3 commits into main from feat/cdc-mux-session-patch-streaming

@Pritom14
Collaborator

@Pritom14 Pritom14 commented Jun 4, 2026

Closes #104

What changed and why

Before this PR, the /mux WebSocket session-state channel forwarded raw CDC notification fields (seq, projectId, sessionId, eventType) directly onto the wire. The TypeScript MuxProvider consumer expects a SessionPatch shape (id, status, activity, attentionLevel, lastActivityAt). The mismatch meant the dashboard never received live session-state updates -- it could only poll or wait for a page refresh.

This PR closes that gap end-to-end.


Flow

DB INSERT/UPDATE
(sessions / pr / pr_checks)
        |
   [DB trigger -- atomic with the write]
        |
    change_log
        |
  cdc.Poller (100ms tick)
        |
  cdc.Broadcaster.Publish(Event)
        |
        +---> terminal.Manager subscriber callback
                  |
           [non-blocking: push SessionID onto per-conn channel]
                  |             (broadcaster contract: must not block)
                  |
         writer goroutine (writeLoop)
                  |
        sessionSrc.Session()  <--------+
                  |                    |
          domain.DeriveStatus()        sqlite store
                  |                    |
        attentionLevel()      GetDisplayPRFactsForSession()
                  |
     serverMsg{ ch:"sessions", type:"snapshot",
                sessions:[{ id, status, activity,
                            attentionLevel, lastActivityAt }] }
                  |
        WebSocket frame to client
                  |
           MuxProvider.tsx

On subscribe, a full snapshot of all current sessions is queued through the same ordered channel before any per-session patches, so the client always starts with complete state.


Changes

internal/terminal/patches.go (new)

Defines SessionSource (the terminal package's view of session state -- no service-layer import), attentionLevel() (server-side port of getDetailedAttentionLevel from packages/web/src/lib/types.ts), and toSessionPatch / toSessionPatches helpers.

internal/terminal/protocol.go

Replaced the thin sessionUpdate struct with sessionPatch carrying the full wire shape the TS consumer expects. Updated serverMsg.Sessions accordingly.
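
For reference, a minimal sketch of the wire shape described above. The five field names come from this PR; the string field types, the cut-down serverMsg envelope, and the encodeSnapshot helper are assumptions for illustration and may differ from the real protocol.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sessionPatch carries the five fields the TS consumer expects.
// Field types are assumed to be plain strings here.
type sessionPatch struct {
	ID             string `json:"id"`
	Status         string `json:"status"`
	Activity       string `json:"activity"`
	AttentionLevel string `json:"attentionLevel"`
	LastActivityAt string `json:"lastActivityAt"`
}

// serverMsg is a cut-down stand-in for the real envelope.
type serverMsg struct {
	Ch       string         `json:"ch"`
	Type     string         `json:"type"`
	Sessions []sessionPatch `json:"sessions"`
}

// encodeSnapshot is a hypothetical helper that renders one sessions/snapshot frame.
func encodeSnapshot(patches []sessionPatch) (string, error) {
	b, err := json.Marshal(serverMsg{Ch: "sessions", Type: "snapshot", Sessions: patches})
	return string(b), err
}

func main() {
	out, err := encodeSnapshot([]sessionPatch{{
		ID:             "s1",
		Status:         "ci_failed",
		Activity:       "idle",
		AttentionLevel: "review",
		LastActivityAt: "2026-06-04T12:00:00Z",
	}})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

The sample values are illustrative only; the actual status and attention-level constants live in the domain and terminal packages.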

internal/terminal/manager.go

Three changes, each fixing a separate problem:

1. Off-hot-path enrichment. Added sessionEvents chan sessionEvent to connState. The CDC subscriber callback now pushes only a SessionID (non-blocking); all DB reads happen in writeLoop, off the broadcaster's serialized Publish loop. This honors the documented contract: "fn is called synchronously from the poller loop, so it must not block."

2. No-gap subscribe/snapshot ordering. Subscribe is registered before the initial snapshot is queued. Both the snapshot (full read) and per-session patches (single-session read) flow through the same ordered sessionEvents channel and are resolved live at write time -- so the last frame the client sees for any session always reflects current state, regardless of interleaving.

3. Connection context for DB reads. The writer uses the connection-scoped ctx, so a disconnect cancels any in-flight query.
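
The non-blocking push in change 1 can be sketched roughly as below. This is an illustration, not the code in manager.go: the buffer size and the choice to drop an event when the buffer is full are assumptions, and the bool return value exists only to make the behaviour visible.

```go
package main

import "fmt"

// sessionEvent is a stand-in for the per-connection event type.
type sessionEvent struct{ SessionID string }

// pushSessionEvent tries to enqueue ev without ever blocking the caller.
// It reports whether the event was accepted; when the buffer is full the
// event is dropped (an assumption about the overflow policy).
func pushSessionEvent(ch chan<- sessionEvent, ev sessionEvent) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan sessionEvent, 1)
	fmt.Println(pushSessionEvent(ch, sessionEvent{SessionID: "a"})) // accepted
	fmt.Println(pushSessionEvent(ch, sessionEvent{SessionID: "b"})) // buffer full, dropped
}
```

Because the callback returns immediately in both cases, the broadcaster's Publish loop is never held up by a slow connection.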

internal/domain/status.go

Moved DeriveStatus and prPipelineStatus from internal/service/session/status.go into the domain package. They use only domain types; keeping them in the service layer created coupling pressure. internal/service/session/status.go now delegates to the domain function.

internal/daemon/session_source.go (new)

daemonSessionSource implements terminal.SessionSource over the sqlite store. Per session: loads the record, fetches PR facts, calls domain.DeriveStatus. Wired into terminal.NewManager via WithSessionSource in daemon.go.

internal/httpd/terminal_mux.go

Exported TerminalMuxHandler (was terminalMuxHandler) so the integration test can mount it on httptest.Server without importing the full httpd stack.


Tests

Unit (internal/terminal/patches_test.go) -- table-driven coverage of all 16 attentionLevel branches (every status constant + activity fallbacks + priority-ordering cases), toSessionPatch field shape including UTC timestamp normalization, and toSessionPatches ordering.

Unit (internal/terminal/manager_test.go, protocol_test.go) -- updated wire-shape golden test, subscribe/snapshot/CDC-forward tests with in-memory fake conn.

Functional (internal/integration/mux_streaming_functional_test.go) -- real sqlite store, real CDC poller, real terminal.Manager, real WebSocket via httptest.Server + coder/websocket.Dial:

  • TestMuxStreamingSessionPatchDelivery: subscribe (empty snapshot) -> spawn -> poll CDC -> exact idle/idle/working patch on the wire
  • TestMuxStreamingPatchReflectsPRDerivedStatus: apply CI failure -> poll CDC -> ci_failed/review patch on the wire (per-event enrichment path with non-trivial status mapping)
  • TestMuxStreamingInitialSnapshotContainsExistingSessions: sessions in the store before connect appear in the first snapshot

All tests pass under -race. No new failures introduced.

Sessions changing state (spawn, PR update, kill) now push live sessionPatch
frames to every subscribed dashboard client over the /mux WebSocket, closing
the gap where the frontend MuxProvider received thin CDC notifications instead
of the full SessionPatch shape it expects.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@greptile-apps
Contributor

greptile-apps Bot commented Jun 4, 2026

Greptile Summary

This PR closes the wire-shape mismatch between the Go /mux WebSocket and the TypeScript MuxProvider consumer by replacing the thin sessionUpdate CDC-field projection with a properly shaped sessionPatch carrying id, status, activity, attentionLevel, and lastActivityAt. It also introduces off-hot-path enrichment (DB reads happen in writeLoop, not inside the broadcaster callback), a no-gap subscribe/snapshot ordering via a coalescing signal channel, and promotes DeriveStatus into the domain package.

  • internal/terminal/manager.go: connState gains a sessionEvents chan struct{} coalescing channel; handleSubscribe subscribes to CDC before queuing the initial snapshot; writeLoop drains signals and calls AllSessions once per burst.
  • internal/terminal/patches.go (new): SessionSource interface, server-side attentionLevel port of the TS getDetailedAttentionLevel, and toSessionPatch/toSessionPatches helpers.
  • internal/daemon/session_source.go (new): daemonSessionSource wires the sqlite store + DeriveStatus into terminal.SessionSource; internal/domain/status.go receives the moved DeriveStatus and prPipelineStatus, with service/session/status.go delegating to it.

Confidence Score: 4/5

Safe to merge after addressing the phantom-subscription path in handleSubscribe; the rest of the change is well-structured and thoroughly tested.

When a Manager is constructed without WithSessionSource (the default), handleSubscribe still proceeds past its guard, creates a real CDC subscription, and latches the dedup flag — leaving any client that sends a sessions-subscribe frame permanently subscribed with no data and no ability to re-subscribe. All other paths (DeriveStatus refactor, signal coalescing, writeLoop enrichment) are correct and covered by both unit and functional tests under -race.

backend/internal/terminal/manager.go (handleSubscribe guard), backend/internal/daemon/session_source.go (previously flagged silent error discard in toSession)

Important Files Changed

| Filename | Overview |
| --- | --- |
| backend/internal/terminal/manager.go | Core change: CDC callback now pushes struct{} signals to sessionEvents channel; writeLoop drains+coalesces and calls AllSessions; handleSubscribe guard changed from type-based to topic-based; a subtle path leaves clients permanently subscribed but data-less when sessionSrc is nil |
| backend/internal/terminal/patches.go | New file: SessionSource interface, attentionLevel mapping, and toSessionPatch/toSessionPatches helpers; zero-time LastActivityAt serializes as "0001-01-01T00:00:00Z" (previously flagged) |
| backend/internal/daemon/session_source.go | New file: daemonSessionSource implements SessionSource over sqlite; GetDisplayPRFactsForSession errors silently discarded (previously flagged) |
| backend/internal/domain/status.go | DeriveStatus and prPipelineStatus moved from service/session into domain; logic identical to the original, clean refactor |
| backend/internal/service/session/status.go | Now a thin delegation shim to domain.DeriveStatus; no behavioral change |
| backend/internal/integration/mux_streaming_functional_test.go | New functional tests covering initial snapshot, CDC-triggered patch, and PR-derived status; testSessionSource duplicates daemonSessionSource logic verbatim (previously flagged) |
| backend/internal/terminal/protocol.go | sessionUpdate replaced by sessionPatch with the correct SessionPatch wire shape; msgSubscribe constant removed, topicSessions added; Topics field added to clientMsg |
| backend/internal/httpd/terminal_mux.go | terminalMuxHandler exported to TerminalMuxHandler for integration test access; purely mechanical rename |

Sequence Diagram

sequenceDiagram
    participant DB as DB (sessions/pr/pr_checks)
    participant CL as change_log
    participant CP as cdc.Poller
    participant BC as cdc.Broadcaster
    participant HS as handleSubscribe
    participant SE as sessionEvents chan
    participant WL as writeLoop
    participant SS as SessionSource (daemonSessionSource)
    participant WS as WebSocket (client)

    Note over HS,SE: On subscribe (ch=subscribe, topics=[sessions])
    HS->>BC: Subscribe(callback)
    HS->>SE: pushSessionEvent() [initial snapshot signal]
    WL->>SE: receive signal
    WL->>SE: drainSignals() [coalesce burst]
    WL->>SS: AllSessions(ctx)
    SS-->>WL: []domain.Session
    WL->>WS: WriteJSON(sessions/snapshot)

    Note over DB,WS: On CDC event (DB write)
    DB->>CL: INSERT trigger
    CP->>CL: Poll()
    CP->>BC: "Publish(Event{SessionID!=empty})"
    BC->>HS: callback fires
    HS->>SE: pushSessionEvent() [non-blocking]
    WL->>SE: receive signal
    WL->>SE: drainSignals()
    WL->>SS: AllSessions(ctx)
    SS-->>WL: []domain.Session
    WL->>WS: WriteJSON(sessions/snapshot)

Reviews (3): Last reviewed commit: "fix(terminal): always send full session ..."

Comment on lines +41 to +45
pr, ok, _ := s.store.GetDisplayPRFactsForSession(ctx, rec.ID)
if ok {
    return domain.Session{SessionRecord: rec, Status: domain.DeriveStatus(rec, &pr)}
}
return domain.Session{SessionRecord: rec, Status: domain.DeriveStatus(rec, nil)}

P1 Silent DB error swallows PR facts

GetDisplayPRFactsForSession errors are silently discarded (blank _). When the DB returns a transient error (e.g., a context deadline or busy reader), ok is false and DeriveStatus receives nil for pr, so a session that actually has a PR gets derived as StatusWorking or StatusIdle instead of the correct PR-derived status such as StatusCIFailed or StatusMergeable. There is no log entry, so the wrong status is invisible to operators until the next CDC event corrects it. At minimum, the error should be logged so transient DB failures surface in traces.
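
One way to surface the error rather than discard it, as a hedged sketch. The three-value return of GetDisplayPRFactsForSession matches the call shown above, but the prFacts and prStore types, the loadPRFacts helper, and the injected logf function are hypothetical stand-ins, not the daemon's real types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// prFacts and prStore are simplified stand-ins for the real store types.
type prFacts struct{ CIFailed bool }

type prStore interface {
	GetDisplayPRFactsForSession(ctx context.Context, id string) (prFacts, bool, error)
}

// loadPRFacts returns the PR facts for a session, or nil when there are none.
// Unlike the blank-identifier version, a lookup error is reported via logf
// before falling back to nil.
func loadPRFacts(ctx context.Context, s prStore, id string, logf func(string, ...any)) *prFacts {
	pr, ok, err := s.GetDisplayPRFactsForSession(ctx, id)
	if err != nil {
		logf("pr facts lookup failed for session %s: %v", id, err)
		return nil
	}
	if !ok {
		return nil
	}
	return &pr
}

// failingStore simulates a transient database error.
type failingStore struct{}

func (failingStore) GetDisplayPRFactsForSession(context.Context, string) (prFacts, bool, error) {
	return prFacts{}, false, errors.New("database is locked")
}

// demoFailingLookup runs one lookup against failingStore and collects log lines.
func demoFailingLookup() (gotNil bool, lines []string) {
	logf := func(f string, a ...any) { lines = append(lines, fmt.Sprintf(f, a...)) }
	pr := loadPRFacts(context.Background(), failingStore{}, "s1", logf)
	return pr == nil, lines
}

func main() {
	gotNil, lines := demoFailingLookup()
	fmt.Println(gotNil, lines)
}
```

The derived status still falls back to the no-PR path on error, but the failure now leaves a trace.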

Comment on lines +194 to +200
// the session derive to ci_failed and asserts the live patch carries the derived
// status and its mapped attention level — exercising the per-event enrichment path
// with a non-trivial status mapping.
func TestMuxStreamingPatchReflectsPRDerivedStatus(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    t.Cleanup(cancel)

P2 Test duplicates production toSession logic without coverage link

testSessionSource.toSession is a verbatim copy of daemonSessionSource.toSession — including the same silent discard of GetDisplayPRFactsForSession errors. If the production implementation changes (e.g., adds logging or falls back differently on error), this test continues to pass while diverging from the actual code path being tested. Consider extracting the shared logic into a package-level helper or accepting the daemon package import via an integration-test build tag so the test exercises the real implementation.

Comment thread backend/internal/terminal/manager.go Outdated
Comment on lines +394 to +400
case ev := <-c.sessionEvents:
    // Resolve state here, off the broadcaster's hot path. Reading live at
    // write time (not enqueue time) means the last frame for any session
    // always reflects current state, whatever the queue interleaving.
    if c.mgr.sessionSrc == nil {
        continue
    }

P2 Unreachable nil guard in writeLoop

The if c.mgr.sessionSrc == nil { continue } branch at line 398 can never execute. Items are pushed to sessionEvents only from pushSessionEvent, which is called exclusively when sessionSrc != nil — both in the CDC callback (if c.mgr.sessionSrc == nil { return }) and in handleSubscribe (if c.mgr.sessionSrc != nil { ... }). The guard is dead code and mildly misleads readers into thinking the sessionSrc could become nil between push and drain.

Status: string(s.Status),
Activity: string(s.Activity.State),
AttentionLevel: attentionLevel(s.Status, s.Activity.State),
LastActivityAt: s.Activity.LastActivityAt.UTC().Format(time.RFC3339),

P2 Zero-time LastActivityAt serializes to "0001-01-01T00:00:00Z"

For a freshly spawned session that has never received any activity, s.Activity.LastActivityAt is the zero time.Time{}, which Format(time.RFC3339) renders as "0001-01-01T00:00:00Z". The TS MuxProvider consumer likely does not expect this sentinel value and may render the field directly, exposing the year-1 timestamp in the dashboard. Consider omitting lastActivityAt (or serializing it as null/empty string) when the time is zero.
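
A minimal sketch of the "omit when zero" option. The patchTime struct and the formatActivityTime and encodePatch helpers are hypothetical, and whether the TS consumer tolerates a missing lastActivityAt field is an assumption that would need checking against MuxProvider.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// patchTime is a trimmed-down patch with only the fields relevant here.
type patchTime struct {
	ID             string `json:"id"`
	LastActivityAt string `json:"lastActivityAt,omitempty"`
}

// formatActivityTime returns "" for the zero time so that omitempty drops
// the field, and an RFC 3339 UTC timestamp otherwise.
func formatActivityTime(t time.Time) string {
	if t.IsZero() {
		return ""
	}
	return t.UTC().Format(time.RFC3339)
}

// encodePatch marshals a patch for the given session ID and activity time.
func encodePatch(id string, t time.Time) string {
	b, err := json.Marshal(patchTime{ID: id, LastActivityAt: formatActivityTime(t)})
	if err != nil {
		panic(err) // cannot happen for this struct
	}
	return string(b)
}

// demoPatches encodes a never-active session and an active one.
func demoPatches() (fresh, active string) {
	fresh = encodePatch("fresh", time.Time{})
	active = encodePatch("active", time.Date(2026, 6, 4, 12, 0, 0, 0, time.UTC))
	return fresh, active
}

func main() {
	fresh, active := demoPatches()
	fmt.Println(fresh)
	fmt.Println(active)
}
```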

The frontend sends a subscribe frame as {ch:"subscribe", topics:[...]} with
no "type" field, but handleSubscribe required msg.Type == "subscribe" and
clientMsg had no Topics field. The real client's subscribe was silently
dropped, so no snapshot or CDC patches ever reached the dashboard.

Honor the topics array server-side: add Topics to clientMsg and subscribe to
the session feed only when topics contains "sessions". This matches the
contract the frontend already speaks and gates the per-event enrichment so
clients that did not opt into "sessions" do no session work on the hot path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Pritom14
Collaborator Author

Pritom14 commented Jun 4, 2026

Follow-up: subscribe handshake fix

While verifying the end-to-end path against the frontend, I found that the /mux subscribe handshake did not match what the real client sends, so the sessions channel never actually started for the dashboard. This commit fixes that.

What was wrong

The frontend subscribes by sending only a channel and a topics list, with no message type:

{ "ch": "subscribe", "topics": ["sessions", "notifications"] }

On the server, handleSubscribe required msg.Type == "subscribe" and clientMsg had no topics field at all. So:

  1. msg.Type was empty, the guard returned early, and Subscribe was never called.
  2. No initial snapshot was queued and no CDC patches were forwarded.

The functional tests passed only because they sent a synthetic {"ch":"subscribe","type":"subscribe"} frame that the real client never sends.

The fix

Gate the subscription on the topics array the frontend already sends, not on the message type.

client                          server (handleSubscribe)
------                          ------------------------
{ch:"subscribe",                if events != nil &&
 topics:["sessions",   ───────▶    slices.Contains(topics, "sessions"):
          "notifications"]}            Subscribe(...)        // CDC fan-out
                                       queue full snapshot   // immediate state
                                else:
                                       no-op                 // no session work
  • Added a Topics []string field to clientMsg.
  • Subscribe to the session feed only when topics contains "sessions".
  • Removed the now-unused msgSubscribe constant.
  • Updated the functional tests to send the real frontend frame (topics array, no type).
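
The gate can be sketched as follows. The clientMsg fields and the topicSessions constant follow the description above; the wantsSessions helper is hypothetical and leaves out the events != nil check and the actual Subscribe call.

```go
package main

import (
	"encoding/json"
	"fmt"
	"slices"
)

// clientMsg mirrors the frame shape described above: ch plus topics,
// with type optional.
type clientMsg struct {
	Ch     string   `json:"ch"`
	Type   string   `json:"type,omitempty"`
	Topics []string `json:"topics,omitempty"`
}

const topicSessions = "sessions"

// wantsSessions decodes a client frame and reports whether it is a
// subscribe frame that opts into the sessions topic.
func wantsSessions(frame string) (bool, error) {
	var msg clientMsg
	if err := json.Unmarshal([]byte(frame), &msg); err != nil {
		return false, err
	}
	return msg.Ch == "subscribe" && slices.Contains(msg.Topics, topicSessions), nil
}

func main() {
	for _, f := range []string{
		`{"ch":"subscribe","topics":["sessions","notifications"]}`, // real frontend frame
		`{"ch":"subscribe","topics":["notifications"]}`,            // no session work
		`{"ch":"subscribe","type":"subscribe"}`,                    // old synthetic test frame
	} {
		ok, err := wantsSessions(f)
		fmt.Println(ok, err)
	}
}
```

Under this gate the old synthetic test frame no longer subscribes, which is why the functional tests had to switch to the real frame.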

Why gate on topics rather than just sending a type

Honoring topics is strictly more capable than making the client send a type. The frontend already declares intent per channel, so gating on it lets a client that only wants notifications skip all session enrichment (the per-event DB read plus JSON write in the write loop). The alternative would have left topics as a dead field while the server fanned out session work to every connection.

Tests

  • TestServeSubscribeWithoutSessionsTopicSendsNoSnapshot: a client subscribing to only notifications gets no snapshot and no forwarded CDC events.
  • TestClientMsgSubscribeFrameDecodes: the real frontend frame (ch + topics, no type) decodes correctly.
  • Existing snapshot, CDC forwarding, and the three TestMuxStreaming* functional tests updated to the real frame and passing.

Still open (separate decision needed)

Single-session CDC patches are currently sent as type:"snapshot" with a one-element array, and the frontend's setSessions(msg.sessions) replaces the whole array. After this handshake fix, the first single-session patch would drop every other session from the client's view. Fixing that requires either always sending full snapshots (server-only, less efficient) or adding a type:"patch" with a merge-by-id on the frontend. That touches the frontend, so it is out of scope for this commit and tracked separately.

Comment thread backend/internal/terminal/manager.go Outdated
Comment on lines +403 to +408
if ev.snapshot {
    all, err := c.mgr.sessionSrc.AllSessions(ctx)
    if err != nil {
        c.mgr.log.Warn("terminal: failed to fetch session snapshot", "err", err)
        continue
    }

P1 Dropped initial snapshot with no retry leaves client without full session state

When AllSessions fails (e.g., a transient SQLite busy-reader or context deadline), the continue discards the event. The sessionEvent{snapshot: true} was enqueued exactly once in handleSubscribe and is now gone — there is no re-queue path. Subsequent CDC events only deliver per-session patches for sessions that change after connect; sessions that are idle never appear. A user who connects during a transient DB hiccup will see a permanently empty dashboard for all unchanged sessions, with no indication that state is missing.
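
One possible re-queue path, sketched under assumptions: the channel is modelled as a plain struct{} signal channel rather than the sessionEvent type shown in the diff, session data is reduced to a slice of IDs, and pushSignal, handleSnapshotSignal, and the fixed retry delay are all hypothetical. A real fix might prefer backoff or a retry cap.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pushSignal enqueues a wake-up without blocking; extra signals coalesce.
func pushSignal(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// handleSnapshotSignal performs one snapshot attempt. On failure it re-arms
// the signal after retryAfter instead of dropping the request for good.
func handleSnapshotSignal(signals chan struct{}, fetch func() ([]string, error),
	send func([]string), retryAfter time.Duration) {
	all, err := fetch()
	if err != nil {
		time.AfterFunc(retryAfter, func() { pushSignal(signals) })
		return
	}
	send(all)
}

// demoRetry simulates a fetch that fails once and then succeeds.
func demoRetry() (attempts int, sent []string) {
	signals := make(chan struct{}, 1)
	fetch := func() ([]string, error) {
		attempts++
		if attempts == 1 {
			return nil, errors.New("database is locked")
		}
		return []string{"s1", "s2"}, nil
	}
	send := func(all []string) { sent = all }

	pushSignal(signals) // initial snapshot request from subscribe
	for sent == nil {
		<-signals
		handleSnapshotSignal(signals, fetch, send, 10*time.Millisecond)
	}
	return attempts, sent
}

func main() {
	attempts, sent := demoRetry()
	fmt.Println(attempts, sent)
}
```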

…patches

The frontend replaces its session list wholesale on every sessions/snapshot
frame (setSessions(msg.sessions)). The writer was sending single-session
updates as a one-element snapshot, which on the client dropped every other
session from view on the first CDC event after subscribe.

Collapse the writer to always emit a full snapshot of all sessions on any
session-state signal. This keeps the existing frontend correct with no
client-side change, matching the protocol's stated goal of letting the
existing client connect unchanged. Signals are coalesced (drainSignals) so a
burst of CDC events collapses into one AllSessions read, bounding the read
amplification the full-snapshot approach introduces.

Removes the now-dead single-session path: SessionSource.Session and its
implementations on the daemon source and test fakes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Pritom14
Collaborator Author

Pritom14 commented Jun 4, 2026

Follow-up: full snapshots instead of single-session patches

This closes the second issue noted above (single-session updates clobbering the client's session list), without any frontend change.

What was wrong

The frontend treats a sessions/snapshot frame as the complete session list:

// MuxProvider.tsx
if (msg.type === "snapshot") setSessions(msg.sessions);  // replaces the whole array

The writer, though, sent single-session CDC updates as a one-element snapshot. On the client that meant the first CDC event after subscribe replaced the entire list with just one session, dropping every other session from view.

The fix

Always emit a full snapshot of all sessions on any session-state signal, rather than a single-session patch.

CDC event  ──▶ push signal ──▶ writer: drain+coalesce signals
                                       AllSessions()  (one read)
                                       send {type:"snapshot", sessions:[...all]}

This keeps the existing frontend correct with zero client-side change, which matches the protocol's stated goal of letting the existing client connect unchanged.

Why this over a "patch" message type

A type:"patch" plus client-side merge-by-id would be more bandwidth-efficient, but it requires a coordinated change in the separate frontend repo. Full snapshots stay entirely in this repo and are correct against the current client. The read amplification this introduces is bounded by coalescing: a burst of CDC events drains into a single AllSessions read per write cycle (drainSignals), and the read is a local single-user SQLite query off the broadcaster hot path.
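
The coalescing step can be illustrated with a small sketch. The real drainSignals is not shown in this thread, so the version below is an assumption: it returns the number of drained signals purely so the effect is observable, and the buffer size of 8 is arbitrary.

```go
package main

import "fmt"

// drainSignals empties any pending signals without blocking and reports
// how many were discarded.
func drainSignals(ch <-chan struct{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

// demoBurst queues n signals (up to the buffer size), then runs a simplified
// writer loop that performs one "read" per wake-up after draining the rest.
func demoBurst(n int) (reads, drained int) {
	signals := make(chan struct{}, 8)
	for i := 0; i < n; i++ {
		select {
		case signals <- struct{}{}:
		default: // buffer full: the signal coalesces with those already queued
		}
	}
	for {
		select {
		case <-signals:
			drained += drainSignals(signals)
			reads++ // stands in for a single AllSessions call
		default:
			return reads, drained
		}
	}
}

func main() {
	reads, drained := demoBurst(5)
	fmt.Println(reads, drained)
}
```

Five queued signals collapse into a single read here: one signal wakes the writer and the other four are drained before the read.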

Cleanup

Removed the now-dead single-session path: SessionSource.Session and its implementations on the daemon source and the test fakes. The interface is now just AllSessions.

Tests

All terminal and integration tests pass (race-clean), including the three TestMuxStreaming* functional tests and the topic-gating tests from the previous commit. The only failure is the pre-existing TestSessionStreamsRealZellijPane, which is an environmental zellij socket-path limit unrelated to this change.

Comment on lines +324 to 326
if c.mgr.events == nil || !slices.Contains(msg.Topics, topicSessions) {
    return
}

P1 When events != nil but sessionSrc == nil (the default when WithSessionSource is not passed), handleSubscribe still proceeds past this guard: it creates a real CDC subscription (whose callback always returns early) and sets c.unsubEvts, permanently latching the dedup flag. The client receives no snapshot and no error, and any future re-subscribe is silently ignored because c.unsubEvts != nil short-circuits on line 328. Adding sessionSrc == nil to this guard makes the no-op path explicit and prevents the phantom subscription.

Suggested change
- if c.mgr.events == nil || !slices.Contains(msg.Topics, topicSessions) {
-     return
- }
+ if c.mgr.events == nil || c.mgr.sessionSrc == nil || !slices.Contains(msg.Topics, topicSessions) {
+     return
+ }

Collaborator

@neversettle17-101 neversettle17-101 left a comment

Leaving the /mux-vs-dedicated-endpoint question aside (taking that separately) — two things about this change that I'd want addressed before merge. Both are about where the new DB work lands.

1. The session snapshot read runs on the terminal's single write goroutine

In writeLoop (backend/internal/terminal/manager.go), the new sessionEvents case calls sessionSrc.AllSessions(ctx) synchronously:

case <-c.sessionEvents:
    drainSignals(c.sessionEvents)
    all, err := c.mgr.sessionSrc.AllSessions(ctx)   // synchronous DB read
    ...
    c.conn.WriteJSON(ctx, serverMsg{Ch: chSessions, Type: msgSnapshot, Sessions: toSessionPatches(all)})

writeLoop is the single writer for the whole connection, including PTY output drained from c.out. So this DB read now sits inline on the path that flushes terminal bytes — a slow or contended snapshot read stalls PTY output for that connection.

The PR description's goal was to move DB reads "off the broadcaster's hot path," and it does correctly get them off the poller loop. But it lands them on the write loop, which is the latency-sensitive path for the terminal. Suggest resolving the snapshot off writeLoop (e.g. a small dedicated goroutine that feeds resolved patches into c.out) so a slow read can't block PTY flushing.
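
A rough sketch of that suggestion, with loud assumptions: the frame type, the string payload, and resolveSnapshots are hypothetical, and the real c.out may well carry a different element type. The point is only that the slow read happens in its own goroutine while the writer keeps draining out.

```go
package main

import (
	"context"
	"fmt"
)

// frame is a hypothetical unit of work for the single writer goroutine.
type frame struct {
	Ch      string
	Payload string
}

// resolveSnapshots runs in its own goroutine. It waits for signals, performs
// the (possibly slow) read, and hands the resolved frame to out, so the
// writer never executes the read itself.
func resolveSnapshots(ctx context.Context, signals <-chan struct{},
	fetch func(context.Context) (string, error), out chan<- frame) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-signals:
			payload, err := fetch(ctx)
			if err != nil {
				continue // a real version would log and possibly retry
			}
			select {
			case out <- frame{Ch: "sessions", Payload: payload}:
			case <-ctx.Done():
				return
			}
		}
	}
}

// demoOrdering shows terminal output being written while a snapshot read
// is still in flight.
func demoOrdering() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	signals := make(chan struct{}, 1)
	out := make(chan frame, 4)
	gate := make(chan struct{})
	fetch := func(context.Context) (string, error) {
		<-gate // simulate a slow or contended DB read
		return "all-sessions", nil
	}
	go resolveSnapshots(ctx, signals, fetch, out)

	signals <- struct{}{}                      // snapshot requested
	out <- frame{Ch: "pty", Payload: "ls\n"}   // terminal bytes arrive meanwhile
	first := <-out                             // writer is not stalled by the read
	close(gate)                                // the read finally completes
	second := <-out
	return []string{first.Ch, second.Ch}
}

func main() {
	fmt.Println(demoOrdering())
}
```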

2. The snapshot read is N+1 per session, and it fires per CDC event per connection

AllSessions (session_source.go) calls GetDisplayPRFactsForSession once per session, which is a single-session query (backend/internal/storage/sqlite/store/pr_facts.go:16). So each snapshot is O(sessions) queries. Combined with #1 and the full-snapshot-per-event model (every CDC event re-reads and re-ships all sessions to every connected client), this is the part that bites first as session count grows. The coalescing helps batch bursts but doesn't bound the per-snapshot cost. Worth either a batched PR-facts query, or at minimum a comment acknowledging the fan-out as a known scaling limit.
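
To make the batched alternative concrete, here is a sketch under stated assumptions. PRFactsForSessions is a hypothetical batch method that does not exist in the store today, and sessionRecord, prFacts, and statusFor are simplified stand-ins for the domain types and DeriveStatus.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the real record and PR-facts types.
type sessionRecord struct{ ID string }
type prFacts struct{ CIFailed bool }

// batchPRStore is hypothetical: one query returns facts for many sessions.
type batchPRStore interface {
	PRFactsForSessions(ctx context.Context, ids []string) (map[string]prFacts, error)
}

// statusFor is a toy stand-in for domain.DeriveStatus.
func statusFor(pr *prFacts) string {
	if pr != nil && pr.CIFailed {
		return "ci_failed"
	}
	return "idle"
}

// allStatuses derives every session's status with a single PR-facts query
// instead of one query per session.
func allStatuses(ctx context.Context, store batchPRStore, recs []sessionRecord) (map[string]string, error) {
	ids := make([]string, len(recs))
	for i, r := range recs {
		ids[i] = r.ID
	}
	facts, err := store.PRFactsForSessions(ctx, ids)
	if err != nil {
		return nil, err
	}
	out := make(map[string]string, len(recs))
	for _, r := range recs {
		if pr, ok := facts[r.ID]; ok {
			out[r.ID] = statusFor(&pr)
		} else {
			out[r.ID] = statusFor(nil)
		}
	}
	return out, nil
}

// countingStore is a fake that records how many queries were issued.
type countingStore struct{ queries int }

func (s *countingStore) PRFactsForSessions(context.Context, []string) (map[string]prFacts, error) {
	s.queries++
	return map[string]prFacts{"s2": {CIFailed: true}}, nil
}

// demoBatch derives statuses for three sessions and reports the query count.
func demoBatch() (int, map[string]string) {
	store := &countingStore{}
	statuses, err := allStatuses(context.Background(), store,
		[]sessionRecord{{ID: "s1"}, {ID: "s2"}, {ID: "s3"}})
	if err != nil {
		panic(err)
	}
	return store.queries, statuses
}

func main() {
	queries, statuses := demoBatch()
	fmt.Println(queries, statuses["s1"], statuses["s2"], statuses["s3"])
}
```

With this shape the per-snapshot cost is one PR-facts query regardless of session count, though each snapshot still ships every session to every subscribed client.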

Everything else looks good — the DeriveStatus move into domain is a clean call, the subscribe-before-snapshot ordering is right, and the functional tests are thorough.

@Pritom14 Pritom14 closed this Jun 4, 2026
@Pritom14
Collaborator Author

Pritom14 commented Jun 4, 2026

closing this


Development

Successfully merging this pull request may close these issues.

feat(terminal): wire CDC session-state streaming to /mux WebSocket (live sessionPatch delivery)

2 participants