Skip to content
Merged
42 changes: 16 additions & 26 deletions aggregate/event_sourced_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@ import (
"context"
"fmt"

"golang.org/x/sync/errgroup"

"github.com/get-eventually/go-eventually/event"
"github.com/get-eventually/go-eventually/serde"
"github.com/get-eventually/go-eventually/version"
)

// RehydrateFromEvents rehydrates an Aggregate Root from a read-only Event Stream.
func RehydrateFromEvents[I ID](root Root[I], eventStream event.StreamRead) error {
for event := range eventStream {
if err := root.Apply(event.Message); err != nil {
// RehydrateFromEvents rehydrates an Aggregate Root from a Stream of persisted
// Domain Events.
//
// The Stream is iterated to completion or until the Aggregate Root's Apply
// method returns an error. After iteration, the stream's terminal error (if
// any) is checked via Stream.Err.
func RehydrateFromEvents[I ID](root Root[I], stream *event.Stream) error {
for evt := range stream.Iter() {
if err := root.Apply(evt.Message); err != nil {
return fmt.Errorf("aggregate.RehydrateFromEvents: failed to record event, %w", err)
}

root.setVersion(event.Version)
root.setVersion(evt.Version)
}

if err := stream.Err(); err != nil {
return fmt.Errorf("aggregate.RehydrateFromEvents: streaming failed, %w", err)
}

return nil
Expand Down Expand Up @@ -74,31 +81,14 @@ func NewEventSourcedRepository[I ID, T Root[I]](eventStore event.Store, typ Type
func (repo EventSourcedRepository[I, T]) Get(ctx context.Context, id I) (T, error) {
var zeroValue T

ctx, cancel := context.WithCancel(ctx)
defer cancel()

streamID := event.StreamID(id.String())
eventStream := make(event.Stream, 1)

group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
if err := repo.eventStore.Stream(ctx, eventStream, streamID, version.SelectFromBeginning); err != nil {
return fmt.Errorf("aggregate.EventSourcedRepository: failed while reading event from stream, %w", err)
}

return nil
})
stream := repo.eventStore.Stream(ctx, streamID, version.SelectFromBeginning)

root := repo.typ.Factory()

if err := RehydrateFromEvents(root, eventStream); err != nil {
if err := RehydrateFromEvents(root, stream); err != nil {
return zeroValue, fmt.Errorf("aggregate.EventSourcedRepository: failed to rehydrate aggregate root, %w", err)
}

if err := group.Wait(); err != nil {
return zeroValue, err
}

if root.Version() == 0 {
return zeroValue, ErrRootNotFound
}
Expand Down
56 changes: 31 additions & 25 deletions event/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,49 @@ package event
import (
"context"

"github.com/get-eventually/go-eventually/message"
"github.com/get-eventually/go-eventually/version"
)

// Stream represents a stream of persisted Domain Events coming from some
// stream-able source of data, like an Event Store.
type Stream = chan Persisted

// StreamWrite provides write-only access to an event.Stream object.
type StreamWrite chan<- Persisted
// Stream is a single-use, iterator-backed sequence of persisted Domain Events
// coming from some stream-able source of data, like an Event Store.
//
// Stream is an alias for message.Stream[Persisted]. See [message.Stream] for
// the full iteration and error-reporting contract.
type Stream = message.Stream[Persisted]

// StreamRead provides read-only access to an event.Stream object.
type StreamRead <-chan Persisted
// NewStream wraps a producer into a Stream. Convenience re-export of
// [message.NewStream] for values of type [Persisted].
func NewStream(produce func(yield func(Persisted) bool) error) *Stream {
return message.NewStream(produce)
}

// SliceToStream converts a slice of event.Persisted domain events to an event.Stream type.
// SliceToStream returns a Stream that yields each element of events in order.
//
// The event.Stream channel has the same buffer size as the input slice.
//
// The channel returned by the function contains all the original slice elements
// and is already closed.
func SliceToStream(events []Persisted) Stream {
ch := make(chan Persisted, len(events))
defer close(ch)

for _, event := range events {
ch <- event
}
// Useful for tests and for adapting fully-buffered results.
func SliceToStream(events []Persisted) *Stream {
return NewStream(func(yield func(Persisted) bool) error {
for _, evt := range events {
if !yield(evt) {
return nil
}
}

return ch
return nil
})
}

// Streamer is an event.Store trait used to open a specific Event Stream and stream it back
// in the application.
// Streamer is an event.Store trait used to open a specific Event Stream and
// stream it back in the application.
//
// Implementations should respect ctx cancellation between yields by checking
// ctx.Err() at loop boundaries inside the producer.
type Streamer interface {
Stream(ctx context.Context, stream StreamWrite, id StreamID, selector version.Selector) error
Stream(ctx context.Context, id StreamID, selector version.Selector) *Stream
}

// Appender is an event.Store trait used to append new Domain Events in the Event Stream.
// Appender is an event.Store trait used to append new Domain Events in the
// Event Stream.
type Appender interface {
Append(ctx context.Context, id StreamID, expected version.Check, events ...Envelope) (version.Version, error)
}
Expand Down
72 changes: 32 additions & 40 deletions event/store_inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,50 @@ func NewInMemoryStore() *InMemoryStore {
}
}

func contextErr(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return fmt.Errorf("event.InMemoryStore: context error, %w", err)
}

return nil
}

// Stream streams committed events in the Event Store onto the provided EventStream,
// from the specified Global Sequence Number in `from`, based on the provided stream.Target.
// Stream returns a Stream over the committed events for the given Event Stream,
// filtered by the provided version.Selector.
//
// Note: this call is synchronous, and will return when all the Events
// have been successfully written to the provided EventStream, or when
// the context has been canceled.
// The returned Stream holds a read-lock on the underlying store for the
// duration of iteration; long-paused iterations will block concurrent writers.
//
// This method fails only when the context is canceled.
// Iteration stops if the consumer abandons the range loop or if the context
// is canceled between yields.
func (es *InMemoryStore) Stream(
ctx context.Context,
eventStream StreamWrite,
id StreamID,
selector version.Selector,
) error {
es.mx.RLock()
defer es.mx.RUnlock()
defer close(eventStream)

events, ok := es.events[id]
if !ok {
return nil
}
) *Stream {
return NewStream(func(yield func(Persisted) bool) error {
es.mx.RLock()
defer es.mx.RUnlock()

events, ok := es.events[id]
if !ok {
return nil
}

for i, evt := range events {
eventVersion := version.Version(i) + 1
for i, evt := range events {
if err := ctx.Err(); err != nil {
return fmt.Errorf("event.InMemoryStore: context error, %w", err)
}

if eventVersion < selector.From {
continue
}
eventVersion := version.Version(i) + 1

persistedEvent := Persisted{
Envelope: evt,
StreamID: id,
Version: eventVersion,
}
if eventVersion < selector.From {
continue
}

select {
case eventStream <- persistedEvent:
case <-ctx.Done():
return contextErr(ctx)
if !yield(Persisted{
Envelope: evt,
StreamID: id,
Version: eventVersion,
}) {
return nil
}
}
}

return nil
return nil
})
}

// Append inserts the specified Domain Events into the Event Stream specified
Expand Down
121 changes: 121 additions & 0 deletions event/store_inmemory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package event_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/get-eventually/go-eventually/event"
"github.com/get-eventually/go-eventually/message"
"github.com/get-eventually/go-eventually/version"
)

type noopMessage struct{ id int }

func (noopMessage) Name() string { return "noop" }

var _ message.Message = noopMessage{}

const testStreamID event.StreamID = "stream"

func appendN(t *testing.T, store *event.InMemoryStore, n int) {
t.Helper()

envelopes := make([]event.Envelope, 0, n)
for i := range n {
envelopes = append(envelopes, event.Envelope{
Message: noopMessage{id: i},
Metadata: nil,
})
}

_, err := store.Append(t.Context(), testStreamID, version.Any, envelopes...)
require.NoError(t, err)
}

func collectIDs(stream *event.Stream) []int {
ids := make([]int, 0)
for evt := range stream.Iter() {
ids = append(ids, evt.Message.(noopMessage).id) //nolint:errcheck,forcetypeassert // test helper
}

return ids
}

func TestInMemoryStore_Stream_EmptyStream(t *testing.T) {
store := event.NewInMemoryStore()

stream := store.Stream(t.Context(), "missing", version.SelectFromBeginning)

count := 0
for range stream.Iter() {
count++
}

require.NoError(t, stream.Err())
assert.Equal(t, 0, count)
}

func TestInMemoryStore_Stream_YieldsAllEvents(t *testing.T) {
store := event.NewInMemoryStore()
appendN(t, store, 3)

stream := store.Stream(t.Context(), testStreamID, version.SelectFromBeginning)
got := collectIDs(stream)

require.NoError(t, stream.Err())
assert.Equal(t, []int{0, 1, 2}, got)
}

func TestInMemoryStore_Stream_SelectorFiltersFromVersion(t *testing.T) {
store := event.NewInMemoryStore()
appendN(t, store, 5)

stream := store.Stream(t.Context(), testStreamID, version.Selector{From: 3})
got := collectIDs(stream)

require.NoError(t, stream.Err())
// Versions start at 1, so selector.From=3 yields events at index 2,3,4.
assert.Equal(t, []int{2, 3, 4}, got)
}

func TestInMemoryStore_Stream_ConsumerAbandonment(t *testing.T) {
store := event.NewInMemoryStore()
appendN(t, store, 10)

stream := store.Stream(t.Context(), testStreamID, version.SelectFromBeginning)

got := make([]int, 0, 3)
for evt := range stream.Iter() {
//nolint:errcheck,forcetypeassert // test helper
got = append(got, evt.Message.(noopMessage).id)

if len(got) == 3 {
break
}
}

require.NoError(t, stream.Err(), "abandonment is not a failure")
assert.Equal(t, []int{0, 1, 2}, got)
}

func TestInMemoryStore_Stream_ContextCancellation(t *testing.T) {
store := event.NewInMemoryStore()
appendN(t, store, 5)

ctx, cancel := context.WithCancel(t.Context())
cancel() // cancel before iteration starts

stream := store.Stream(ctx, testStreamID, version.SelectFromBeginning)

count := 0
for range stream.Iter() {
count++
}

require.Error(t, stream.Err())
require.ErrorIs(t, stream.Err(), context.Canceled)
assert.Equal(t, 0, count)
}
12 changes: 8 additions & 4 deletions event/store_tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@ import (
// committed to the inner Event Store.
//
// Useful for tests assertion.
//
// TrackingStore embeds a full [Store]: Stream is inherited through the
// embedded value; only Append is overridden to record events as they are
// appended.
type TrackingStore struct {
Appender
Store

mx sync.RWMutex
recorded []Persisted
}

// NewTrackingStore wraps an Event Store to capture events that get
// appended to it.
func NewTrackingStore(appender Appender) *TrackingStore {
func NewTrackingStore(store Store) *TrackingStore {
return &TrackingStore{
Appender: appender,
Store: store,
mx: sync.RWMutex{},
recorded: nil,
}
Expand Down Expand Up @@ -54,7 +58,7 @@ func (es *TrackingStore) Append(
es.mx.Lock()
defer es.mx.Unlock()

v, err := es.Appender.Append(ctx, id, expected, events...)
v, err := es.Store.Append(ctx, id, expected, events...)
if err != nil {
return v, err
}
Expand Down
Loading
Loading