
feat!: replace channel-based event streaming with message.Stream iterator #330

Merged
ar3s3ru merged 9 commits into main from feat/event-stream-iterator
Apr 21, 2026

Conversation

Collaborator

@ar3s3ru ar3s3ru commented Apr 21, 2026

This PR replaces the channel-based event.Stream API with message.Stream[T], a single-use iterator-backed stream type. Event consumers now write idiomatic range loops with a terminal error check instead of managing channels, goroutines, and errgroups.

// before
stream := make(event.Stream, 1)
go func() { _ = store.Stream(ctx, stream, id, sel) }()
for evt := range stream { ... }

// after
stream := store.Stream(ctx, id, sel)
for evt := range stream.Iter() { ... }
if err := stream.Err(); err != nil { ... }

The message.Stream[T] shape uses a producer callback with an error return, which Stream captures and surfaces via Err(). This avoids the iter.Seq2[T, error] tuple-check-on-every-iteration wart while staying composable with the stdlib iter package via Iter().

Breaking changes

  • event.Store.Stream signature: (ctx, StreamWrite, id, selector) error → (ctx, id, selector) *event.Stream
  • event.StreamWrite, event.StreamRead, channel-based event.Stream: removed
  • event.SliceToStream: return type chan Persisted → *event.Stream
  • aggregate.RehydrateFromEvents: parameter type event.StreamRead → *event.Stream
  • event.NewTrackingStore: parameter Appender → Store
  • event.TrackingStore: embeds Store instead of Appender; Stream promoted through embedding

Migration

Use the "before/after" snippet at the top. event.NewTrackingStore callers lose the ability to pass a bare Appender — this is intentional; previously TrackingStore couldn't satisfy event.Store at all, making it awkward to use with the aggregate repository.

ar3s3ru added 8 commits April 21, 2026 20:00
Add message.Stream[T], a single-use, iterator-backed sequence type with
push-producer construction and error-at-end retrieval. The consumer
loop stays free of (value, error) tuple-checking:

  for v := range stream.Iter() { ... }
  if err := stream.Err(); err != nil { ... }

Producers return terminal errors from their closure, which Stream
captures and surfaces via Err(). Consumer abandonment (break) is not
an error. Iter() is single-use: subsequent calls yield an empty
sequence and set Err() to ErrAlreadyIterated.

This type will back the new event.Stream alias in a follow-up commit.

BREAKING CHANGE: event.Store.Stream now returns *event.Stream (an alias
of message.Stream[event.Persisted]) instead of pushing to a provided
channel. event.StreamWrite, event.StreamRead, and the channel-based
event.Stream type are removed.

Consumers migrate from:

    stream := make(event.Stream, 1)
    go func() { _ = store.Stream(ctx, stream, id, sel) }()
    for evt := range stream { ... }

to:

    stream := store.Stream(ctx, id, sel)
    for evt := range stream.Iter() { ... }
    if err := stream.Err(); err != nil { ... }

InMemoryStore is rewritten to use the new producer shape. The ctx check
moves to the loop top inside the producer; consumer abandonment is
handled automatically by yield returning false.

SliceToStream is kept, now returning *event.Stream.

BREAKING CHANGE: event.NewTrackingStore now accepts a full event.Store
(not just event.Appender). event.TrackingStore embeds Store so Stream
is promoted through the wrapper, making TrackingStore a drop-in Store
replacement rather than an Appender-only decorator.

Previously, a TrackingStore could not satisfy event.Store and had to
be paired with a FusedStore to be usable with code that also needed
Stream access (e.g. the aggregate repository). Now it works directly.

Consumers migrate from:

    tracking := event.NewTrackingStore(inner.(event.Appender))
    store := event.FusedStore{Appender: tracking, Streamer: inner}

to:

    store := event.NewTrackingStore(inner)

The Stream method now returns *event.Stream built via event.NewStream.
The producer opens pgx rows with defer-close, iterates rows.Next(),
and yields each persisted event.

The select/case/ctx.Done dance that used to guard the channel send is
gone: yield returning false cleanly terminates the producer, the
deferred rows.Close() fires, and the context error flows through
pgx's own ctx handling in rows.Next(). rows.Err() is still checked
after the loop to surface iteration errors.
…ption

InstrumentedEventStore.Stream now returns *event.Stream built by
wrapping the inner store's stream. Span creation and metric recording
live inside the producer's deferred cleanup, so both complete on
normal iteration, consumer abandonment, and producer error alike.

Note: the eventually.event_store.stream.duration.milliseconds metric
now measures the full iteration duration (including consumer
backpressure), not just the server-side work. This is more truthful
than the previous semantics but represents a behavioral change for
dashboards that track this metric.
With event.Streamer returning an iterator, EventSourcedRepository.Get
collapses to a direct range loop:

  - no errgroup ceremony;
  - no explicit context cancellation scoping;
  - no intermediate channel;
  - no producer goroutine to leak.

RehydrateFromEvents now takes *event.Stream, iterates via Iter(), and
checks stream.Err() after the loop to surface any terminal error.

Add direct tests for the rewritten InMemoryStore.Stream:

  - empty stream yields nothing;
  - yields all events in order from a populated stream;
  - selector.From filters correctly (events at index >= from-1);
  - consumer abandonment (break) terminates iteration cleanly;
  - context cancellation surfaces as a wrapped error via Err().

These supplement the existing aggregate and postgres integration
suites, which already exercise the Stream path end-to-end via
EventSourcedRepository.Get.

With EventSourcedRepository.Get no longer using errgroup, the module
no longer imports golang.org/x/sync directly. Tidy demotes it to
indirect (still required transitively).

codecov Bot commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 66.35514% with 36 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.21%. Comparing base (6ae1f8e) to head (0db0683).
⚠️ Report is 1 commit behind head on main.

Files with missing lines                 Patch %   Lines
opentelemetry/event_store.go               0.00%   19 Missing ⚠️
postgres/event_store.go                   69.44%   6 Missing and 5 partials ⚠️
aggregate/event_sourced_repository.go     50.00%   1 Missing and 3 partials ⚠️
event/store.go                            75.00%   1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #330      +/-   ##
==========================================
+ Coverage   63.64%   64.21%   +0.57%     
==========================================
  Files          38       39       +1     
  Lines        1400     1414      +14     
==========================================
+ Hits          891      908      +17     
+ Misses        447      444       -3     
  Partials       62       62              

@ar3s3ru ar3s3ru merged commit a96b37a into main Apr 21, 2026
7 checks passed
@ar3s3ru ar3s3ru deleted the feat/event-stream-iterator branch April 21, 2026 20:43
@github-actions github-actions Bot mentioned this pull request Apr 21, 2026
ar3s3ru added a commit that referenced this pull request Apr 22, 2026
…ming API (#331)

This PR resurrects the `feature/add-todolist-example` branch (archived
at `archive/feature/add-todolist-example`) as a clean reapply on top of
`feat/event-stream-iterator` (#330). The example serves as a real-world
litmus test for the post-migration library API.

It's composed of:

- `TodoList` aggregate with a child `Item` entity, built on
`aggregate.BaseRoot` + `aggregate.RecordThat`.
- A number of command handlers (`CreateTodoList`, `AddTodoListItem`)
implementing `command.Handler[Cmd]`.
- One query handler (`GetTodoList`) implementing `query.Handler[Q, R]`.
- BDD tests via `aggregate.Scenario`, `command.Scenario`.
- A Connect service over HTTP/2 (h2c) with gRPC health + reflection.

The command path loads the aggregate through
`EventSourcedRepository.Get`, which internally streams events through
the new `message.Stream[event.Persisted]` iterator. That's the litmus
test.

## Design choices (departures from the original branch)

- **Connect only.** No `google.api.http` annotations, no `googleapis`
dep. Connect already speaks gRPC + gRPC-Web + Connect-over-HTTP.
- **Commands return `google.protobuf.Empty`.** Clients generate IDs
(UUID) and pass them in the request. Idempotent on retry; no
response-payload coupling.
- **`connectrpc.com/{connect,grpchealth,grpcreflect}`** (not the
deprecated `bufbuild/connect-*` module path).
- **In-memory store.** Swap `event.NewInMemoryStore()` for
`postgres.NewEventStore(...)` in `main.go` to get durability.
- **Nested `go.mod`** with `go.work` to resolve inter-module
dependencies.