Skip to content

Commit

Permalink
Merge pull request #2 from aneshas/projections
Browse files Browse the repository at this point in the history
Projections
  • Loading branch information
aneshas committed Dec 15, 2021
2 parents 87f419e + bf0b78b commit 0f03fcc
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 56 deletions.
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -16,10 +16,11 @@ In the future I might choose to abstract the persistence mechanism away and add
- Reading events from the stream
- Reading all events
- Subscribing (streaming) all events from the event store (real-time)
- Fault-tolerant projection system (Projector)

## Upcoming

Configurable fault-tolerant projection system with offset handling.
Add offset handling and retry mechanism to the default Projector.

## Example

Expand Down
38 changes: 23 additions & 15 deletions eventstore.go
Expand Up @@ -95,16 +95,18 @@ type gormEvent struct {
Meta string
}

type appendStreamConfig struct {
// AppendStreamConfig (configure using AppendStreamOpt)
type AppendStreamConfig struct {
meta map[string]string
}

type appendStreamOpt func(appendStreamConfig) appendStreamConfig
// AppendStreamOpt represents append to stream option
type AppendStreamOpt func(AppendStreamConfig) AppendStreamConfig

// WithMetaData is an AppendStream option that can be used to
// associate arbitrary meta data to a batch of events to store
func WithMetaData(meta map[string]string) appendStreamOpt {
return func(cfg appendStreamConfig) appendStreamConfig {
func WithMetaData(meta map[string]string) AppendStreamOpt {
return func(cfg AppendStreamConfig) AppendStreamConfig {
cfg.meta = meta

return cfg
Expand All @@ -129,7 +131,7 @@ func (es *EventStore) AppendStream(
stream string,
expectedVer int,
evts []interface{},
opts ...appendStreamOpt) error {
opts ...AppendStreamOpt) error {

if len(stream) == 0 {
return fmt.Errorf("stream name must be provided")
Expand All @@ -143,7 +145,7 @@ func (es *EventStore) AppendStream(
return fmt.Errorf("please provide at least one event to append")
}

cfg := appendStreamConfig{}
cfg := AppendStreamConfig{}

for _, opt := range opts {
cfg = opt(cfg)
Expand Down Expand Up @@ -182,17 +184,19 @@ func (es *EventStore) AppendStream(
return tx.Error
}

type subAllConfig struct {
// SubAllConfig (configure using SubAllOpt)
type SubAllConfig struct {
offset int
batchSize int
}

type subAllOpt func(subAllConfig) subAllConfig
// SubAllOpt represents subscribe to all events option
type SubAllOpt func(SubAllConfig) SubAllConfig

// WithOffset is a subscription / read all option that indicates an offset in
// the event store from which to start reading events (exclusive)
func WithOffset(offset int) subAllOpt {
return func(cfg subAllConfig) subAllConfig {
func WithOffset(offset int) SubAllOpt {
return func(cfg SubAllConfig) SubAllConfig {
cfg.offset = offset

return cfg
Expand All @@ -201,8 +205,8 @@ func WithOffset(offset int) subAllOpt {

// WithBatchSize is a subscription/read all option that specifies the read
// batch size (limit) when reading events from the event store
func WithBatchSize(size int) subAllOpt {
return func(cfg subAllConfig) subAllConfig {
func WithBatchSize(size int) SubAllOpt {
return func(cfg SubAllConfig) SubAllConfig {
cfg.batchSize = size

return cfg
Expand All @@ -226,14 +230,18 @@ type Subscription struct {

// Close closes the subscription and halts the polling of sqldb
func (s Subscription) Close() {
if s.close == nil {
return
}

s.close <- struct{}{}
}

// ReadAll will read all events from the event store by internally creating a
// a subscription and depleting it until io.EOF is encountered
// WARNING: Use with caution as this method will read the entire event store
// in a blocking fashion (porbably best used in combination with offset option)
func (es *EventStore) ReadAll(ctx context.Context, opts ...subAllOpt) ([]EventData, error) {
func (es *EventStore) ReadAll(ctx context.Context, opts ...SubAllOpt) ([]EventData, error) {
sub, err := es.SubscribeAll(ctx, opts...)
if err != nil {
return nil, err
Expand All @@ -260,8 +268,8 @@ func (es *EventStore) ReadAll(ctx context.Context, opts ...subAllOpt) ([]EventDa

// SubscribeAll will create a subscription which can be used to stream all events in an
// orderly fashion. This mechanism should probably be mostly useful for building projections
func (es *EventStore) SubscribeAll(ctx context.Context, opts ...subAllOpt) (Subscription, error) {
cfg := subAllConfig{
func (es *EventStore) SubscribeAll(ctx context.Context, opts ...SubAllOpt) (Subscription, error) {
cfg := SubAllConfig{
offset: 0,
batchSize: 100,
}
Expand Down
4 changes: 2 additions & 2 deletions example/README.md
Expand Up @@ -2,10 +2,10 @@

This example shows a simplistic but typical event-sourcing use case scenario.

It contains a single "aggregate" (Account) that produces a single account opening event, an accompanying repository implementation along with a simple console output projection.
It contains a single "aggregate" (Account) that produces a single account opening event, an accompanying repository implementation along with a simple console output projection which uses a projector that is included in the eventstore package.

## How to run

Run both `cmd/api/main.go` and `cmd/projections/main.go` from the same directory (so they use the same sqlite db). This will start a simple http api on `localhost:8080` and run projection binary which will subscribe to the event store and wait for incoming events in order to process them.
Run both `cmd/api/main.go` and `cmd/projections/main.go` in any order from the same directory (so they use the same sqlite db). This will start a simple http api on `localhost:8080` and run projection binary which will subscribe to the event store and wait for incoming events in order to process them.

In order to simulate the account being open simply hit `http://localhost:8080/accounts/open` from your browser and monitor the terminal window from which you have started the projections binary.
55 changes: 17 additions & 38 deletions example/cmd/projections/main.go
Expand Up @@ -2,9 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"io"
"log"

"github.com/aneshas/eventstore"
Expand All @@ -22,16 +20,13 @@ func main() {

defer estore.Close()

ctx, cancel := context.WithCancel(context.Background())
p := eventstore.NewProjector(estore)

defer cancel()

sub, err := estore.SubscribeAll(ctx)
checkErr(err)

defer sub.Close()
p.Add(
NewConsoleOutputProjection(),
)

runConsoleOutputProjection(sub)
log.Fatal(p.Run(context.Background()))
}

func checkErr(err error) {
Expand All @@ -40,35 +35,19 @@ func checkErr(err error) {
}
}

// An example projection that outputs new accounts to the console
// it might as well be any kind of database, disk, memory etc...
func runConsoleOutputProjection(sub eventstore.Subscription) {
for {
select {
case data := <-sub.EventData:
handle(data)

case err := <-sub.Err:
if err != nil {
if errors.Is(err, io.EOF) {
// If there are no more events (indicated by io.EOF)
// we choose to break in order to keep the subscription open
// so we are notified of new events as they are appended
break
}

log.Fatal(err)
}
// NewConsoleOutputProjection consutructs an example projection that outputs
// new accounts to the console. It might as well be to any kind of
// database, disk, memory etc...
func NewConsoleOutputProjection() eventstore.Projection {
return func(data eventstore.EventData) error {
switch data.Event.(type) {
case account.NewAccountOpenned:
evt := data.Event.(account.NewAccountOpenned)
fmt.Printf("Account: #%s | Holder: <%s>\n", evt.ID, evt.Holder)
default:
fmt.Println("not interested in this event")
}
}
}

func handle(data eventstore.EventData) {
switch data.Event.(type) {
case account.NewAccountOpenned:
evt := data.Event.(account.NewAccountOpenned)
fmt.Printf("Account: #%s | Holder: <%s>\n", evt.ID, evt.Holder)
default:
fmt.Println("not interested in this event")
return nil
}
}
113 changes: 113 additions & 0 deletions projection.go
@@ -0,0 +1,113 @@
package eventstore

import (
"context"
"errors"
"io"
"log"
"sync"
)

// EventStreamer represents an event stream that can be subscribed to
// This package offers EventStore as EventStreamer implementation
type EventStreamer interface {
SubscribeAll(context.Context, ...SubAllOpt) (Subscription, error)
}

// NewProjector constructs a Projector
// TODO Configure logger, and retry
func NewProjector(s EventStreamer) *Projector {
return &Projector{
streamer: s,
logger: log.Default(),
}
}

// Projector is an event projector which will subscribe to an
// event stream (evet store) and project events to each
// individual projection in an asynchronous manner
type Projector struct {
streamer EventStreamer
projections []Projection
logger *log.Logger
}

// Projection represents a projection that should be able to handle
// projected events
type Projection func(EventData) error

// Add effectively registers a projection with the projector
// Make sure to add all of your projections before calling Run
func (p *Projector) Add(projections ...Projection) {
p.projections = append(p.projections, projections...)
}

// Run will start the projector
func (p *Projector) Run(ctx context.Context) error {
var wg sync.WaitGroup

for _, projection := range p.projections {
wg.Add(1)

go func(projection Projection) {
defer wg.Done()

for {
// TODO retry with backoff
sub, err := p.streamer.SubscribeAll(ctx)
if err != nil {
p.logErr(err)

return
}

defer sub.Close()

if err := p.run(ctx, sub, projection); err != nil {
continue
}

return
}
}(projection)
}

wg.Wait()

return nil
}

func (p *Projector) run(ctx context.Context, sub Subscription, projection Projection) error {
for {
select {
case data := <-sub.EventData:
err := projection(data)
if err != nil {
p.logErr(err)
// TODO retry with backoff

return err
}

case err := <-sub.Err:
if err != nil {
if errors.Is(err, io.EOF) {
break
}

if errors.Is(err, ErrSubscriptionClosedByClient) {
return nil
}

p.logErr(err)
}

case <-ctx.Done():
return nil
}
}
}

func (p *Projector) logErr(err error) {
p.logger.Printf("projector error: %v", err)
}

0 comments on commit 0f03fcc

Please sign in to comment.