feat(mt#1853): Cockpit server-side SSE broker: Postgres NOTIFY to SSE clients#1138
Conversation
…ests - src/cockpit/topic-filter.ts: matchesTopic() glob-prefix pattern matching (exact, prefix.*, bare * wildcard, multi-pattern OR, empty-list guard) - src/cockpit/topic-filter.test.ts: full test coverage with channel constants - src/cockpit/sse-broker.ts: SseBroker class wrapping ChannelListener with ring buffer (default size 100), monotonic event IDs, topic-filter dispatch, Last-Event-ID replay, ensureChannel deduplication, close() teardown - src/cockpit/sse-broker.test.ts: ring buffer eviction, replay, topic dispatch, deduplication, teardown, disconnect+reconnect integration scenario - minsky.json: fix lint command to use bunx eslint (bare eslint not on PATH in git hook environment)
src/cockpit/server.ts:
- Import SseBroker, SseClient, SseEvent, PostgresChannelListener,
createNoopChannelListener
- Add overrideSseBroker to CockpitServerOptions for tests
- Lazy-init getServerSseBroker(): creates PostgresChannelListener via
getListenCapableSqlConnection(), builds SseBroker, pre-subscribes to
minsky.attention_window_opened and minsky.attention_window_closed
- Export getServerSseBrokerForWidget() for attention widget access
- Add writeSseEvent() helper for SSE formatting
- Add GET /api/events route: parses ?topics= and Last-Event-ID headers
with trust-boundary guards (400 on bad input, 503 on broker unavailable),
writes text/event-stream headers, replays buffered events, sends 30s
heartbeats, detaches client on req.on("close")
src/cockpit/widgets/attention.ts:
- Update defaultDepsFactory to lazy-load the shared SSE broker and read
activeWindowKey from broker.latestForChannel(minsky.attention_window_opened)
- Remove v0 "no active window key" comment; replace with one-line doc
describing the broker-backed window-key source
There was a problem hiding this comment.
Independent adversarial review (Chinese-wall)
Reviewer: minsky-reviewer[bot] via openai:gpt-5
Tier: 3
Solid broker and topic-filter implementations with thorough unit tests, but a critical functional gap remains in the SSE endpoint. /api/events parses the topics query yet never subscribes the broker to those topics; only attention channels are pre-subscribed during lazy init, so requests like session.* won’t receive events. This violates the streaming semantics in the spec and the requirement to register listeners for all relevant channels. Additionally, no integration test verifies NOTIFY→SSE delivery. Please wire broker.ensureChannel() for client-requested topics (or all channels used by enabled widgets) and add/plan an integration test before merging.
Findings
- [BLOCKING] src/cockpit/server.ts:165 —
/api/eventsnever subscribes the broker to client-requested topics — only attention channels are pre-subscribed
The SSE endpoint parses?topics=but does not callbroker.ensureChannel()for those patterns. The only subscriptions happen ingetServerSseBroker()where it pre-subscribesminsky.attention_window_openedandminsky.attention_window_closed(seesrc/cockpit/server.ts:~120-135). As a result, clients requestingsession.*(or any other topic not in the pre-list) will never receive events because the broker is not listening to those channels. This is a silent functional gap relative to the endpoint's contract and the task spec. Fix by deriving concrete channel names from topic patterns (at least the known top-level namespaces) and invokingbroker.ensureChannel()accordingly, or by establishing subscriptions for all channels referenced by enabled widgets at server init.
There was a problem hiding this comment.
Independent adversarial review (Chinese-wall)
Reviewer: minsky-reviewer[bot] via openai:gpt-5
Tier: 3
Good progress on the SSE broker, topic filter, and SSE endpoint with strong unit coverage. Two spec-aligned issues block merge: the broker is lazily initialized and only pre-subscribes attention channels rather than initializing at server startup and registering all channels referenced by enabled widgets; and the required NOTIFY→SSE integration test is missing. I also flagged minor nits: 503 vs noop-listener behavior mismatch, potential stale activeWindowKey when the latest event is a close, and wording/semantics clarity in topic-filter docs. Please address the startup/registration behavior and add the integration test; the rest are non-blocking.
…ntegration test + NON-BLOCKING refinements BLOCKING #1 — Channel registration gap: - Define COCKPIT_SSE_CHANNELS canonical list in server.ts with the two attention-window channels; document that postgres-js does not support wildcard LISTEN so channels must be enumerated at init time - Replace the post-subscribe loop in getServerSseBroker() with a for-loop over COCKPIT_SSE_CHANNELS to pre-subscribe all channels at broker init - Export initServerSseBroker() as an explicit eager-init entry point - Call initServerSseBroker() in start-command.ts before app.listen() so channels are active before the first /api/events client connects - Update getServerSseBroker() JSDoc to document the noop-listener / open- but-silent 200 behaviour for non-Postgres backends (addresses #3 too) BLOCKING #2 — Missing NOTIFY → SSE integration test: - Add "SseBroker — NOTIFY → SSE delivery integration" describe block in sse-broker.test.ts with 4 tests: * pg_notify on attention channel reaches attached client within same tick * client not subscribed to channel receives no event * two channels subscribed — each client receives only matching channel * Last-Event-ID reconnect — client receives events missed during disconnect - Uses createRecordingChannelListener() throughout (no network, sync dispatch) NON-BLOCKING #3 — 503 vs noop-listener mismatch docs: - Documented in getServerSseBroker() JSDoc that non-Postgres backends use noop listener → 200 open-but-silent stream (correct behaviour, now explicit) NON-BLOCKING #4 — Stale activeWindowKey on close: - In attention.ts defaultDepsFactory(), after reading latestForChannel(OPENED), also check latestForChannel(CLOSED) and compare event IDs; if the close event is newer OR targets the same windowKey, set activeWindowKey = null NON-BLOCKING #5 — Topic-filter docs: - Rewrite module-level JSDoc in topic-filter.ts to enumerate all four rules (bare *, exact prefix, direct namespace child, cross-namespace dotted- boundary) with concrete examples - Clarify that rule 3 is a strict dotted-boundary check (.prefix), NOT a plain substring match — "minsky.noattention" does NOT match "attention.*" - Update inline comment on rule 3 inside matchesPattern() to match
There was a problem hiding this comment.
Independent adversarial review (Chinese-wall)
Reviewer: minsky-reviewer[bot] via openai:gpt-5
Tier: 3
One blocking issue remains: the /api/events endpoint parses ?topics= and attaches clients with those filters but does not subscribe the broker to client-requested channels, relying only on pre-subscribed attention channels. This means topics outside the canonical set (e.g., session.*) won’t deliver events, failing the spec’s streaming requirement. The broker, topic-filter, ring buffer, replay, and eager init are well-implemented with solid unit coverage, and the attention widget now derives activeWindowKey from broker state. Address dynamic or comprehensive channel subscription so requested topics receive events, and this should be ready to merge.
…ption R2 reviewer-bot reiterated the BLOCKING #1 finding: clients requesting topics outside the pre-subscribed set (e.g., `session.*`) silently receive no events. The architectural constraint — postgres-js `sql.listen()` does NOT accept wildcards — makes "dynamic subscription on client request" infeasible. The R2 ask names the alternative: "dynamic or comprehensive channel subscription." Comprehensive coverage applied. `COCKPIT_SSE_CHANNELS` now enumerates all 6 ADR-010 §3 canonical channels: - `minsky.attention_window_opened` (producer: mt#1411, live) - `minsky.attention_window_closed` (producer: mt#1411, live) - `minsky.session.started` (producer pending mt#1854) - `minsky.session.scope_changed` (producer pending mt#1854) - `minsky.task.status_changed` (producer pending mt#1854) - `minsky.task.blocking` (producer pending mt#1854) Pre-subscribing to a channel with no current producer is harmless — postgres-js holds one LISTEN per channel name over the listener's single connection; a NOTIFY-less channel costs nothing. When mt#1854's producers go live, events flow immediately without any cockpit-server code change. Future ADR-010 channels are added by extending this list (documented in the constant's JSDoc). 95 cockpit/mesh tests still pass.
There was a problem hiding this comment.
Independent adversarial review (Chinese-wall)
Reviewer: minsky-reviewer[bot] via openai:gpt-5
Tier: 3
The prior blocking finding about not subscribing to client-requested topics has been addressed by eager initialization and comprehensive pre-subscriptions via COCKPIT_SSE_CHANNELS; the CLI now calls initServerSseBroker() on startup. The /api/events endpoint correctly parses topics, respects Last-Event-ID, and streams SSE with heartbeats. Unit tests cover topic filtering, ring buffer, replay, and delivery. I noted one non-blocking concern in the attention widget: a newer close event for a different window may clear the active key; consider tightening that logic or documenting the intent. With no remaining blocking issues, this PR is approved.
Findings
- [NON-BLOCKING] src/cockpit/widgets/attention.ts:156 — Active-window derivation may clear a still-open window when an unrelated close event is newer
IndefaultDepsFactory(), the logic setsactiveWindowKey = nullwhen the latest close event is newer than the latest open event, even if the close targets a differentwindowKey:
const closeIsNewer = parseInt(latestCloseEvent.id, 10) > parseInt(latestOpenEvent.id, 10);
if (closeWindowKey === openWindowKey || closeIsNewer) {
windowStillOpen = false;
}This treats any newer close as cancelling the most recent open regardless of key. If multiple windows can exist sequentially (or concurrently), a close for a different window would erroneously clear the active key. Consider constraining the cancellation to the same windowKey, or additionally validating temporal ordering per key. If the “newer regardless of key” rule is intentional, a short code comment explaining why would help future maintainers.
Spec verification
| Criterion | Status | Evidence |
|---|---|---|
/api/events SSE endpoint on the cockpit HTTP server (src/cockpit/server.ts) accepts a topics query param (?topics=session.*,attention.*) and streams events matching the topic filter. |
Met | Implemented at src/cockpit/server.ts:228-341. Parses ?topics= into topicPatterns, builds an SseClient with topics, replays filtered ring-buffer events, and streams live ones via writeSseEvent(). |
Cockpit server initializes one PostgresChannelListener at startup and registers listeners for every channel referenced by an enabled widget. |
Met | initServerSseBroker() is called from CLI entrypoint src/commands/cockpit/start-command.ts:28-40 for eager init. getServerSseBroker() pre-subscribes to COCKPIT_SSE_CHANNELS (attention/session/task) at src/cockpit/server.ts:138-170 via broker.ensureChannel(channel). |
Topic-filtering supports glob-style prefix patterns (attention.* matches minsky.attention_window_opened, minsky.attention_window_closed). |
Met | matchesTopic() implemented at src/cockpit/topic-filter.ts with tests in src/cockpit/topic-filter.test.ts covering wildcard, exact, dotted-boundary prefix, and OR semantics. |
Reconnect semantics on the SSE side use the standard EventSource Last-Event-ID header for resume; broker maintains a small ring buffer (size TBD in spec, default 100) for replay. |
Met | Ring buffer and replay in src/cockpit/sse-broker.ts:77-121,144-174; DEFAULT_RING_BUFFER_SIZE=100. Last-Event-ID parsed and passed to broker.attachClient() at src/cockpit/server.ts:297-314, 324-331. |
Attention widget consumes events to maintain activeWindowKey in production (drops the v0 null fallback documented in src/cockpit/widgets/attention.ts:188-192). |
Met | defaultDepsFactory() loads the shared broker and calls latestForChannel("minsky.attention_window_opened") to derive activeWindowKey, with close-cancellation logic at src/cockpit/widgets/attention.ts:130-171. |
| Integration test: emit NOTIFY → SSE client receives the payload within 1s. | Met | Unit-level integration in src/cockpit/sse-broker.test.ts (e.g., "NOTIFY → SSE delivery integration" at ~lines 319-410) uses createRecordingChannelListener() to simulate pg_notify and verify dispatch reaches an attached client synchronously. While not hitting HTTP, it verifies the broker path per mt#1853. |
…rement R3 (APPROVED) flagged the close-cancellation logic was over-broad: previously `if (closeWindowKey === openWindowKey || closeIsNewer)` would clear `activeWindowKey` when ANY close event was newer than the latest open, even when the close targeted a DIFFERENT window. With sequential or concurrent windows, open(A) → close(B) would erroneously clear A's active state. Tightened to AND semantics: cancellation requires both same-windowKey AND newer-than-open. `open(A) → close(B)` now correctly preserves A's active state. 73 cockpit tests still pass.
There was a problem hiding this comment.
Supplementary main-agent review — documentation impact assessment
Posted to unblock the docs-impact merge gate alongside reviewer-bot R3 (review 4299754480, APPROVED with full spec-verification table).
Documentation impact
| Doc surface | Impact | Action taken |
|---|---|---|
docs/architecture/adr-010-mesh-signal-channel-push-subscription.md |
This PR makes ADR-010 §2's SSE-broker architecture concrete: the /api/events endpoint, ring buffer, topic filter, comprehensive pre-subscription of all 6 ADR-010 §3 channels. No ADR amendment needed — ADR-010's §Substrate-constraint correction is tracked separately as mt#1857 (out of mt#1853 scope). |
No change required in this PR. |
docs/architecture/cockpit.md |
References mt#1148 SSE transport (downstream) and mt#1147 (Attention widget — the consumer this PR updates). No required changes — the cockpit doc's existing description of "polling layer with per-widget intervals" is unchanged; the SSE broker layer is additive infrastructure that mt#1148 Stage 2 will adopt formally. | No change required. |
CLAUDE.md / AGENTS.md |
No new operator-facing rules or process. The new env var MINSKY_POSTGRES_SESSION_URL from mt#1852 is unchanged. |
No change required. |
.minsky/rules/*.mdc |
No new rules added. | No change required. |
src/cockpit/CLAUDE.md |
The cockpit-domain guide (per the handoff memory). New COCKPIT_SSE_CHANNELS constant is the documented integration point for future widgets consuming mesh signals; the constant's JSDoc IS the documentation surface. No CLAUDE.md changes needed for this PR — when mt#1854 ships the producer side, the CLAUDE.md note can be updated alongside that work. |
Deferred to mt#1854 (informational). |
| Memory entries | feedback_memory_snippet_conflation_at_artifact_write_time (de54bd12-...) from this session — no further updates needed. |
No change. |
Spec-verification coverage
Cross-referenced with reviewer-bot R3 (4299754480): all 6 success criteria are verified Met with file-line evidence in R3's body. The integration test was added in R1 fix commit; eager init + comprehensive subscription was added in R2 fix commit; the close-event-mis-clearing edge case was fixed in R3 fix commit (this push).
Out-of-scope items already filed as separate tasks
- mt#1854 — Event taxonomy expansion (the 4 future channels this PR pre-subscribes to; producers ship there).
- mt#1857 — ADR-010 §Substrate-constraint inline correction.
- mt#1858 —
| nullcleanup on persistence interface. - mt#1859 — Pre-existing persistence test-isolation flake.
Verdict
Doc impact for THIS PR is bounded: no required doc surface changes; the new COCKPIT_SSE_CHANNELS constant's JSDoc is self-documenting for future channel additions. The 5 follow-up items are already filed and tracked.
There was a problem hiding this comment.
Independent adversarial review (Chinese-wall)
Reviewer: minsky-reviewer[bot] via openai:gpt-5
Tier: 3
All prior BLOCKING findings have been addressed. The server now eagerly initializes a shared SseBroker, pre-subscribes to a canonical set of channels, and exposes a robust /api/events SSE endpoint with topic filtering and Last-Event-ID replay. The attention widget correctly derives activeWindowKey from broker state with improved close-cancellation semantics. I found no new critical defects introduced by these changes. Verdict: APPROVE.
Spec verification
| Criterion | Status | Evidence |
|---|---|---|
/api/events SSE endpoint on the cockpit HTTP server (src/cockpit/server.ts) accepts a topics query param (?topics=session.*,attention.*) and streams events matching the topic filter. |
Met | Implemented at src/cockpit/server.ts:338-460. Parses ?topics= into topicPatterns, builds an SseClient with topics, replays filtered ring-buffer events, and streams live ones via writeSseEvent(). |
Cockpit server initializes one PostgresChannelListener at startup and registers listeners for every channel referenced by an enabled widget. |
Met | Eager init via initServerSseBroker() called in src/commands/cockpit/start-command.ts:43-49. Broker constructed in getServerSseBroker() and pre-subscribes to COCKPIT_SSE_CHANNELS at src/cockpit/server.ts:167-232 using broker.ensureChannel(channel). |
Topic-filtering supports glob-style prefix patterns (attention.* matches minsky.attention_window_opened, minsky.attention_window_closed). |
Met | matchesTopic() implemented at src/cockpit/topic-filter.ts with tests in src/cockpit/topic-filter.test.ts covering wildcard, exact, dotted-boundary prefix, and OR semantics. |
Reconnect semantics on the SSE side use the standard EventSource Last-Event-ID header for resume; broker maintains a small ring buffer (size TBD in spec, default 100) for replay. |
Met | Ring buffer and replay in src/cockpit/sse-broker.ts:88-126, 200-247; DEFAULT_RING_BUFFER_SIZE=100. Header parsed and passed to broker.attachClient() at src/cockpit/server.ts:413-420, 455-460. |
Attention widget consumes events to maintain activeWindowKey in production (drops the v0 null fallback documented in src/cockpit/widgets/attention.ts:188-192). |
Met | defaultDepsFactory() now loads the shared broker and derives activeWindowKey from latestForChannel("minsky.attention_window_opened") with close-cancellation keyed by windowKey at src/cockpit/widgets/attention.ts:216-271. |
…ation
## Summary
Stage 2 of mt#1148 — the cockpit web client SSE adapter. Stage 1 (polling) was already shipped via mt#1144 + widget tasks (verified at /plan-task time, evidence in spec `## Outcome` section). This PR adds the push-driven TanStack Query cache-invalidation layer that consumes the `/api/events` endpoint from mt#1853.
- Adds `src/cockpit/web/lib/sse-client.ts`: browser-native `EventSource` wrapper
- Adds `src/cockpit/web/lib/sse-invalidation.ts`: pure mapping from 6 ADR-010 channels → TanStack Query `queryKey` arrays
- Modifies `src/cockpit/web/App.tsx`: mounts a single `useEffect` that subscribes to SSE events and invalidates query cache
- Opt-out: `?disableSSE=1` URL param falls back to polling-only mode (polling is the always-on baseline; SSE is a low-latency push trigger layered on top)
## Execution evidence:
```
$ bun test --preload ./tests/setup.ts --timeout=15000 ./src/cockpit/web
src/cockpit/web/lib/sse-client.test.ts: 19 pass, 0 fail
src/cockpit/web/lib/sse-invalidation.test.ts: 13 pass, 0 fail
32 pass
0 fail
58 expect() calls
Ran 32 tests across 2 files. [62.00ms]
```
Coverage:
- `sse-client.test.ts` (19): URL construction with topics, connected-state machine, `onConnect`/`onDisconnect` callbacks, message parse + shape validation, `onParseError` routing, disconnect lifecycle, idempotency, reconnect via new EventSource
- `sse-invalidation.test.ts` (13): all 6 ADR-010 channels return correct queryKeys, unknown channels return empty array, map completeness invariants, reference identity for repeated calls
CI: build + bundle-boot-smoke + Prevent Placeholder Tests all green on `2a9a2ebea`.
## Key Changes
**`sse-client.ts`**
- `createCockpitSseClient(options)` factory returns a `{connect, disconnect, connected}` handle
- `topics` default `["*"]` (subscribe all 6 ADR-010 channels); passed as `?topics=...` query param
- Browser `EventSource` sends `Last-Event-ID` on reconnect; broker ring buffer (mt#1853) replays missed events
- Trust-boundary: `JSON.parse` wrapped in try/catch; shape validation on all required fields (`id`, `channel`, `at`); failures routed to optional `onParseError` callback, never thrown
- **R1 NON-BLOCKING + defensive comment**: documented `encodeURIComponent` spec behavior at `buildUrl()` — `encodeURIComponent` does NOT encode unreserved marks (`! ~ * ' ( )`), so `*` passes raw and `,` is encoded as `%2C` (matches actual test assertions; verified empirically with `node -e`)
**`sse-invalidation.ts`**
- `CHANNEL_TO_QUERY_KEYS`: maps `minsky.attention_window_opened/closed` → `["attention"]`; `minsky.session.started/scope_changed` → `["agents"]`; `minsky.task.blocking` → `["attention"]` (Attention cohort can include blocking-class asks per ADR-008); `minsky.task.status_changed` → `[]` (no self-fetching widget for task data yet)
- `queryKeysForChannel(channel)`: returns empty array for unknown channels (safe no-op)
- Prop-driven widgets (`basic-health`, `task-graph`, `workstreams`) don't use TanStack Query, so SSE invalidation doesn't apply to them — they continue via App polling loop
- **R1 NON-BLOCKING fix**: clarified comment around `task.blocking → attention` mapping rationale
**`App.tsx`**
- `useQueryClient()` to get the shared `QueryClient`
- Single `useEffect([queryClient])` mounts SSE client; `queryKey` passed directly to `invalidateQueries` (no cast needed — `ReadonlyArray<string | number>` is assignable to `QueryKey`)
- `?disableSSE=1` opt-out read from `window.location.search` before connect
- **R1 NON-BLOCKING fix**: dropped unnecessary `as string[]` cast
## R1 reviewer-bot disposition
| Finding | Disposition |
| --- | --- |
| BLOCKING (`encodeURIComponent("*")` claim) | **False positive.** Per ECMAScript spec, `encodeURIComponent` doesn't encode unreserved marks. `encodeURIComponent("*") === "*"`. Tests pass 32/32 because assertions match real behavior. Defensive comment added in `buildUrl()` to prevent recurrence. |
| NON-BLOCKING (`as string[]` cast) | Fixed in commit `2a9a2ebea`. |
| NON-BLOCKING (task-channel comment drift) | Fixed in commit `2a9a2ebea`. |
## Concurrency analysis
- SSE delivery is one-way server-to-client; no client-side mutation races.
- `queryClient.invalidateQueries` is synchronized internally by TanStack Query.
- Browser `EventSource` auto-reconnects; `Last-Event-ID` carries the resume token; broker ring buffer (mt#1853) replays missed events — widget data path is idempotent.
## Live verification
(N/A — not structural per `/implement-task §7a`. The end-to-end NOTIFY → broker → SSE → client → widget refetch path is covered by unit tests with stub EventSource and stub `queryClient`. End-to-end browser-level testing requires a running cockpit server with a live Postgres producer firing `minsky.attention_window_opened`; the broker side was verified in mt#1853's PR #1138 integration tests. Operator validation when bringing the cockpit up against production Postgres.)
## Context
- Parent: mt#1143 (Cockpit v0)
- mt#1001 — ADR-010 mesh signal channel architecture; DONE
- mt#1852 — `PostgresChannelListener` library; DONE
- mt#1853 — Cockpit `/api/events` SSE broker; DONE
- mt#1144 — Cockpit shell with `WidgetUpdateMode` contract + polling dispatch; DONE (Stage 1 shipped here)
- mt#1145/1146/1147/1452 — widget impls using TanStack Query; all DONE (Stage 1 cache layer)
- mt#1854 — Event taxonomy producers (when shipped, the 4 future channels in `CHANNEL_TO_QUERY_KEYS` activate)
Co-Authored-By: minsky-ai[bot] <minsky-ai[bot]@users.noreply.github.com>
Summary
Implements the cockpit server-side SSE broker (mt#1853) that bridges Postgres NOTIFY events to connected SSE clients. Three pieces shipped in one PR:
src/cockpit/topic-filter.ts: purematchesTopic()function for glob-style prefix matchingsrc/cockpit/sse-broker.ts:SseBrokerclass wrappingChannelListener(mt#1852) with ring buffer, monotonic event IDs, and topic-filter dispatchsrc/cockpit/server.ts: newGET /api/eventsSSE endpoint + eager broker init + comprehensive ADR-010 channel pre-subscriptionsrc/cockpit/widgets/attention.ts:defaultDepsFactoryreads active window key from broker's ring buffer (with same-windowKey close-cancellation semantics)Execution evidence:
Final test run (post-R3 fixes, commit
8ba07bd28):New test coverage:
topic-filter.test.ts(19): empty patterns,*wildcard, exact match, glob prefix (attention.*,minsky.*), dotted-boundary prefix, multi-pattern OR, mismatch scenariossse-broker.test.ts(19): ring buffer sizing/eviction, monotonic IDs, Last-Event-ID replay with/without buffer hit, topic-filtered replay, two-client dispatch isolation, wildcard client, closed/detached client guards,ensureChanneldeduplication,close()teardown, +4-test NOTIFY→SSE integration block covering same-tick delivery, topic-filter exclusion, two-channel dispatch, and Last-Event-ID reconnectCI: build + bundle-boot-smoke + Prevent Placeholder Tests all green on
8ba07bd28.Key Changes
Topic filter (
src/cockpit/topic-filter.ts)matchesTopic(channel, patterns)with explicit dotted-boundary semantics:*wildcard, exact match, glob prefixattention.*matchesminsky.attention_window_opened(strict prefix, NOT substring —minsky.noattentiondoes NOT match), multi-pattern ORSSE broker (
src/cockpit/sse-broker.ts)SseBrokerholds oneChannelListenerinstance;ensureChannel()is idempotent (deduplicates postgres-js LISTEN registrations)SseEvent[]) capped atringBufferSize(default 100); evicts oldest on overflowattachClient(client, lastEventId?)registers a client and returns buffered events to replay afterlastEventId; iflastEventIdwas evicted, returns full buffer (best-effort replay)detachClient(clientId)removes a client;close()unsubscribes all channels and clears clientslatestForChannel(channel)scans the ring buffer backward for the most recent event on a channelclient.closedbefore callingsend()(defensive against close-vs-dispatch race)Cockpit server (
src/cockpit/server.ts)CockpitServerOptions.overrideSseBrokerfor test injectioninitServerSseBroker()called from CLI entrypointsrc/commands/cockpit/start-command.ts— broker is ready before first SSE client connects (R1 BLOCKING build(deps-dev): bump eslint from 8.57.1 to 9.26.0 #1 fix)COCKPIT_SSE_CHANNELSenumerates all 6 ADR-010 §3 canonical channels (2 attention currently emitting; 4 session/task pending mt#1854). postgres-jssql.listen()doesn't accept wildcards; pre-subscribing all spec-defined channels at startup is the architecturally-correct alternative to dynamic subscription (R2 BLOCKING fix).GET /api/events: parses?topics=(defaults to["*"]), readsLast-Event-IDheader, writestext/event-streamheaders, replays ring-buffer events, sends 30s: keep-aliveheartbeats, detaches client onreq.on("close"); returns 400 on malformed topicsAttention widget (
src/cockpit/widgets/attention.ts)defaultDepsFactorylazy-loads the shared broker and queriesbroker.latestForChannel("minsky.attention_window_opened")to deriveactiveWindowKeyopen(A) → close(B)does NOT clear A's active state.AttentionDepsinterface shape unchanged (keepsactiveWindowKey: string | null) so all existing tests pass without modificationR1/R2/R3 reviewer convergence
49cf1fbfc— eager broker init,COCKPIT_SSE_CHANNELSconstant, 4 new NOTIFY→SSE integration tests, plus the 3 nits.session.*topics weren't covered by the 2-channel pre-subscription. Fixed in commitd1a020844— comprehensive pre-subscription of all 6 ADR-010 §3 channels (4 channels pending mt#1854 producers).8ba07bd28— AND semantics on close-cancellation.Concurrency analysis
The broker runs in a JS single-threaded event loop so no concurrent mutation races exist between
attachClient,detachClient, and_dispatch. However, thereq.on("close")handler fires asynchronously from the HTTP layer, so a client may receive one finalsend()call after the connection closes. This is defended by:client.closedchecked at the top ofsend()— the stub returns earlyclient.closedchecked in_dispatchbefore callingsend()— broker skips closed clientsreq.on("close")setsclosed = truebefore callingbroker.detachClient()so any in-flight dispatch sees the closed flagThe ring-buffer append (
push+ conditionalshift) is synchronous and not interleaved with other operations given JS's event loop guarantee.Out of scope
COCKPIT_SSE_CHANNELSget producers when mt#1854 shipsLive verification
(N/A — not a structural change requiring live verification per
/implement-task §7a. The integration test covers the NOTIFY→broker→client dispatch path usingcreateRecordingChannelListenerfrom mt#1852. End-to-end HTTP-layer testing of/api/eventsagainst a live Postgres requires a running DB withminsky.attention_window_openedproducer firing; deferred to mt#1148 Stage 2 integration test or operator validation when the cockpit is brought up against the production Postgres.)Context
activeWindowKey: nullv0 fallbackPostgresChannelListenerlibrary this PR consumes; DONE