Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions eventstore/inmemory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/get-eventually/go-eventually/eventstore/stream"
)

var _ eventstore.Store = &EventStore{}
var (
_ eventstore.Store = &EventStore{}
_ eventstore.SequenceNumberGetter = &EventStore{}
)

// EventStore is an in-memory eventstore.Store implementation.
type EventStore struct {
Expand All @@ -29,6 +32,14 @@ func NewEventStore() *EventStore {
}
}

func contextErr(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return fmt.Errorf("inmemory.EventStore: context done: %w", err)
}

return nil
}

func (s *EventStore) streamAll(ctx context.Context, es eventstore.EventStream, selectt eventstore.Select) error {
for _, event := range s.events {
if event.SequenceNumber < selectt.From {
Expand Down Expand Up @@ -162,6 +173,12 @@ func (s *EventStore) Stream(
}
}

func (s *EventStore) ensureMapsAreCreated(typ string) {
if v, ok := s.byTypeAndInstance[typ]; !ok || v == nil {
s.byTypeAndInstance[typ] = make(map[string][]int)
}
}

// Append inserts the specified Domain Events into the Event Stream specified
// by the current instance, returning the new version of the Event Stream.
//
Expand Down Expand Up @@ -223,16 +240,11 @@ func (s *EventStore) Append(
return lastCommittedEvent.Version, nil
}

func (s *EventStore) ensureMapsAreCreated(typ string) {
if v, ok := s.byTypeAndInstance[typ]; !ok || v == nil {
s.byTypeAndInstance[typ] = make(map[string][]int)
}
}

func contextErr(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return fmt.Errorf("inmemory.EventStore: context done: %w", err)
}
// LatestSequenceNumber returns the size of the internal Event Log.
// This method never fails.
func (s *EventStore) LatestSequenceNumber(ctx context.Context) (int64, error) {
s.mx.RLock()
defer s.mx.RUnlock()

return nil
return int64(len(s.events)), nil
}
8 changes: 8 additions & 0 deletions eventstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,11 @@ type Streamer interface {
// Stream opens one or more Event Streams as specified by the provided Event Stream target.
Stream(ctx context.Context, es EventStream, target stream.Target, selectt Select) error
}

// SequenceNumberGetter is an Event Store trait that is used to interact with
// sequence numbers (e.g. queries).
type SequenceNumberGetter interface {
// LatestSequenceNumber should return the latest, global Sequence Number
// registered by the Event Store.
LatestSequenceNumber(ctx context.Context) (int64, error)
}
6 changes: 5 additions & 1 deletion subscription/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func (s *CatchUp) Start(ctx context.Context, eventStream eventstore.EventStream)
for {
select {
case <-ctx.Done():
return ctx.Err()
if err := ctx.Err(); err != nil {
return fmt.Errorf("subscription.CatchUp: context error: %w", err)
}

return nil

case <-time.After(b.NextBackOff()):
sequenceNumber, err := s.catchUp(ctx, eventStream, lastSequenceNumber)
Expand Down
5 changes: 3 additions & 2 deletions subscription/catchup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

Expand All @@ -25,14 +26,14 @@ func TestCatchUp(t *testing.T) {
s := new(CatchUpSuite)

logger, err := zap.NewDevelopment()
assert.NoError(t, err)
require.NoError(t, err)

s.makeSubscription = func(store eventstore.Store) subscription.Subscription {
return &subscription.CatchUp{
SubscriptionName: t.Name(),
Checkpointer: checkpoint.NopCheckpointer,
Target: stream.All{},
Logger: zaplogger.Wrap(logger),
Logger: zaplogger.Wrap(logger.With(zap.String("test", t.Name()))),
EventStore: store,
PullEvery: 10 * time.Millisecond,
MaxInterval: 50 * time.Millisecond,
Expand Down
138 changes: 84 additions & 54 deletions subscription/volatile.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,86 @@
package subscription

// NOTE(ar3s3ru): Volatile subscriptions are currently disabled, since the
// support for Subscriptions is currently being deprecated.

// import (
// "context"
// "fmt"

// "github.com/get-eventually/go-eventually/eventstore"
// )

// var _ Subscription = Volatile{}

// // Volatile is a Subscription type that does not keep state of
// // the last Event processed or received, nor survives the Subscription
// // checkpoint between restarts.
// //
// // Use this Subscription type for volatile processes, such as projecting
// // realtime metrics, or when you're only interested in newer events
// // committed to the Event Store.
// type Volatile struct {
// SubscriptionName string
// Target TargetStream
// EventStore eventstore.Subscriber
// }

// // Name is the name of the subscription.
// func (v Volatile) Name() string { return v.SubscriptionName }

// // Start starts the Subscription by opening a subscribing Event Stream
// // using the subscription's Subscriber instance.
// func (v Volatile) Start(ctx context.Context, stream eventstore.EventStream) error {
// var err error

// switch t := v.Target.(type) {
// case TargetStreamAll:
// err = v.EventStore.SubscribeToAll(ctx, stream)
// case TargetStreamType:
// err = v.EventStore.SubscribeToType(ctx, stream, t.Type)
// default:
// return fmt.Errorf("subscription.Volatile: unexpected target type")
// }

// if err != nil {
// return fmt.Errorf("subscription.Volatile: event subscriber exited with error: %w", err)
// }

// return nil
// }

// // Checkpoint is a no-op operation, since the transient nature of the
// // Subscription does not require to persist its current state.
// func (Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error {
// return nil
// }
import (
"context"
"fmt"
"time"

"github.com/get-eventually/go-eventually/eventstore"
"github.com/get-eventually/go-eventually/eventstore/stream"
"github.com/get-eventually/go-eventually/logger"
"github.com/get-eventually/go-eventually/subscription/checkpoint"
)

var _ Subscription = &Volatile{}

// Volatile is a Subscription type that does not keep state of
// the last Event processed or received, nor survives the Subscription
// checkpoint between restarts.
//
// Use this Subscription type for volatile processes, such as projecting
// realtime metrics, or when you're only interested in newer events
// committed to the Event Store.
type Volatile struct {
SubscriptionName string
Target stream.Target
Logger logger.Logger
EventStore interface {
eventstore.Streamer
eventstore.SequenceNumberGetter
}

// PullEvery is the minimum interval between each streaming call to the Event Store.
//
// Defaults to DefaultPullInterval if unspecified or negative value
// has been provided.
PullEvery time.Duration

// MaxInterval is the maximum interval between each streaming call to the Event Store.
// Use this value to ensure a specific eventual consistency window.
//
// Defaults to DefaultMaxPullInterval if unspecified or negative value
// has been provided.
MaxInterval time.Duration

// BufferSize is the size of buffered channels used as EventStreams
// by the Subscription when receiving Events from the Event Store.
//
// Defaults to DefaultPullCatchUpBufferSize if unspecified or a negative
// value has been provided.
BufferSize int
}

// Name is the name of the subscription.
func (v *Volatile) Name() string { return v.SubscriptionName }

// Start starts the Subscription by opening a subscribing Event Stream
// using the subscription's Subscriber instance.
func (v *Volatile) Start(ctx context.Context, es eventstore.EventStream) error {
latestSequenceNumber, err := v.EventStore.LatestSequenceNumber(ctx)
if err != nil {
return fmt.Errorf("subscription.Volatile: failed to get latest sequence number from event store: %w", err)
}

catchUpSubscription := &CatchUp{
SubscriptionName: v.SubscriptionName,
Target: v.Target,
EventStore: v.EventStore,
Checkpointer: checkpoint.FixedCheckpointer{StartingFrom: latestSequenceNumber},
Logger: v.Logger,
PullEvery: v.PullEvery,
MaxInterval: v.MaxInterval,
BufferSize: v.BufferSize,
}

if err := catchUpSubscription.Start(ctx, es); err != nil {
return fmt.Errorf("subscription.Volatile: internal catch-up subscription exited with error: %w", err)
}

return nil
}

// Checkpoint is a no-op operation, since the transient nature of the
// Subscription does not require to persist its current state.
func (*Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error {
return nil
}
Loading