Skip to content

Commit

Permalink
Add CaughtUp and FellBehind message handling in subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Nov 30, 2023
1 parent 28b9403 commit 5fe996c
Show file tree
Hide file tree
Showing 5 changed files with 899 additions and 615 deletions.
4 changes: 4 additions & 0 deletions esdb/subscription_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ type SubscriptionEvent struct {
SubscriptionDropped *SubscriptionDropped
// When a checkpoint was created.
CheckPointReached *Position
// When an event is caught up
CaughtUp *Subscription
// When an event is fell behind
FellBehind *Subscription
}

// PersistentSubscriptionEvent used to handle persistent subscription notifications raised throughout its lifecycle.
Expand Down
12 changes: 12 additions & 0 deletions esdb/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ func (sub *Subscription) Recv() *SubscriptionEvent {
EventAppeared: &resolvedEvent,
}
}
case *api.ReadResp_CaughtUp_:
{
return &SubscriptionEvent{
CaughtUp: sub,
}
}
case *api.ReadResp_FellBehind_:
{
return &SubscriptionEvent{
FellBehind: sub,
}
}
}

sub.client.config.applyLogger(LogWarn, "received unknown message, skipping")
Expand Down
65 changes: 65 additions & 0 deletions esdb/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,74 @@ func SubscriptionTests(t *testing.T, emptyDBClient *esdb.Client, populatedDBClie
t.Run("subscriptionAllFilter", subscriptionAllFilter(emptyDBClient))
t.Run("connectionClosing", connectionClosing(populatedDBClient))
t.Run("subscriptionAllWithCredentialsOverride", subscriptionAllWithCredentialsOverride(populatedDBClient))
t.Run("subscriptionToStreamCaughtUpMessage", subscriptionToStreamCaughtUpMessage(populatedDBClient))
})
}

func subscriptionToStreamCaughtUpMessage(db *esdb.Client) TestCall {
const minSupportedVersion = 23
const expectedEventCount = 6_000
const testTimeout = 1 * time.Minute

return func(t *testing.T) {
if db == nil {
t.Skip("Database client is nil")
}

esdbVersion, err := db.GetServerVersion()
require.NoError(t, err, "Error getting server version")

if esdbVersion.Major < minSupportedVersion {
t.Skip("CaughtUp message is not supported in this version of EventStoreDB")
}

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

streamID := "dataset20M-0"
subscription, err := db.SubscribeToStream(ctx, streamID, esdb.SubscribeToStreamOptions{From: esdb.Start{}})
require.NoError(t, err)
defer subscription.Close()

var caughtUpReceived sync.WaitGroup
caughtUpReceived.Add(1)

go func() {
var count uint64 = 0
defer caughtUpReceived.Done()
allEventsAcknowledged := false

for {
select {
case <-ctx.Done():
t.Error("Context timed out before receiving CaughtUp message")
return
default:
event := subscription.Recv()

if event.EventAppeared != nil {
count++

if count == expectedEventCount {
allEventsAcknowledged = true
}

continue
}

if allEventsAcknowledged && event.CaughtUp != nil {
require.True(t, count >= expectedEventCount, "Did not receive the exact number of expected events before CaughtUp")
return
}
}
}
}()

caughtUpTimedOut := waitWithTimeout(&caughtUpReceived, testTimeout)
require.False(t, caughtUpTimedOut, "Timed out waiting for CaughtUp message")
}
}

func streamSubscriptionDeliversAllEventsInStreamAndListensForNewEvents(db *esdb.Client) TestCall {
return func(t *testing.T) {
if db == nil {
Expand Down
12 changes: 11 additions & 1 deletion protos/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ option go_package = "github.com/EventStore/EventStore-Client-Go/v3/protos/stream

import "shared.proto";
import "status.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

Expand Down Expand Up @@ -99,8 +100,14 @@ message ReadResp {
uint64 first_stream_position = 5;
uint64 last_stream_position = 6;
AllStreamPosition last_all_stream_position = 7;
CaughtUp caught_up = 8;
FellBehind fell_behind = 9;
}

message CaughtUp {}

message FellBehind {}

message ReadEvent {
RecordedEvent event = 1;
RecordedEvent link = 2;
Expand Down Expand Up @@ -215,7 +222,10 @@ message BatchAppendReq {
google.protobuf.Empty any = 4;
google.protobuf.Empty stream_exists = 5;
}
google.protobuf.Timestamp deadline = 6;
oneof deadline_option {
google.protobuf.Timestamp deadline_21_10_0 = 6;
google.protobuf.Duration deadline = 7;
}
}

message ProposedMessage {
Expand Down
Loading

0 comments on commit 5fe996c

Please sign in to comment.