diff --git a/README.md b/README.md index 259c37a..afd75b0 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,11 @@ In the future I might choose to abstract the persistence mechanism away and add - Reading events from the stream - Reading all events - Subscribing (streaming) all events from the event store (real-time) +- Fault-tolerant projection system (Projector) ## Upcoming -Configurable fault-tolerant projection system with offset handling. +Add offset handling and retry mechanism to the default Projector. ## Example diff --git a/eventstore.go b/eventstore.go index eebd5a0..39c1f83 100644 --- a/eventstore.go +++ b/eventstore.go @@ -95,16 +95,18 @@ type gormEvent struct { Meta string } -type appendStreamConfig struct { +// AppendStreamConfig (configure using AppendStreamOpt) +type AppendStreamConfig struct { meta map[string]string } -type appendStreamOpt func(appendStreamConfig) appendStreamConfig +// AppendStreamOpt represents append to stream option +type AppendStreamOpt func(AppendStreamConfig) AppendStreamConfig // WithMetaData is an AppendStream option that can be used to // associate arbitrary meta data to a batch of events to store -func WithMetaData(meta map[string]string) appendStreamOpt { - return func(cfg appendStreamConfig) appendStreamConfig { +func WithMetaData(meta map[string]string) AppendStreamOpt { + return func(cfg AppendStreamConfig) AppendStreamConfig { cfg.meta = meta return cfg @@ -129,7 +131,7 @@ func (es *EventStore) AppendStream( stream string, expectedVer int, evts []interface{}, - opts ...appendStreamOpt) error { + opts ...AppendStreamOpt) error { if len(stream) == 0 { return fmt.Errorf("stream name must be provided") @@ -143,7 +145,7 @@ func (es *EventStore) AppendStream( return fmt.Errorf("please provide at least one event to append") } - cfg := appendStreamConfig{} + cfg := AppendStreamConfig{} for _, opt := range opts { cfg = opt(cfg) @@ -182,17 +184,19 @@ func (es *EventStore) AppendStream( return tx.Error } -type subAllConfig 
struct { +// SubAllConfig (configure using SubAllOpt) +type SubAllConfig struct { offset int batchSize int } -type subAllOpt func(subAllConfig) subAllConfig +// SubAllOpt represents subscribe to all events option +type SubAllOpt func(SubAllConfig) SubAllConfig // WithOffset is a subscription / read all option that indicates an offset in // the event store from which to start reading events (exclusive) -func WithOffset(offset int) subAllOpt { - return func(cfg subAllConfig) subAllConfig { +func WithOffset(offset int) SubAllOpt { + return func(cfg SubAllConfig) SubAllConfig { cfg.offset = offset return cfg @@ -201,8 +205,8 @@ func WithOffset(offset int) subAllOpt { // WithBatchSize is a subscription/read all option that specifies the read // batch size (limit) when reading events from the event store -func WithBatchSize(size int) subAllOpt { - return func(cfg subAllConfig) subAllConfig { +func WithBatchSize(size int) SubAllOpt { + return func(cfg SubAllConfig) SubAllConfig { cfg.batchSize = size return cfg @@ -226,6 +230,10 @@ type Subscription struct { // Close closes the subscription and halts the polling of sqldb func (s Subscription) Close() { + if s.close == nil { + return + } + s.close <- struct{}{} } @@ -233,7 +241,7 @@ func (s Subscription) Close() { // a subscription and depleting it until io.EOF is encountered // WARNING: Use with caution as this method will read the entire event store // in a blocking fashion (porbably best used in combination with offset option) -func (es *EventStore) ReadAll(ctx context.Context, opts ...subAllOpt) ([]EventData, error) { +func (es *EventStore) ReadAll(ctx context.Context, opts ...SubAllOpt) ([]EventData, error) { sub, err := es.SubscribeAll(ctx, opts...) if err != nil { return nil, err @@ -260,8 +268,8 @@ func (es *EventStore) ReadAll(ctx context.Context, opts ...subAllOpt) ([]EventDa // SubscribeAll will create a subscription which can be used to stream all events in an // orderly fashion. 
This mechanism should probably be mostly useful for building projections -func (es *EventStore) SubscribeAll(ctx context.Context, opts ...subAllOpt) (Subscription, error) { - cfg := subAllConfig{ +func (es *EventStore) SubscribeAll(ctx context.Context, opts ...SubAllOpt) (Subscription, error) { + cfg := SubAllConfig{ offset: 0, batchSize: 100, } diff --git a/example/README.md b/example/README.md index 2476e1d..2a8d37c 100644 --- a/example/README.md +++ b/example/README.md @@ -2,10 +2,10 @@ This example shows a simplistic but typical event-sourcing use case scenario. -It contains a single "aggregate" (Account) that produces a single account opening event, an accompanying repository implementation along with a simple console output projection. +It contains a single "aggregate" (Account) that produces a single account opening event, an accompanying repository implementation along with a simple console output projection which uses a projector that is included in the eventstore package. ## How to run -Run both `cmd/api/main.go` and `cmd/projections/main.go` from the same directory (so they use the same sqlite db). This will start a simple http api on `localhost:8080` and run projection binary which will subscribe to the event store and wait for incoming events in order to process them. +Run both `cmd/api/main.go` and `cmd/projections/main.go` in any order from the same directory (so they use the same sqlite db). This will start a simple http api on `localhost:8080` and run projection binary which will subscribe to the event store and wait for incoming events in order to process them. In order to simulate the account being open simply hit `http://localhost:8080/accounts/open` from your browser and monitor the terminal window from which you have started the projections binary. 
diff --git a/example/cmd/projections/main.go b/example/cmd/projections/main.go index 9f1e3e6..30c514d 100644 --- a/example/cmd/projections/main.go +++ b/example/cmd/projections/main.go @@ -2,9 +2,7 @@ package main import ( "context" - "errors" "fmt" - "io" "log" "github.com/aneshas/eventstore" @@ -22,16 +20,13 @@ func main() { defer estore.Close() - ctx, cancel := context.WithCancel(context.Background()) + p := eventstore.NewProjector(estore) - defer cancel() - - sub, err := estore.SubscribeAll(ctx) - checkErr(err) - - defer sub.Close() + p.Add( + NewConsoleOutputProjection(), + ) - runConsoleOutputProjection(sub) + log.Fatal(p.Run(context.Background())) } func checkErr(err error) { @@ -40,35 +35,19 @@ } } -// An example projection that outputs new accounts to the console -// it might as well be any kind of database, disk, memory etc... -func runConsoleOutputProjection(sub eventstore.Subscription) { - for { - select { - case data := <-sub.EventData: - handle(data) - - case err := <-sub.Err: - if err != nil { - if errors.Is(err, io.EOF) { - // If there are no more events (indicated by io.EOF) - // we choose to break in order to keep the subscription open - // so we are notified of new events as they are appended - break - } - - log.Fatal(err) - } +// NewConsoleOutputProjection constructs an example projection that outputs +// new accounts to the console. It might as well output to any kind of +// database, disk, memory etc... 
+func NewConsoleOutputProjection() eventstore.Projection { + return func(data eventstore.EventData) error { + switch data.Event.(type) { + case account.NewAccountOpenned: + evt := data.Event.(account.NewAccountOpenned) + fmt.Printf("Account: #%s | Holder: <%s>\n", evt.ID, evt.Holder) + default: + fmt.Println("not interested in this event") } - } -} -func handle(data eventstore.EventData) { - switch data.Event.(type) { - case account.NewAccountOpenned: - evt := data.Event.(account.NewAccountOpenned) - fmt.Printf("Account: #%s | Holder: <%s>\n", evt.ID, evt.Holder) - default: - fmt.Println("not interested in this event") + return nil } } diff --git a/projection.go b/projection.go new file mode 100644 index 0000000..f61cdec --- /dev/null +++ b/projection.go @@ -0,0 +1,113 @@ +package eventstore + +import ( + "context" + "errors" + "io" + "log" + "sync" +) + +// EventStreamer represents an event stream that can be subscribed to +// This package offers EventStore as an EventStreamer implementation +type EventStreamer interface { + SubscribeAll(context.Context, ...SubAllOpt) (Subscription, error) +} + +// NewProjector constructs a Projector +// TODO Configure logger, and retry +func NewProjector(s EventStreamer) *Projector { + return &Projector{ + streamer: s, + logger: log.Default(), + } +} + +// Projector is an event projector which will subscribe to an +// event stream (event store) and project events to each +// individual projection in an asynchronous manner +type Projector struct { + streamer EventStreamer + projections []Projection + logger *log.Logger +} + +// Projection represents a projection that should be able to handle +// projected events +type Projection func(EventData) error + +// Add effectively registers a projection with the projector +// Make sure to add all of your projections before calling Run +func (p *Projector) Add(projections ...Projection) { + p.projections = append(p.projections, projections...) 
+} + +// Run will start the projector +func (p *Projector) Run(ctx context.Context) error { + var wg sync.WaitGroup + + for _, projection := range p.projections { + wg.Add(1) + + go func(projection Projection) { + defer wg.Done() + + for { + // TODO retry with backoff + sub, err := p.streamer.SubscribeAll(ctx) + if err != nil { + p.logErr(err) + + return + } + + defer sub.Close() + + if err := p.run(ctx, sub, projection); err != nil { + continue + } + + return + } + }(projection) + } + + wg.Wait() + + return nil +} + +func (p *Projector) run(ctx context.Context, sub Subscription, projection Projection) error { + for { + select { + case data := <-sub.EventData: + err := projection(data) + if err != nil { + p.logErr(err) + // TODO retry with backoff + + return err + } + + case err := <-sub.Err: + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + if errors.Is(err, ErrSubscriptionClosedByClient) { + return nil + } + + p.logErr(err) + } + + case <-ctx.Done(): + return nil + } + } +} + +func (p *Projector) logErr(err error) { + p.logger.Printf("projector error: %v", err) +} diff --git a/projection_test.go b/projection_test.go new file mode 100644 index 0000000..286e149 --- /dev/null +++ b/projection_test.go @@ -0,0 +1,216 @@ +package eventstore_test + +import ( + "context" + "fmt" + "io" + "reflect" + "testing" + "time" + + "github.com/aneshas/eventstore" +) + +type streamer struct { + evts []interface{} + err error + streamErr error + noClose bool +} + +func (s streamer) SubscribeAll(ctx context.Context, opts ...eventstore.SubAllOpt) (eventstore.Subscription, error) { + if s.err != nil { + return eventstore.Subscription{}, s.err + } + + sub := eventstore.Subscription{ + Err: make(chan error, 1), + EventData: make(chan eventstore.EventData), + } + + go func() { + for _, evt := range s.evts { + sub.EventData <- eventstore.EventData{ + Event: evt, + } + + if s.streamErr != nil { + sub.Err <- s.streamErr + continue + } + + sub.Err <- io.EOF + } + + if !s.noClose 
{ + sub.Err <- eventstore.ErrSubscriptionClosedByClient + } + }() + + return sub, nil +} + +func TestShouldProjectEventsToProjections(t *testing.T) { + evts := []interface{}{ + SomeEvent{ + UserID: "user-1", + }, + SomeEvent{ + UserID: "user-2", + }, + SomeEvent{ + UserID: "user-3", + }, + } + + s := streamer{ + evts: evts, + } + + p := eventstore.NewProjector(s) + + var got []interface{} + var anotherGot []interface{} + + p.Add( + func(ed eventstore.EventData) error { + got = append(got, ed.Event) + + return nil + }, + func(ed eventstore.EventData) error { + anotherGot = append(anotherGot, ed.Event) + + return nil + }, + ) + + p.Run(context.TODO()) + + if !reflect.DeepEqual(got, evts) || + !reflect.DeepEqual(anotherGot, evts) { + t.Fatal("all projections should have received all events") + } +} + +func TestShouldRetryAndRestartIfProjectionErrorsOut(t *testing.T) { + evts := []interface{}{ + SomeEvent{ + UserID: "user-1", + }, + } + + s := streamer{ + evts: evts, + } + + p := eventstore.NewProjector(s) + + var got []interface{} + + var times int + + p.Add( + func(ed eventstore.EventData) error { + if times < 3 { + times++ + return fmt.Errorf("some transient error") + } + + got = append(got, ed.Event) + + return nil + }, + ) + + p.Run(context.TODO()) + + if !reflect.DeepEqual(got, evts) { + t.Fatal("projection should have caught up after erroring out") + } +} + +func TestShouldRetrySubscriptionIfProjectionFailsToSubscribe(t *testing.T) { + someErr := fmt.Errorf("some terminal error") + + s := streamer{ + err: someErr, + } + + p := eventstore.NewProjector(s) + + p.Add( + func(ed eventstore.EventData) error { + return nil + }, + ) + + p.Run(context.TODO()) +} + +func TestShouldExitIfContextIsCanceled(t *testing.T) { + evts := []interface{}{ + SomeEvent{ + UserID: "user-1", + }, + } + + s := streamer{ + evts: evts, + noClose: true, + } + + p := eventstore.NewProjector(s) + + p.Add( + func(ed eventstore.EventData) error { + return nil + }, + func(ed 
eventstore.EventData) error { + return nil + }, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + + defer cancel() + + p.Run(ctx) +} + +func TestShouldContinueProjectingIfStreamingErrorOccurs(t *testing.T) { + evts := []interface{}{ + SomeEvent{ + UserID: "user-1", + }, + SomeEvent{ + UserID: "user-2", + }, + SomeEvent{ + UserID: "user-3", + }, + } + + s := streamer{ + evts: evts, + streamErr: fmt.Errorf("some error"), + } + + p := eventstore.NewProjector(s) + + var got []interface{} + + p.Add( + func(ed eventstore.EventData) error { + got = append(got, ed.Event) + + return nil + }, + ) + + p.Run(context.TODO()) + + if !reflect.DeepEqual(got, evts) { + t.Fatal("projection should have caught up after erroring out") + } +}