Skip to content

Commit

Permalink
Reset event checkpoint key property for non sub-page breaks (#7638)
Browse files Browse the repository at this point in the history
  • Loading branch information
xacrimon committed Aug 2, 2021
1 parent 83bdca2 commit bbcdb81
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 14 deletions.
54 changes: 40 additions & 14 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,14 @@ func eventFilterList(amount int) string {
return "(" + strings.Join(eventTypes, ", ") + ")"
}

func reverseStrings(slice []string) []string {
newSlice := make([]string, 0, len(slice))
for i := len(slice) - 1; i >= 0; i-- {
newSlice = append(newSlice, slice[i])
}
return newSlice
}

// searchEventsRaw is a low level function for searching for events. This is kept
// separate from the SearchEvents function in order to allow tests to grab more metadata.
func (l *Log) searchEventsRaw(fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]event, string, error) {
Expand All @@ -789,8 +797,12 @@ func (l *Log) searchEventsRaw(fromUTC, toUTC time.Time, namespace string, eventT
var values []event
totalSize := 0
dates := daysBetween(fromUTC, toUTC)
if order == types.EventOrderDescending {
dates = reverseStrings(dates)
}

query := "CreatedAtDate = :date AND CreatedAt BETWEEN :start and :end"
g := l.WithFields(log.Fields{"From": fromUTC, "To": toUTC, "Namespace": namespace, "EventTypes": eventTypes, "Limit": limit, "StartKey": startKey})
g := l.WithFields(log.Fields{"From": fromUTC, "To": toUTC, "Namespace": namespace, "EventTypes": eventTypes, "Limit": limit, "StartKey": startKey, "Order": order})
var left int64
if limit != 0 {
left = int64(limit)
Expand Down Expand Up @@ -866,7 +878,8 @@ dateLoop:
if err != nil {
return nil, "", trace.Wrap(err)
}
g.WithFields(log.Fields{"duration": time.Since(start), "items": len(out.Items)}).Debugf("Query completed.")
g.WithFields(log.Fields{"duration": time.Since(start), "items": len(out.Items), "forward": forward, "iterator": checkpoint.Iterator}).Debugf("Query completed.")
oldIterator := checkpoint.Iterator
checkpoint.Iterator = out.LastEvaluatedKey

for _, item := range out.Items {
Expand Down Expand Up @@ -897,11 +910,15 @@ dateLoop:
// checkpoint is needed for sub-page breaks.
if totalSize+len(data) >= events.MaxEventBytesInResponse {
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0

key, err := getSubPageCheckpoint(&e)
if err != nil {
return nil, "", trace.Wrap(err)
}
checkpoint.EventKey = key

// We need to reset the iterator so we get the previous page again.
checkpoint.Iterator = oldIterator
break dateLoop
}

Expand All @@ -911,6 +928,7 @@ dateLoop:

if left == 0 {
hasLeft = i+1 != len(dates) || len(checkpoint.Iterator) != 0
checkpoint.EventKey = ""
break dateLoop
}
}
Expand Down Expand Up @@ -1383,19 +1401,27 @@ func (l *Log) deleteAllItems() error {
},
})
}
if len(requests) == 0 {
return nil
}
req, _ := l.svc.BatchWriteItemRequest(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
l.Tablename: requests,
},
})
err = req.Send()
err = convertError(err)
if err != nil {
return trace.Wrap(err)

for len(requests) > 0 {
top := 25
if top > len(requests) {
top = len(requests)
}
chunk := requests[:top]
requests = requests[top:]

req, _ := l.svc.BatchWriteItemRequest(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
l.Tablename: chunk,
},
})
err = req.Send()
err = convertError(err)
if err != nil {
return trace.Wrap(err)
}
}

return nil
}

Expand Down
49 changes: 49 additions & 0 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"sort"
"strconv"
Expand Down Expand Up @@ -94,6 +95,54 @@ func (s *DynamoeventsSuite) TestPagination(c *check.C) {
s.EventPagination(c)
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randStringAlpha(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

func (s *DynamoeventsSuite) TestSizeBreak(c *check.C) {
const eventSize = 50 * 1024
blob := randStringAlpha(eventSize)

const eventCount int = 10
for i := 0; i < eventCount; i++ {
err := s.Log.EmitAuditEventLegacy(events.UserLocalLoginE, events.EventFields{
events.LoginMethod: events.LoginMethodSAML,
events.AuthAttemptSuccess: true,
events.EventUser: "bob",
events.EventTime: s.Clock.Now().UTC().Add(time.Second * time.Duration(i)),
"test.data": blob,
})
c.Assert(err, check.IsNil)
}

var checkpoint string
events := make([]apievents.AuditEvent, 0)

for {
fetched, lCheckpoint, err := s.log.SearchEvents(s.Clock.Now().UTC().Add(-time.Hour), s.Clock.Now().UTC().Add(time.Hour), apidefaults.Namespace, nil, eventCount, types.EventOrderDescending, checkpoint)
c.Assert(err, check.IsNil)
checkpoint = lCheckpoint
events = append(events, fetched...)

if checkpoint == "" {
break
}
}

lastTime := s.Clock.Now().UTC().Add(time.Hour)

for _, event := range events {
c.Assert(event.GetTime().Before(lastTime), check.Equals, true)
lastTime = event.GetTime()
}
}

func (s *DynamoeventsSuite) TestSessionEventsCRUD(c *check.C) {
s.SessionEventsCRUD(c)

Expand Down

0 comments on commit bbcdb81

Please sign in to comment.