Skip to content

feat(cdc): add SSE event stream replay#106

Merged
harshitsinghbhandari merged 6 commits into
mainfrom
cdc-sse-events-main
Jun 6, 2026
Merged

feat(cdc): add SSE event stream replay#106
harshitsinghbhandari merged 6 commits into
mainfrom
cdc-sse-events-main

Conversation

@Vaibhaav-Tiwari
Copy link
Copy Markdown
Collaborator

Adds GET /api/v1/events for CDC SSE streaming with durable replay from change_log. Keeps cdc.Broadcaster live-only; the HTTP stream layer subscribes before replay, drains buffered live events, and dedupes by seq. Supports after and Last-Event-ID cursors, documents text/event-stream in OpenAPI, and adds tests for replay/live handoff plus invalid after. Tests: go test ./...

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jun 4, 2026

Greptile Summary

Adds GET /api/v1/events — a durable SSE CDC stream that replays historical events from change_log and then hands off to the live Broadcaster, with deduplication by sequence number to handle the replay/live overlap window.

  • events.go: Subscribes to the live broadcaster before starting replay so no events are lost between the two phases; the live channel buffer (1024) handles concurrent arrival during replay, and a full buffer cancels the stream rather than dropping events silently, relying on client reconnect + Last-Event-ID replay to recover.
  • specgen/build.go: Generalises the spec builder with a per-status contentTypes override so the 200 response is documented as text/event-stream rather than application/json.
  • Tests cover subscribe-before-replay ordering, live-event deduplication, Last-Event-ID cursor parsing, invalid after rejection, and SSE event-name newline sanitisation.

Confidence Score: 5/5

Safe to merge — the subscribe-before-replay ordering, deduplication logic, and overflow handling are all correct, and the tests cover the critical edge cases.

The core invariants — subscribe before replay, bounded live buffer with cancel-on-overflow, and seq-based deduplication — are implemented correctly and verified by tests. Context propagation through replay and the live loop is correct, header ordering is right, and the SSE event-name sanitisation guards against frame injection. No data-loss or correctness issues were found on the changed paths.

No files require special attention.

Important Files Changed

Filename Overview
backend/internal/httpd/events.go New SSE streaming handler: subscribe-before-replay ordering is correct, live channel overflow handling is safe (cancel + client reconnects), dedup by seq is correct, header ordering and flusher check are right
backend/internal/httpd/events_test.go Solid test coverage for subscribe-before-replay ordering, live-event deduplication, Last-Event-ID header parsing, invalid after rejection, and SSE event name sanitization
backend/internal/httpd/api.go EventsController wired into APIDeps and registered outside the REST timeout group — correct placement for a long-lived SSE route
backend/internal/daemon/daemon.go Simple two-line wiring: store passed as cdc.Source, cdcPipe.Broadcaster passed as cdcSubscriber — matches interface expectations
backend/internal/httpd/apispec/specgen/build.go Adds per-status content-type override and eventOperations(); the 200 response correctly documents text/event-stream instead of the default application/json
backend/internal/httpd/apispec/openapi.yaml Auto-generated spec updated with /api/v1/events endpoint; content type, query param, and error responses match the implementation
frontend/src/api/schema.ts Auto-generated TypeScript types updated to match the new streamEvents operation; paths and response shapes are accurate

Sequence Diagram

sequenceDiagram
    participant Client
    participant EventsController
    participant Broadcaster
    participant ChangeLog as change_log (cdc.Source)

    Client->>EventsController: "GET /api/v1/events?after=N"
    EventsController->>Broadcaster: Subscribe(callback → live chan)
    Note over EventsController: Headers sent (200, text/event-stream)
    EventsController->>ChangeLog: EventsAfter(N, 512)
    ChangeLog-->>EventsController: events[N+1..M]
    Note over Broadcaster,EventsController: Live events buffered in chan during replay
    loop "replay batches until len < 512"
        EventsController->>Client: id/event/data frames (seq N+1..M)
        EventsController->>ChangeLog: EventsAfter(M, 512)
        ChangeLog-->>EventsController: [] (empty)
    end
    loop live drain (dedup by sentSeq)
        EventsController->>Client: "id/event/data frames (seq > M)"
    end
    alt live chan full
        Broadcaster->>EventsController: cancel() context
        EventsController-->>Client: connection closed
        Client->>EventsController: reconnect with Last-Event-ID
    else client disconnect
        Client--xEventsController: TCP close
        EventsController->>Broadcaster: Unsubscribe
    end
Loading

Reviews (5): Last reviewed commit: "fix: harden SSE event stream headers" | Re-trigger Greptile

Comment thread backend/internal/httpd/events.go
Comment thread backend/internal/httpd/events.go Outdated
@Vaibhaav-Tiwari Vaibhaav-Tiwari requested a review from Pritom14 June 4, 2026 18:18
Pritom14 and others added 4 commits June 5, 2026 17:45
Two gaps in events_test.go coverage:

- TestEventsStreamDeduplicatesLiveEventOverlappingReplay: a live event whose
  seq falls within the already-replayed range must be silently dropped by
  writeSSEEvent so the client sees each seq exactly once. Publishes seq=5
  (duplicate of replay) and seq=6 (new) into the live buffer before replay
  returns; asserts the client receives [5,6], not [5,5,6].

- TestEventsStreamParsesLastEventIDHeader: Last-Event-ID header must be used
  as the replay cursor when the after query param is absent. Source returns
  after+1, so receiving seq=8 proves the header was parsed as 7.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Regenerated with npm run api after merging the OpenAPI spec generation
tooling from main. Adds the streamEvents operation and its after cursor
parameter to the TypeScript API types.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@harshitsinghbhandari harshitsinghbhandari added this to the rewrite milestone Jun 5, 2026
@harshitsinghbhandari harshitsinghbhandari merged commit a9b08cd into main Jun 6, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants