From 897163702e22e41f513d72e21d75d5eb9cefef51 Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Fri, 3 Sep 2021 22:29:49 +0200 Subject: [PATCH 1/3] feat(eventstore): add SequenceNumberGetter interface --- eventstore/inmemory/store.go | 32 +++++++++++++++++++++----------- eventstore/store.go | 8 ++++++++ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/eventstore/inmemory/store.go b/eventstore/inmemory/store.go index 6c2b8110..c5586557 100644 --- a/eventstore/inmemory/store.go +++ b/eventstore/inmemory/store.go @@ -11,6 +11,7 @@ import ( ) var _ eventstore.Store = &EventStore{} +var _ eventstore.SequenceNumberGetter = &EventStore{} // EventStore is an in-memory eventstore.Store implementation. type EventStore struct { @@ -29,6 +30,14 @@ func NewEventStore() *EventStore { } } +func contextErr(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return fmt.Errorf("inmemory.EventStore: context done: %w", err) + } + + return nil +} + func (s *EventStore) streamAll(ctx context.Context, es eventstore.EventStream, selectt eventstore.Select) error { for _, event := range s.events { if event.SequenceNumber < selectt.From { @@ -162,6 +171,12 @@ func (s *EventStore) Stream( } } +func (s *EventStore) ensureMapsAreCreated(typ string) { + if v, ok := s.byTypeAndInstance[typ]; !ok || v == nil { + s.byTypeAndInstance[typ] = make(map[string][]int) + } +} + // Append inserts the specified Domain Events into the Event Stream specified // by the current instance, returning the new version of the Event Stream. // @@ -223,16 +238,11 @@ func (s *EventStore) Append( return lastCommittedEvent.Version, nil } -func (s *EventStore) ensureMapsAreCreated(typ string) { - if v, ok := s.byTypeAndInstance[typ]; !ok || v == nil { - s.byTypeAndInstance[typ] = make(map[string][]int) - } -} - -func contextErr(ctx context.Context) error { - if err := ctx.Err(); err != nil { - return fmt.Errorf("inmemory.EventStore: context done: %w", err) - } +// LatestSequenceNumber returns the size of the internal Event Log. +// This method never fails. +func (s *EventStore) LatestSequenceNumber(ctx context.Context) (int64, error) { + s.mx.RLock() + defer s.mx.RUnlock() - return nil + return int64(len(s.events)), nil } diff --git a/eventstore/store.go b/eventstore/store.go index cb05cd2a..350a76c7 100644 --- a/eventstore/store.go +++ b/eventstore/store.go @@ -107,3 +107,11 @@ type Streamer interface { // Stream opens one or more Event Streams as specified by the provided Event Stream target. Stream(ctx context.Context, es EventStream, target stream.Target, selectt Select) error } + +// SequenceNumberGetter is an Event Store trait that is used to interact with +// sequence numbers (e.g. queries). +type SequenceNumberGetter interface { + // LatestSequenceNumber should return the latest, global Sequence Number + // registered by the Event Store. + LatestSequenceNumber(ctx context.Context) (int64, error) +} From 09c47369c2639e1943386f201782b7c329183f2e Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Fri, 3 Sep 2021 22:30:17 +0200 Subject: [PATCH 2/3] feat(subscription): re-enable Volatile subscription --- subscription/catchup.go | 6 +- subscription/catchup_test.go | 5 +- subscription/volatile.go | 138 ++++++++++++--------- subscription/volatile_test.go | 220 ++++++++++++++++++---------------- 4 files changed, 208 insertions(+), 161 deletions(-) diff --git a/subscription/catchup.go b/subscription/catchup.go index 92538b34..9f0ec258 100644 --- a/subscription/catchup.go +++ b/subscription/catchup.go @@ -82,7 +82,11 @@ func (s *CatchUp) Start(ctx context.Context, eventStream eventstore.EventStream) for { select { case <-ctx.Done(): - return ctx.Err() + if err := ctx.Err(); err != nil { + return fmt.Errorf("subscription.CatchUp: context error: %w", err) + } + + return nil case <-time.After(b.NextBackOff()): sequenceNumber, err := s.catchUp(ctx, eventStream, lastSequenceNumber) diff --git a/subscription/catchup_test.go b/subscription/catchup_test.go index 3f67b156..e6722e99 100644 --- a/subscription/catchup_test.go +++ b/subscription/catchup_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -25,14 +26,14 @@ func TestCatchUp(t *testing.T) { s := new(CatchUpSuite) logger, err := zap.NewDevelopment() - assert.NoError(t, err) + require.NoError(t, err) s.makeSubscription = func(store eventstore.Store) subscription.Subscription { return &subscription.CatchUp{ SubscriptionName: t.Name(), Checkpointer: checkpoint.NopCheckpointer, Target: stream.All{}, - Logger: zaplogger.Wrap(logger), + Logger: zaplogger.Wrap(logger.With(zap.String("test", t.Name()))), EventStore: store, PullEvery: 10 * time.Millisecond, MaxInterval: 50 * time.Millisecond, diff --git a/subscription/volatile.go b/subscription/volatile.go index 631912d5..b15f080e 100644 --- a/subscription/volatile.go +++ b/subscription/volatile.go @@ -1,56 +1,86 @@ package subscription -// NOTE(ar3s3ru): Volatile subscriptions are currently disabled, since the -// support for Subscriptions is currently being deprecated. - -// import ( -// "context" -// "fmt" - -// "github.com/get-eventually/go-eventually/eventstore" -// ) - -// var _ Subscription = Volatile{} - -// // Volatile is a Subscription type that does not keep state of -// // the last Event processed or received, nor survives the Subscription -// // checkpoint between restarts. -// // -// // Use this Subscription type for volatile processes, such as projecting -// // realtime metrics, or when you're only interested in newer events -// // committed to the Event Store. -// type Volatile struct { -// SubscriptionName string -// Target TargetStream -// EventStore eventstore.Subscriber -// } - -// // Name is the name of the subscription. -// func (v Volatile) Name() string { return v.SubscriptionName } - -// // Start starts the Subscription by opening a subscribing Event Stream -// // using the subscription's Subscriber instance. -// func (v Volatile) Start(ctx context.Context, stream eventstore.EventStream) error { -// var err error - -// switch t := v.Target.(type) { -// case TargetStreamAll: -// err = v.EventStore.SubscribeToAll(ctx, stream) -// case TargetStreamType: -// err = v.EventStore.SubscribeToType(ctx, stream, t.Type) -// default: -// return fmt.Errorf("subscription.Volatile: unexpected target type") -// } - -// if err != nil { -// return fmt.Errorf("subscription.Volatile: event subscriber exited with error: %w", err) -// } - -// return nil -// } - -// // Checkpoint is a no-op operation, since the transient nature of the -// // Subscription does not require to persist its current state. -// func (Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error { -// return nil -// } +import ( + "context" + "fmt" + "time" + + "github.com/get-eventually/go-eventually/eventstore" + "github.com/get-eventually/go-eventually/eventstore/stream" + "github.com/get-eventually/go-eventually/logger" + "github.com/get-eventually/go-eventually/subscription/checkpoint" +) + +var _ Subscription = Volatile{} + +// Volatile is a Subscription type that does not keep state of +// the last Event processed or received, nor survives the Subscription +// checkpoint between restarts. +// +// Use this Subscription type for volatile processes, such as projecting +// realtime metrics, or when you're only interested in newer events +// committed to the Event Store. +type Volatile struct { + SubscriptionName string + Target stream.Target + Logger logger.Logger + EventStore interface { + eventstore.Streamer + eventstore.SequenceNumberGetter + } + + // PullEvery is the minimum interval between each streaming call to the Event Store. + // + // Defaults to DefaultPullInterval if unspecified or negative value + // has been provided. + PullEvery time.Duration + + // MaxInterval is the maximum interval between each streaming call to the Event Store. + // Use this value to ensure a specific eventual consistency window. + // + // Defaults to DefaultMaxPullInterval if unspecified or negative value + // has been provided. + MaxInterval time.Duration + + // BufferSize is the size of buffered channels used as EventStreams + // by the Subscription when receiving Events from the Event Store. + // + // Defaults to DefaultPullCatchUpBufferSize if unspecified or a negative + // value has been provided. + BufferSize int +} + +// Name is the name of the subscription. +func (v Volatile) Name() string { return v.SubscriptionName } + +// Start starts the Subscription by opening a subscribing Event Stream +// using the subscription's Subscriber instance. +func (v Volatile) Start(ctx context.Context, stream eventstore.EventStream) error { + latestSequenceNumber, err := v.EventStore.LatestSequenceNumber(ctx) + if err != nil { + return fmt.Errorf("subscription.Volatile: failed to get latest sequence number from event store: %w", err) + } + + catchUpSubscription := &CatchUp{ + SubscriptionName: v.SubscriptionName, + Target: v.Target, + EventStore: v.EventStore, + Checkpointer: checkpoint.FixedCheckpointer{StartingFrom: latestSequenceNumber}, + Logger: v.Logger, + PullEvery: v.PullEvery, + MaxInterval: v.MaxInterval, + BufferSize: v.BufferSize, + } + + if err := catchUpSubscription.Start(ctx, stream); err != nil { + return fmt.Errorf("subscription.Volatile: internal catch-up subscription exited with error: %w", err) + } + + return nil +} + +// Checkpoint is a no-op operation, since the transient nature of the +// Subscription does not require to persist its current state. +func (Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error { + return nil +} diff --git a/subscription/volatile_test.go b/subscription/volatile_test.go index 9328f3cb..2142b448 100644 --- a/subscription/volatile_test.go +++ b/subscription/volatile_test.go @@ -1,106 +1,118 @@ package subscription_test -// import ( -// "context" -// "errors" -// "fmt" -// "sync" -// "testing" - -// "github.com/stretchr/testify/assert" - -// "github.com/get-eventually/go-eventually" -// "github.com/get-eventually/go-eventually/eventstore" -// "github.com/get-eventually/go-eventually/eventstore/inmemory" -// "github.com/get-eventually/go-eventually/internal" -// "github.com/get-eventually/go-eventually/subscription" -// ) - -// func TestVolatile(t *testing.T) { -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() - -// streamID := stream.ID{ -// Type: "my-type", -// Name: "my-instance", -// } - -// eventStore := inmemory.NewEventStore() - -// volatileSubscription := subscription.Volatile{ -// SubscriptionName: "test-volatile-subscription", -// Target: subscription.TargetStreamType{Type: streamID.Type}, -// EventStore: eventStore, -// } - -// _, err := eventStore.Append( -// ctx, -// streamID, -// eventstore.VersionCheck(0), -// eventually.Event{Payload: internal.StringPayload("test-event-should-not-be-received")}, -// ) - -// if !assert.NoError(t, err) { -// return -// } - -// expectedEvents := []eventstore.Event{ -// { -// Stream: streamID, -// Version: 2, -// SequenceNumber: 2, -// Event: eventually.Event{ -// Payload: internal.StringPayload("test-event-should-be-received-0"), -// }, -// }, -// { -// Stream: streamID, -// Version: 3, -// SequenceNumber: 3, -// Event: eventually.Event{ -// Payload: internal.StringPayload("test-event-should-be-received-1"), -// }, -// }, -// { -// Stream: streamID, -// Version: 4, -// SequenceNumber: 4, -// Event: eventually.Event{ -// Payload: internal.StringPayload("test-event-should-be-received-2"), -// }, -// }, -// } - -// wg := new(sync.WaitGroup) -// wg.Add(1) - -// go func() { -// defer cancel() -// wg.Wait() - -// for i := 0; i < 3; i++ { -// _, err = eventStore.Append( -// ctx, -// streamID, -// eventstore.VersionCheck(i+1), -// eventually.Event{ -// Payload: internal.StringPayload(fmt.Sprintf("test-event-should-be-received-%d", i)), -// }, -// ) - -// if !assert.NoError(t, err) { -// return -// } -// } -// }() - -// events, err := eventstore.StreamToSlice(ctx, func(ctx context.Context, es eventstore.EventStream) error { -// // This kinda helps with starting the Subscription first, -// // then wake up the WaitGroup, which will unlock the write goroutine. -// go func() { wg.Done() }() -// return volatileSubscription.Start(ctx, es) -// }) - -// assert.True(t, errors.Is(err, context.Canceled), "err", err) -// assert.Equal(t, expectedEvents, events) -// } +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/get-eventually/go-eventually" + "github.com/get-eventually/go-eventually/eventstore" + "github.com/get-eventually/go-eventually/eventstore/inmemory" + "github.com/get-eventually/go-eventually/eventstore/stream" + "github.com/get-eventually/go-eventually/extension/zaplogger" + "github.com/get-eventually/go-eventually/internal" + "github.com/get-eventually/go-eventually/subscription" +) + +func TestVolatile(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + streamID := stream.ID{ + Type: "my-type", + Name: "my-instance", + } + + eventStore := inmemory.NewEventStore() + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + volatileSubscription := subscription.Volatile{ + SubscriptionName: "test-volatile-subscription", + Target: stream.ByType(streamID.Type), + EventStore: eventStore, + Logger: zaplogger.Wrap(logger.With(zap.String("test", t.Name()))), + PullEvery: 10 * time.Millisecond, + MaxInterval: 50 * time.Millisecond, + } + + _, err = eventStore.Append( + ctx, + streamID, + eventstore.VersionCheck(0), + eventually.Event{Payload: internal.StringPayload("test-event-should-not-be-received")}, + ) + + require.NoError(t, err) + + expectedEvents := []eventstore.Event{ + { + Stream: streamID, + Version: 2, + SequenceNumber: 2, + Event: eventually.Event{ + Payload: internal.StringPayload("test-event-should-be-received-0"), + }, + }, + { + Stream: streamID, + Version: 3, + SequenceNumber: 3, + Event: eventually.Event{ + Payload: internal.StringPayload("test-event-should-be-received-1"), + }, + }, + { + Stream: streamID, + Version: 4, + SequenceNumber: 4, + Event: eventually.Event{ + Payload: internal.StringPayload("test-event-should-be-received-2"), + }, + }, + } + + wg := new(sync.WaitGroup) + wg.Add(1) + + go func() { + defer cancel() + wg.Wait() + + for i := 0; i < 3; i++ { + _, err = eventStore.Append( + ctx, + streamID, + eventstore.VersionCheck(i+1), + eventually.Event{ + Payload: internal.StringPayload(fmt.Sprintf("test-event-should-be-received-%d", i)), + }, + ) + + require.NoError(t, err) + } + + // NOTE: this is bad, I know, and it makes the test kinda unreliable, + // but in order to ensure that the subscription has consumed + // all the events committed before closing the context we gotta wait + // a little bit... + <-time.After(800 * time.Millisecond) + }() + + events, err := eventstore.StreamToSlice(ctx, func(ctx context.Context, es eventstore.EventStream) error { + // This kinda helps with starting the Subscription first, + // then wake up the WaitGroup, which will unlock the write goroutine. + go func() { wg.Done() }() + return volatileSubscription.Start(ctx, es) + }) + + assert.True(t, errors.Is(err, context.Canceled), "err", err) + assert.Equal(t, expectedEvents, events) +} From cccd4b1f7bc93dc782d215b623a1cece4a3bbb5f Mon Sep 17 00:00:00 2001 From: Danilo Cianfrone Date: Fri, 3 Sep 2021 22:38:21 +0200 Subject: [PATCH 3/3] fix: linter complaint --- eventstore/inmemory/store.go | 6 ++++-- subscription/volatile.go | 10 +++++----- subscription/volatile_test.go | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/eventstore/inmemory/store.go b/eventstore/inmemory/store.go index c5586557..d8fc6940 100644 --- a/eventstore/inmemory/store.go +++ b/eventstore/inmemory/store.go @@ -10,8 +10,10 @@ import ( "github.com/get-eventually/go-eventually/eventstore/stream" ) -var _ eventstore.Store = &EventStore{} -var _ eventstore.SequenceNumberGetter = &EventStore{} +var ( + _ eventstore.Store = &EventStore{} + _ eventstore.SequenceNumberGetter = &EventStore{} +) // EventStore is an in-memory eventstore.Store implementation. type EventStore struct { diff --git a/subscription/volatile.go b/subscription/volatile.go index b15f080e..0e07f5b4 100644 --- a/subscription/volatile.go +++ b/subscription/volatile.go @@ -11,7 +11,7 @@ import ( "github.com/get-eventually/go-eventually/subscription/checkpoint" ) -var _ Subscription = Volatile{} +var _ Subscription = &Volatile{} // Volatile is a Subscription type that does not keep state of // the last Event processed or received, nor survives the Subscription @@ -51,11 +51,11 @@ type Volatile struct { } // Name is the name of the subscription. -func (v Volatile) Name() string { return v.SubscriptionName } +func (v *Volatile) Name() string { return v.SubscriptionName } // Start starts the Subscription by opening a subscribing Event Stream // using the subscription's Subscriber instance. -func (v Volatile) Start(ctx context.Context, stream eventstore.EventStream) error { +func (v *Volatile) Start(ctx context.Context, es eventstore.EventStream) error { latestSequenceNumber, err := v.EventStore.LatestSequenceNumber(ctx) if err != nil { return fmt.Errorf("subscription.Volatile: failed to get latest sequence number from event store: %w", err) @@ -72,7 +72,7 @@ func (v Volatile) Start(ctx context.Context, stream eventstore.EventStream) erro BufferSize: v.BufferSize, } - if err := catchUpSubscription.Start(ctx, stream); err != nil { + if err := catchUpSubscription.Start(ctx, es); err != nil { return fmt.Errorf("subscription.Volatile: internal catch-up subscription exited with error: %w", err) } @@ -81,6 +81,6 @@ func (v Volatile) Start(ctx context.Context, stream eventstore.EventStream) erro // Checkpoint is a no-op operation, since the transient nature of the // Subscription does not require to persist its current state. -func (Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error { +func (*Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error { return nil } diff --git a/subscription/volatile_test.go b/subscription/volatile_test.go index 2142b448..1f04b52b 100644 --- a/subscription/volatile_test.go +++ b/subscription/volatile_test.go @@ -34,7 +34,7 @@ func TestVolatile(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) - volatileSubscription := subscription.Volatile{ + volatileSubscription := &subscription.Volatile{ SubscriptionName: "test-volatile-subscription", Target: stream.ByType(streamID.Type), EventStore: eventStore,