Skip to content

Commit

Permalink
[v14] [dynamoevents] fix pagination when limit is reached at the boun…
Browse files Browse the repository at this point in the history
…dary of a day (#44273)

* [dynamoevents] fix pagination when limit is reached at the boundary of a day

DynamoDB stores events using the day as one of the search indexes. When querying, it splits the [from, to] window into the containing days, i.e., [date(from), date(from)+1 day, ..., date(to)], and iterates through them sequentially until there are no more events to consume or the limit is reached. Each day is requested individually and after the previous day (assuming ascending order).

DynamoDB indicates that a client has consumed all events for a day by returning an empty iterator. According to the DynamoDB documentation:

```
// The primary key of the item where the operation stopped, inclusive of the
// previous result set. Use this value to start a new operation, excluding this
// value in the new request.
//
// If LastEvaluatedKey is empty, then the "last page" of results has been processed
// and there is no more data to be retrieved.
//
// If LastEvaluatedKey is not empty, it does not necessarily mean that there
// is more data in the result set. The only way to know when you have reached
// the end of the result set is when LastEvaluatedKey is empty.
```

Given the current indexing, we need to take special consideration when the limit is reached exactly at the end of a day. In this case, the iterator must be moved to the next day. Otherwise, the client receives a reset cursor with only the date set, causing a dead loop since the cursor resets to the beginning of the day, which is a specific issue for the event-handler.

This PR fixes the issue by advancing the cursor to the next day if the limit is reached at the end of a day, and the iterator is empty.

I wrote a test but couldn't reproduce the issue using DynamoDB local, as it never returns an empty cursor. Instead, it returns a cursor that, when called, returns an empty cursor and no events.

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* add tests

* handle an edge case when iteration stops before page ends because maximum response size is reached

* handle edge case where some events could have been skipped when the maximum message size was reached

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato committed Jul 16, 2024
1 parent 64ef0d4 commit 812f070
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 8 deletions.
25 changes: 18 additions & 7 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,12 +1147,6 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
// Because this may break on non page boundaries an additional
// checkpoint is needed for sub-page breaks.
if l.totalSize+len(data) >= events.MaxEventBytesInResponse {
hf := false
if hasLeftFun != nil {
hf = hasLeftFun()
}
l.hasLeft = hf || len(l.checkpoint.Iterator) != 0

key, err := getSubPageCheckpoint(&e)
if err != nil {
return nil, false, trace.Wrap(err)
Expand All @@ -1161,12 +1155,17 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft

// We need to reset the iterator so we get the previous page again.
l.checkpoint.Iterator = oldIterator

// If we stopped because of the size limit, we know that at least one event has to be fetched from the
// current date and old iterator, so we must set it to true independently of the hasLeftFun or
// the new iterator being empty.
l.hasLeft = true

return out, true, nil
}
l.totalSize += len(data)
out = append(out, e)
l.left--

if l.left == 0 {
hf := false
if hasLeftFun != nil {
Expand Down Expand Up @@ -1239,6 +1238,18 @@ dateLoop:
}
values = append(values, result...)
if limitReached {
// If we've reached the limit, we need to determine whether there are more events to fetch from the current date
// or if we need to move the cursor to the next date.
// To do this, we check if the iterator is empty and if the EventKey is empty.
// DynamoDB returns an empty iterator if all events from the current date have been consumed.
// We need to check if the EventKey is empty because it indicates that we left the page midway
// due to reaching the maximum response size. In this case, we need to resume the query
// from the same date and the request's iterator to fetch the remainder of the page.
// If the input iterator is empty but the EventKey is not, we need to resume the query from the same date
// and we shouldn't move to the next date.
if i < len(l.dates)-1 && len(l.checkpoint.Iterator) == 0 && l.checkpoint.EventKey == "" {
l.checkpoint.Date = l.dates[i+1]
}
return values, nil
}
if len(l.checkpoint.Iterator) == 0 {
Expand Down
73 changes: 72 additions & 1 deletion lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func setupDynamoContext(t *testing.T) *dynamoContext {
Tablename: fmt.Sprintf("teleport-test-%v", uuid.New().String()),
Clock: fakeClock,
UIDGenerator: utils.NewFakeUID(),
Endpoint: "http://localhost:8000",
})
require.NoError(t, err)

Expand Down Expand Up @@ -473,6 +472,78 @@ func TestEmitSessionEventsSameIndex(t *testing.T) {
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1, "")))
}

// TestSearchEventsLimitEndOfDay tests if the search events function can handle
// moving the cursor to the next day when the limit is reached exactly at the
// end of the day.
// This only works if tests run against a real DynamoDB instance.
func TestSearchEventsLimitEndOfDay(t *testing.T) {

ctx := context.Background()
tt := setupDynamoContext(t)
blob := "data"
const eventCount int = 10

// create events for two days
for dayDiff := 0; dayDiff < 2; dayDiff++ {
for i := 0; i < eventCount; i++ {
err := tt.suite.Log.EmitAuditEvent(ctx, &apievents.UserLogin{
Method: events.LoginMethodSAML,
Status: apievents.Status{Success: true},
UserMetadata: apievents.UserMetadata{User: "bob"},
Metadata: apievents.Metadata{
Type: events.UserLoginEvent,
Time: tt.suite.Clock.Now().UTC().Add(time.Hour*24*time.Duration(dayDiff) + time.Second*time.Duration(i)),
},
IdentityAttributes: apievents.MustEncodeMap(map[string]interface{}{"test.data": blob}),
})
require.NoError(t, err)
}
}

windowStart := time.Date(
tt.suite.Clock.Now().UTC().Year(),
tt.suite.Clock.Now().UTC().Month(),
tt.suite.Clock.Now().UTC().Day(),
0, /* hour */
0, /* minute */
0, /* second */
0, /* nanosecond */
time.UTC)
windowEnd := windowStart.Add(time.Hour * 24)

data, err := json.Marshal(checkpointKey{
Date: windowStart.Format("2006-01-02"),
})
require.NoError(t, err)
checkpoint := string(data)

var gotEvents []apievents.AuditEvent
for {
fetched, lCheckpoint, err := tt.log.SearchEvents(ctx, events.SearchEventsRequest{
From: windowStart,
To: windowEnd,
Limit: eventCount,
Order: types.EventOrderAscending,
StartKey: checkpoint,
})
require.NoError(t, err)
checkpoint = lCheckpoint
gotEvents = append(gotEvents, fetched...)

if checkpoint == "" {
break
}
}

require.Len(t, gotEvents, eventCount)
lastTime := tt.suite.Clock.Now().UTC().Add(-time.Hour)

for _, event := range gotEvents {
require.True(t, event.GetTime().After(lastTime))
lastTime = event.GetTime()
}
}

// TestValidationErrorsHandling given events that return validation
// errors (large event size and already exists), the emit should handle them
// and succeed on emitting the event when it does support trimming.
Expand Down

0 comments on commit 812f070

Please sign in to comment.