feat!: replace channel-based event streaming with message.Stream iterator #330
Merged
Add message.Stream[T], a single-use, iterator-backed sequence type with
push-producer construction and error-at-end retrieval. The consumer
loop stays free of (value, error) tuple-checking:
```go
for v := range stream.Iter() { ... }
if err := stream.Err(); err != nil { ... }
```
Producers return terminal errors from their closure, which Stream
captures and surfaces via Err(). Consumer abandonment (break) is not
an error. Iter() is single-use: subsequent calls yield an empty
sequence and set Err() to ErrAlreadyIterated.
This type will back the new event.Stream alias in a follow-up commit.
BREAKING CHANGE: event.Store.Stream now returns *event.Stream (an alias
of message.Stream[event.Persisted]) instead of pushing to a provided
channel. event.StreamWrite, event.StreamRead, and the channel-based
event.Stream type are removed.
Consumers migrate from:
```go
stream := make(event.Stream, 1)
go func() { _ = store.Stream(ctx, stream, id, sel) }()
for evt := range stream { ... }
```

to:

```go
stream := store.Stream(ctx, id, sel)
for evt := range stream.Iter() { ... }
if err := stream.Err(); err != nil { ... }
```
InMemoryStore is rewritten to use the new producer shape. The ctx check
moves to the loop top inside the producer; consumer abandonment is
handled automatically by yield returning false.
SliceToStream is kept, now returning *event.Stream.
BREAKING CHANGE: event.NewTrackingStore now accepts a full event.Store
(not just event.Appender). event.TrackingStore embeds Store so Stream
is promoted through the wrapper, making TrackingStore a drop-in Store
replacement rather than an Appender-only decorator.
Previously, a TrackingStore could not satisfy event.Store and had to
be paired with a FusedStore to be usable with code that also needed
Stream access (e.g. the aggregate repository). Now it works directly.
Consumers migrate from:
```go
tracking := event.NewTrackingStore(inner.(event.Appender))
store := event.FusedStore{Appender: tracking, Streamer: inner}
```

to:

```go
store := event.NewTrackingStore(inner)
```
The Stream method now returns *event.Stream built via event.NewStream. The producer opens pgx rows with defer-close, iterates rows.Next(), and yields each persisted event. The select/case/ctx.Done dance that used to guard the channel send is gone: yield returning false cleanly terminates the producer, the deferred rows.Close() fires, and the context error flows through pgx's own ctx handling in rows.Next(). rows.Err() is still checked after the loop to surface iteration errors.
…ption InstrumentedEventStore.Stream now returns *event.Stream built by wrapping the inner store's stream. Span creation and metric recording live inside the producer's deferred cleanup, so both complete on normal iteration, consumer abandonment, and producer error alike. Note: the eventually.event_store.stream.duration.milliseconds metric now measures the full iteration duration (including consumer backpressure), not just the server-side work. This is more truthful than the previous semantics but represents a behavioral change for dashboards that track this metric.
With event.Streamer returning an iterator, EventSourcedRepository.Get collapses to a direct range loop:
- no errgroup ceremony;
- no explicit context cancellation scoping;
- no intermediate channel;
- no producer goroutine to leak.

RehydrateFromEvents now takes *event.Stream, iterates via Iter(), and checks stream.Err() after the loop to surface any terminal error.
Add direct tests for the rewritten InMemoryStore.Stream:
- empty stream yields nothing;
- yields all events in order from a populated stream;
- selector.From filters correctly (events at index >= from-1);
- consumer abandonment (break) terminates iteration cleanly;
- context cancellation surfaces as a wrapped error via Err().

These supplement the existing aggregate and postgres integration suites, which already exercise the Stream path end-to-end via EventSourcedRepository.Get.
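The abandonment behavior under test can be demonstrated with a bare producer closure (illustrative names, not the actual test code): the producer stops as soon as yield returns false, so breaking out of the consumer loop must not keep the producer running, and no error results.

```go
package main

import "fmt"

func main() {
	produced := 0
	producer := func(yield func(int) bool) error {
		for i := 1; i <= 100; i++ {
			produced++
			if !yield(i) {
				return nil // abandonment: clean termination, no error
			}
		}
		return nil
	}

	seen := 0
	err := producer(func(v int) bool {
		seen++
		return seen < 3 // "break" after the third event
	})

	// The producer stopped at exactly the abandonment point.
	fmt.Println(produced, seen, err) // 3 3 <nil>
}
```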
With EventSourcedRepository.Get no longer using errgroup, the module no longer imports golang.org/x/sync directly. Tidy demotes it to indirect (still required transitively).
Codecov Report

```
@@            Coverage Diff             @@
##             main     #330      +/-  ##
==========================================
+ Coverage   63.64%   64.21%   +0.57%
==========================================
  Files          38       39       +1
  Lines        1400     1414      +14
==========================================
+ Hits          891      908      +17
+ Misses        447      444       -3
  Partials       62       62
```
ar3s3ru added a commit that referenced this pull request on Apr 22, 2026:
…ming API (#331)

This PR resurrects the `feature/add-todolist-example` branch (archived at `archive/feature/add-todolist-example`) as a clean reapply on top of `feat/event-stream-iterator` (#330).

The example serves as a real-world litmus test for the post-migration library API. It's composed of:

- `TodoList` aggregate with a child `Item` entity, built on `aggregate.BaseRoot` + `aggregate.RecordThat`.
- A number of command handlers (`CreateTodoList`, `AddTodoListItem`) implementing `command.Handler[Cmd]`.
- One query handler (`GetTodoList`) implementing `query.Handler[Q, R]`.
- BDD tests via `aggregate.Scenario`, `command.Scenario`.
- A Connect service over HTTP/2 (h2c) with gRPC health + reflection.

The command path loads the aggregate through `EventSourcedRepository.Get`, which internally streams events through the new `message.Stream[event.Persisted]` iterator. That's the litmus test.

## Design choices (departures from the original branch)

- **Connect only.** No `google.api.http` annotations, no `googleapis` dep. Connect already speaks gRPC + gRPC-Web + Connect-over-HTTP.
- **Commands return `google.protobuf.Empty`.** Clients generate IDs (UUID) and pass them in the request. Idempotent on retry; no response-payload coupling.
- **`connectrpc.com/{connect,grpchealth,grpcreflect}`** (not the deprecated `bufbuild/connect-*` module path).
- **In-memory store.** Swap `event.NewInMemoryStore()` for `postgres.NewEventStore(...)` in `main.go` to get durability.
- **Nested `go.mod`** with `go.work` to resolve inter-module dependencies.
This PR replaces the channel-based `event.Stream` API with `message.Stream[T]`, a single-use, iterator-backed stream type. Event consumers now write idiomatic range loops with a terminal error check instead of managing channels, goroutines, and errgroups.

The `message.Stream[T]` shape uses a producer callback with an `error` return, which `Stream` captures and surfaces via `Err()`. This avoids the `iter.Seq2[T, error]` tuple-check-on-every-iteration wart while staying composable with the stdlib `iter` package via `Iter()`.

## Breaking changes

- `event.Store.Stream` signature: `(ctx, StreamWrite, id, selector) error` → `(ctx, id, selector) *event.Stream`
- `event.StreamWrite`, `event.StreamRead`, and the channel-based `event.Stream` type: removed
- `event.SliceToStream`: return type `chan Persisted` → `*event.Stream`
- `aggregate.RehydrateFromEvents`: parameter type `event.StreamRead` → `*event.Stream`
- `event.NewTrackingStore`: parameter `Appender` → `Store`
- `event.TrackingStore`: embeds `Store` instead of `Appender`; `Stream` is promoted through embedding

## Migration
Use the "before/after" snippet at the top.
`event.NewTrackingStore` callers lose the ability to pass a bare `Appender`. This is intentional: previously `TrackingStore` couldn't satisfy `event.Store` at all, making it awkward to use with the aggregate repository.