perf(stream): project SSE frames once per role, not per subscriber#353
perf(stream): project SSE frames once per role, not per subscriber#353EricAndrechek wants to merge 2 commits into
Conversation
Move the broadcast hub into internal/stream as `Hub`: subscribers register under (topic, role), and Broadcast decodes each event once, applies each subscribed role's column policy once, builds one SSE frame per role, and fans it to every member of that role's Bucket. Previously every connection re-ran unmarshal -> Evaluate -> filter -> marshal (plus a second unmarshal for the id: timestamp) on the same event, so the work scaled with subscribers, not distinct output shapes — the ~2270 deliveries/s ceiling from #294. The (topic, role) key is claims-independent: column visibility derives only from the role+table policy entry, and the stream path applies no row-level filter (documented invariant). The handler's two select cases collapse into one byte-pump over a single Subscriber.Frames() queue of typed Frames; the queue grows from cap 1 to 64 so live events buffer while the handler is mid-write. Gap-fill replay and Last-Event-ID/?since= resumption are unchanged (replay stays per-connection via the shared stream.ReplayFrame; live frames carry the same id: <received_timestamp>). Slow-consumer drops now increment wavehouse_sse_dropped_frames_total; an inert Subscriber.Evicted() seam is wired for the eviction follow-up. The per-delivery OTel span (another #294 item) was already removed in #346. Also drops the now-orphaned, test-only internal/api/transform.go (transformForClient had no non-test caller). First PR of the #294 epic. Deferred to follow-ups: active slow-consumer eviction (#94) and right-sizing the subscriber buffer + lock cost (#152). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF
|
Warning Review limit reached
More reviews will be available in 24 minutes and 17 seconds. Learn how PR review limits work. Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file). ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits. 🚦 How do rate limits work?CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability. For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Plus Run ID: 📒 Files selected for processing (5)
📝 WalkthroughWalkthroughThe SSE path now uses ChangesSSE Delivery-Path Refactor
Sequence Diagram(s)sequenceDiagram
participant embeddedMQ as embeddedMQ subscription callback
participant streamHub as stream.Hub
participant streamSubscriber as stream.Subscriber
participant StreamHandler as StreamHandler
participant Heartbeater as Heartbeater
embeddedMQ->>streamHub: Broadcast(msg.Subject, msg.Data)
streamHub->>streamSubscriber: Send(Frame)
Heartbeater->>streamSubscriber: Send(KindKeepalive frame)
StreamHandler->>streamSubscriber: read Frames()
StreamHandler->>streamSubscriber: stop on Evicted()
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
📚 Docs preview is live → https://554b1433-wavehouse-docs.wave-rf.workers.dev
|
There was a problem hiding this comment.
Actionable comments posted: 6
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: d4ab6da2-7dcf-4d42-b5f8-704077280b08
📒 Files selected for processing (25)
AGENTS.mdCHANGELOG.mdcmd/wavehouse/main.godocs/src/content/docs/architecture.mdinternal/api/errors_test.gointernal/api/hub.gointernal/api/hub_test.gointernal/api/router_test.gointernal/api/stream.gointernal/api/stream_test.gointernal/api/transform.gointernal/api/transform_test.gointernal/policy/policy.gointernal/stream/bucket.gointernal/stream/bucket_test.gointernal/stream/doc.gointernal/stream/filter_test.gointernal/stream/heartbeat.gointernal/stream/heartbeat_test.gointernal/stream/hub.gointernal/stream/hub_test.gointernal/stream/metrics.gointernal/stream/subscriber.gointernal/stream/subscriber_test.gotests/integration/setup_test.go
💤 Files with no reviewable changes (4)
- internal/api/transform.go
- internal/api/hub.go
- internal/api/transform_test.go
- internal/api/hub_test.go
📜 Review details
⏰ Context from checks skipped due to timeout. (2)
- GitHub Check: Docs preview
- GitHub Check: Lint
⚠️ CI failures not shown inline (2)
GitHub Actions: PR housekeeping / 0_PR housekeeping.txt: perf(stream): project SSE frames once per role, not per subscriber
Conclusion: failure
##[group]Run # Single source of truth for the rule: scripts/lint-pr-title.sh — the
�[36;1m# Single source of truth for the rule: scripts/lint-pr-title.sh — the�[0m
�[36;1m# SAME validator the local agent gate runs (.claude/hooks/agent-bash-gate.sh),�[0m
�[36;1m# so CI and local can't drift. The checkout above is ref: main, so this is�[0m
�[36;1m# always the default-branch script. Dependabot's grouped-update titles�[0m
�[36;1m# routinely exceed the 72-char subject cap and the format isn't�[0m
�[36;1m# configurable, so Dependabot PRs are exempt from the length check�[0m
�[36;1m# (the format check still applies).�[0m
�[36;1mif [[ "$PR_AUTHOR" == "dependabot[bot]" || "$PR_AUTHOR" == "app/dependabot" ]]; then�[0m
�[36;1m export PR_TITLE_SKIP_LENGTH=1�[0m
�[36;1mfi�[0m
�[36;1m�[0m
�[36;1mif reason=$(bash scripts/lint-pr-title.sh "$PR_TITLE" 2>&1); then�[0m
�[36;1m echo "passed=true" >> "$GITHUB_OUTPUT"�[0m
�[36;1m echo "PR title OK: $PR_TITLE"�[0m
�[36;1melse�[0m
�[36;1m echo "passed=false" >> "$GITHUB_OUTPUT"�[0m
�[36;1m printf '%s\n' "$reason"�[0m
�[36;1m echo "::error::$(printf '%s' "$reason" | head -1)"�[0m
GitHub Actions: PR housekeeping / PR housekeeping: perf(stream): project SSE frames once per role, not per subscriber
Conclusion: failure
##[group]Run # Single source of truth for the rule: scripts/lint-pr-title.sh — the
�[36;1m# Single source of truth for the rule: scripts/lint-pr-title.sh — the�[0m
�[36;1m# SAME validator the local agent gate runs (.claude/hooks/agent-bash-gate.sh),�[0m
�[36;1m# so CI and local can't drift. The checkout above is ref: main, so this is�[0m
�[36;1m# always the default-branch script. Dependabot's grouped-update titles�[0m
�[36;1m# routinely exceed the 72-char subject cap and the format isn't�[0m
�[36;1m# configurable, so Dependabot PRs are exempt from the length check�[0m
�[36;1m# (the format check still applies).�[0m
�[36;1mif [[ "$PR_AUTHOR" == "dependabot[bot]" || "$PR_AUTHOR" == "app/dependabot" ]]; then�[0m
�[36;1m export PR_TITLE_SKIP_LENGTH=1�[0m
�[36;1mfi�[0m
�[36;1m�[0m
�[36;1mif reason=$(bash scripts/lint-pr-title.sh "$PR_TITLE" 2>&1); then�[0m
�[36;1m echo "passed=true" >> "$GITHUB_OUTPUT"�[0m
�[36;1m echo "PR title OK: $PR_TITLE"�[0m
�[36;1melse�[0m
�[36;1m echo "passed=false" >> "$GITHUB_OUTPUT"�[0m
�[36;1m printf '%s\n' "$reason"�[0m
�[36;1m echo "::error::$(printf '%s' "$reason" | head -1)"�[0m
🧰 Additional context used
📓 Path-based instructions (7)
internal/stream/**
📄 CodeRabbit inference engine (AGENTS.md)
Structured-query and live-stream reads must share the same per-column decision function so column visibility cannot drift.
Files:
internal/stream/doc.gointernal/stream/filter_test.gointernal/stream/subscriber_test.gointernal/stream/heartbeat.gointernal/stream/metrics.gointernal/stream/heartbeat_test.gointernal/stream/bucket.gointernal/stream/subscriber.gointernal/stream/bucket_test.gointernal/stream/hub.gointernal/stream/hub_test.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
Use table-driven tests with
t.Run(tt.name, ...), and add corresponding test cases for every new function.
Files:
internal/stream/filter_test.gointernal/stream/subscriber_test.gointernal/api/errors_test.gotests/integration/setup_test.gointernal/stream/heartbeat_test.gointernal/stream/bucket_test.gointernal/api/router_test.gointernal/stream/hub_test.gointernal/api/stream_test.go
internal/policy/**
📄 CodeRabbit inference engine (AGENTS.md)
Hasura-style access control must fail closed:
policy.IsAdminis the single admin check, empty or absent roles match nothing,Validaterejects empty role keys,nilpolicy denies everyone, anddefault_role == admin_roleis dev-only and loudly warned.
Files:
internal/policy/policy.go
internal/api/**
📄 CodeRabbit inference engine (AGENTS.md)
Handler error responses must stay in sync with the API docs error tables.
Files:
internal/api/errors_test.gointernal/api/router_test.gointernal/api/stream_test.gointernal/api/stream.go
CHANGELOG.md
📄 CodeRabbit inference engine (AGENTS.md)
Record any notable change under
[Unreleased]inCHANGELOG.md.
Files:
CHANGELOG.md
docs/src/content/docs/architecture.md
📄 CodeRabbit inference engine (AGENTS.md)
When changing core architecture or adding a package, update
docs/src/content/docs/architecture.mdandAGENTS.md.
Files:
docs/src/content/docs/architecture.md
docs/src/content/docs/**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
Author Mermaid diagrams vertically by default, avoid wide side-by-side diagrams, and keep node labels short so diagrams remain legible in the docs column width.
Files:
docs/src/content/docs/architecture.md
🧠 Learnings (5)
📓 Common learnings
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Validate locally before every push by running `make ci` the documented way; do not use CI as the first feedback loop.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: On PR-branch pushes, run the required pre-push reviewers via `/prepush` until each applicable reviewer returns `ship_it`, and never hardcode the reviewer set.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Every code change must update its corresponding documentation and `CHANGELOG.md` in the same PR.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Address and resolve every review finding; do not silently drop findings, and either fix them or track them in an issue.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Agents must create draft PRs only, and PR titles must pass the Conventional Commits gate and stay within 72 characters.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Never force-push or rebase a PR branch; merge `origin/main` instead when syncing with upstream.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Never hand-write markers or use `--no-verify`; use the documented gates and marker tooling instead.
📚 Learning: 2026-06-26T12:23:22.696Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 346
File: internal/stream/subscriber_test.go:9-28
Timestamp: 2026-06-26T12:23:22.696Z
Learning: In this Go repository, prefer table-driven tests (e.g., `[]struct{...}` with `t.Run(...)`) only for tests that cover multiple scenarios/inputs and can be cleanly enumerated. Do not artificially rewrite a clear single-scenario sequential behavioral-flow test into a table-driven form just to fit the pattern; if there’s only one meaningful scenario, keep the test as a straightforward linear flow (as in `TestSubscriber_SendDeliversThenDropsWhenFull`).
Applied to files:
internal/stream/filter_test.gointernal/stream/subscriber_test.gointernal/api/errors_test.gotests/integration/setup_test.gointernal/stream/heartbeat_test.gointernal/stream/bucket_test.gointernal/api/router_test.gointernal/stream/hub_test.gointernal/api/stream_test.go
📚 Learning: 2026-05-20T01:02:00.784Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 164
File: internal/api/router_test.go:289-350
Timestamp: 2026-05-20T01:02:00.784Z
Learning: In WaveHouse’s internal API tests (files matching internal/api/**/*_test.go), follow the existing separation-of-concerns convention for testing the RequireRole middleware: inject `ContextKeyRole` directly into the request `context.Context` instead of using `testutil.MakeJWT`/JWT-driven flows. Do not refactor role-gate tests to use JWT tokens—JWT parsing and token handling are covered separately in `middleware_test.go` (the dedicated JWT parsing tests), and mixing those concerns would expand the failure surface and reduce isolation.
Applied to files:
internal/api/errors_test.gointernal/api/router_test.gointernal/api/stream_test.go
📚 Learning: 2026-05-23T01:23:59.268Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 174
File: internal/api/ingest_test.go:111-111
Timestamp: 2026-05-23T01:23:59.268Z
Learning: In WaveHouse Go tests in internal/api/**/*_test.go, use internal/testutil.AssertJSONErrorResponse(t, w) for HTTP error-path JSON assertions. Do not use (or reintroduce) package-local assertJSONErrorResponse helpers. AssertJSONErrorResponse verifies the response Content-Type is application/json, includes the X-Content-Type-Options: nosniff header, and that the JSON body contains an "error" field.
Applied to files:
internal/api/errors_test.gointernal/api/router_test.gointernal/api/stream_test.go
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.
Applied to files:
CHANGELOG.mddocs/src/content/docs/architecture.mdAGENTS.md
🪛 GitHub Check: CodeQL
internal/stream/hub.go
[failure] 232-232: Size computation for allocation may overflow
This operation, which is used in an allocation, involves a potentially large value and might overflow.
🔇 Additional comments (30)
AGENTS.md (5)
31-31: LGTM!
44-44: LGTM!
390-390: LGTM!
403-403: LGTM!
61-61: No change needed:internal/stream/hub.godefinesfilterColumns, so thestream.filterColumnsreference is correct.docs/src/content/docs/architecture.md (6)
31-31: LGTM!
52-52: LGTM!
65-65: LGTM!
79-83: LGTM!
84-92: LGTM!
253-262: LGTM!internal/policy/policy.go (2)
282-282: LGTM! (pending verification thatfilterColumnsexists ininternal/stream/— see prior request)
326-326: LGTM! (pending verification thatfilterColumnsexists ininternal/stream/— see prior request)internal/stream/doc.go (1)
3-11: LGTM!CHANGELOG.md (1)
22-23: No changelog structure fix needed The file already has an## Unreleasedsection above### Changed; the bracketed## [Unreleased]form isn’t used here.> Likely an incorrect or invalid review comment.internal/stream/hub_test.go (2)
20-38: LGTM!Also applies to: 54-64, 71-125, 148-200, 224-266
44-44: 🎯 Functional CorrectnessNo issue here: the repo targets Go 1.26.4, which supports both
strings.SplitSeqandfor i := range 8.> Likely an incorrect or invalid review comment.internal/api/stream.go (1)
18-24: LGTM!Also applies to: 46-50, 80-85, 105-110, 125-147
cmd/wavehouse/main.go (1)
283-287: LGTM!Also applies to: 311-315, 357-359
internal/api/errors_test.go (1)
17-17: LGTM!Also applies to: 197-197
internal/api/router_test.go (1)
13-13: LGTM!Also applies to: 256-256, 312-312, 384-384, 445-445, 496-496, 521-521, 618-618
internal/api/stream_test.go (1)
18-25: LGTM!Also applies to: 53-53, 78-78, 117-117
tests/integration/setup_test.go (1)
36-36: LGTM!Also applies to: 314-327
internal/stream/subscriber.go (1)
3-68: LGTM!internal/stream/bucket.go (1)
5-14: LGTM!Also applies to: 47-64
internal/stream/heartbeat.go (1)
112-112: LGTM!internal/stream/bucket_test.go (1)
34-36: LGTM!Also applies to: 56-61, 70-75, 83-95
internal/stream/heartbeat_test.go (1)
69-69: LGTM!Also applies to: 110-110
internal/stream/metrics.go (1)
27-27: LGTM!Also applies to: 43-45, 75-83
internal/stream/subscriber_test.go (1)
11-12: LGTM!Also applies to: 30-41
…mes) - project: fail closed when a policy store is wired but the payload isn't a valid EventMessage, so malformed JSON on ingest.<table> can't bypass column filtering (passthrough now only when no store is configured). - wireFrame: build by append (no payload-derived allocation size — clears the CodeQL go/allocation-size-overflow alert) and emit a "data:" prefix per newline-split line so a multi-line payload stays valid SSE. - ReplayFrame becomes a *Hub method using the Hub's own policy store, so replay can't project against a different (or nil) policy than the live fan-out; drop the now-redundant StreamHandler.PolicyStore field and its main.go wiring. - tests: table-driven filterColumns / passthrough+fail-closed / ReplayFrame, guard both frame buffers before indexing, and a wireFrame multi-line test. Addresses CodeRabbit + CodeQL feedback on #353. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF
Code Coverage OverviewLanguages: Go GoThe overall coverage in the branch remains at 90%, unchanged from the branch. Show a code coverage summary of the most impacted files.
Code Coverage is in Public Preview. Learn more and provide us with your feedback. |
What
First PR of the #294 SSE delivery-path throughput epic: move the broadcast hub into
internal/streamand project/serialize each live event once per(topic, role)instead of once per subscriber, building on theinternal/streamprimitives from #346.Today every connection's read loop independently runs
json.Unmarshal → policy.Evaluate → filterEventColumns → json.Marshal(plus a second unmarshal just to read theid:timestamp) on the same event — byte-identical work repeated N times. For a single-role audience (the public dashboard, every viewerpublic) that's the measured ~2 270 deliveries/s ceiling. This collapses N re-projections to 1.How
internal/stream/hub.go(new) —Hub. Subscribers register under(topic, role).Broadcastdecodes the event once, snapshots the policy once, then per subscribed role applies column policy once, builds one SSE frame, and fans it to that role'sBucket. Skips the decode entirely when nobody is listening.(topic, role)is the whole key — claims don't matter. The stream path's only per-subscriber transform is column filtering (policy.IsColumnAllowed), which derives solely from the role+table policy entry. Claims feed only the row-levelWHERE/CHECK, which the stream path never applies — so the projection is byte-identical for every subscriber of a(role, table). Documented as an invariant in code: if row-level filtering is ever added to streaming, the key must take claims into account.selectcases (keepalive vs. per-subscriber event) collapse into one pump over a singleSubscriber.Frames()queue of typedFrame{Kind, Data}. The queue grows from cap 1 (keepalive-only) to 64 so live events buffer while the handler is mid-write.stream.ReplayFrame; live frames carry the sameid: <received_timestamp>, soLast-Event-ID/?since=resumption and the SSE wire format are byte-for-byte identical (no SDK change).wavehouse_sse_dropped_frames_total(silent before). An inertSubscriber.Evicted()seam is wired for the eviction follow-up.internal/api/transform.go(transformForClienthad no non-test caller).Scope / staging
This PR is the architecture shift only. Already done by #346: the per-delivery OTel span (another #294 checkbox). Deferred to follow-ups:
Evicted()→ handler disconnects → client reconnects + gap-fills.Testing
internal/stream/{hub,filter,subscriber,bucket,heartbeat}_test.go: project-once-per-role (with a shared-backing-array assertion proving a single serialization), column-filter + table-denial, topic isolation, passthrough/invalid payloads, bucket/topic GC, the drop-metric increment,ReplayFrame, and a concurrent add/remove/broadcast race.make cigreen: unit 89.5% / integration / e2e (67 passed) + every coverage gate, all under-race.Docs
architecture.md(diagram, package tree,stream/section, Streaming Path),AGENTS.md,CHANGELOG.md, andinternal/stream/doc.goare in sync. SSE wire format / event payload (api.md) is unchanged.Part of #294 (epic — not auto-closed).
🤖 Generated with Claude Code
https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF