Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset event checkpoint key property for non sub-page breaks #7638

Merged
merged 3 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 40 additions & 14 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,14 @@ func eventFilterList(amount int) string {
return "(" + strings.Join(eventTypes, ", ") + ")"
}

func reverseStrings(slice []string) []string {
newSlice := make([]string, 0, len(slice))
for i := len(slice) - 1; i >= 0; i-- {
newSlice = append(newSlice, slice[i])
}
return newSlice
}

// searchEventsRaw is a low level function for searching for events. This is kept
// separate from the SearchEvents function in order to allow tests to grab more metadata.
func (l *Log) searchEventsRaw(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]event, string, error) {
Expand All @@ -788,8 +796,12 @@ func (l *Log) searchEventsRaw(fromUTC, toUTC time.Time, namespace string, eventT
var values []event
totalSize := 0
dates := daysBetween(fromUTC, toUTC)
if order == types.EventOrderDescending {
dates = reverseStrings(dates)
}

query := "CreatedAtDate = :date AND CreatedAt BETWEEN :start and :end"
g := l.WithFields(log.Fields{"From": fromUTC, "To": toUTC, "Namespace": namespace, "EventTypes": eventTypes, "Limit": limit, "StartKey": startKey})
g := l.WithFields(log.Fields{"From": fromUTC, "To": toUTC, "Namespace": namespace, "EventTypes": eventTypes, "Limit": limit, "StartKey": startKey, "Order": order})
var left int64
if limit != 0 {
left = int64(limit)
Expand Down Expand Up @@ -865,7 +877,8 @@ dateLoop:
if err != nil {
return nil, "", trace.Wrap(err)
}
g.WithFields(log.Fields{"duration": time.Since(start), "items": len(out.Items)}).Debugf("Query completed.")
g.WithFields(log.Fields{"duration": time.Since(start), "items": len(out.Items), "forward": forward, "iterator": checkpoint.Iterator}).Debugf("Query completed.")
oldIterator := checkpoint.Iterator
checkpoint.Iterator = out.LastEvaluatedKey

for _, item := range out.Items {
Expand Down Expand Up @@ -896,11 +909,15 @@ dateLoop:
// checkpoint is needed for sub-page breaks.
if totalSize+len(data) >= events.MaxEventBytesInResponse {
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0

key, err := getSubPageCheckpoint(&e)
if err != nil {
return nil, "", trace.Wrap(err)
}
checkpoint.EventKey = key

// We need to reset the iterator so we get the previous page again.
checkpoint.Iterator = oldIterator
break dateLoop
}

Expand All @@ -910,6 +927,7 @@ dateLoop:

if left == 0 {
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0
checkpoint.EventKey = ""
break dateLoop
}
}
Expand Down Expand Up @@ -1382,19 +1400,27 @@ func (l *Log) deleteAllItems() error {
},
})
}
if len(requests) == 0 {
return nil
}
req, _ := l.svc.BatchWriteItemRequest(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
l.Tablename: requests,
},
})
err = req.Send()
err = convertError(err)
if err != nil {
return trace.Wrap(err)

for len(requests) > 0 {
top := 25
if top > len(requests) {
top = len(requests)
}
chunk := requests[:top]
requests = requests[top:]

req, _ := l.svc.BatchWriteItemRequest(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
l.Tablename: chunk,
},
})
err = req.Send()
err = convertError(err)
if err != nil {
return trace.Wrap(err)
}
}

return nil
}

Expand Down
49 changes: 49 additions & 0 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"sort"
"strconv"
Expand Down Expand Up @@ -95,6 +96,54 @@ func (s *DynamoeventsSuite) TestPagination(c *check.C) {
s.EventPagination(c)
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randStringAlpha(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

func (s *DynamoeventsSuite) TestSizeBreak(c *check.C) {
const eventSize = 50 * 1024
blob := randStringAlpha(eventSize)

const eventCount int = 10
for i := 0; i < eventCount; i++ {
err := s.Log.EmitAuditEventLegacy(events.UserLocalLoginE, events.EventFields{
events.LoginMethod: events.LoginMethodSAML,
events.AuthAttemptSuccess: true,
events.EventUser: "bob",
events.EventTime: s.Clock.Now().UTC().Add(time.Second * time.Duration(i)),
"test.data": blob,
})
c.Assert(err, check.IsNil)
}

var checkpoint string
events := make([]apievents.AuditEvent, 0)

for {
fetched, lCheckpoint, err := s.log.SearchEvents(s.Clock.Now().UTC().Add(-time.Hour), s.Clock.Now().UTC().Add(time.Hour), apidefaults.Namespace, nil, eventCount, types.EventOrderDescending, checkpoint)
c.Assert(err, check.IsNil)
checkpoint = lCheckpoint
events = append(events, fetched...)

if checkpoint == "" {
break
}
}

lastTime := s.Clock.Now().UTC().Add(time.Hour)

for _, event := range events {
c.Assert(event.GetTime().Before(lastTime), check.Equals, true)
lastTime = event.GetTime()
}
}

func (s *DynamoeventsSuite) TestSessionEventsCRUD(c *check.C) {
s.SessionEventsCRUD(c)

Expand Down