Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v9] Correctly handle Firestore pagination with DocumentID cursors #13757

Merged
merged 10 commits into from
Sep 16, 2022
54 changes: 21 additions & 33 deletions lib/backend/firestore/firestorebk.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,58 +757,46 @@ func (b *Backend) getIndexParent() string {
}

func (b *Backend) ensureIndexes(adminSvc *apiv1.FirestoreAdminClient) error {
tuples := []*IndexTuple{{
FirstField: keyDocProperty,
SecondField: expiresDocProperty,
SecondFieldOrder: adminpb.Index_IndexField_ASCENDING,
}}
tuples := IndexList{}
tuples.Index(Field(keyDocProperty, adminpb.Index_IndexField_ASCENDING), Field(expiresDocProperty, adminpb.Index_IndexField_ASCENDING))
return EnsureIndexes(b.clientContext, adminSvc, tuples, b.getIndexParent())
}

type IndexTuple struct {
FirstField string
SecondField string
SecondFieldOrder adminpb.Index_IndexField_Order
type IndexList [][]*adminpb.Index_IndexField

func (l *IndexList) Index(fields ...*adminpb.Index_IndexField) {
list := []*adminpb.Index_IndexField{}
list = append(list, fields...)
*l = append(*l, list)
}

func Field(name string, order adminpb.Index_IndexField_Order) *adminpb.Index_IndexField {
return &adminpb.Index_IndexField{
FieldPath: name,
ValueMode: &adminpb.Index_IndexField_Order_{
Order: order,
},
}
}

type indexTask struct {
operation *apiv1.CreateIndexOperation
tuple *IndexTuple
tuple []*adminpb.Index_IndexField
}

// EnsureIndexes is a function used by Firestore events and backend to generate indexes and will block until
// indexes are reported as created
func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tuples []*IndexTuple, indexParent string) error {
func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tuples IndexList, indexParent string) error {
l := log.WithFields(log.Fields{trace.Component: BackendName})

ascendingFieldOrder := &adminpb.Index_IndexField_Order_{
Order: adminpb.Index_IndexField_ASCENDING,
}

var tasks []indexTask

// create the indexes
for _, tuple := range tuples {
secondFieldOrder := &adminpb.Index_IndexField_Order_{
Order: tuple.SecondFieldOrder,
}

fields := []*adminpb.Index_IndexField{
{
FieldPath: tuple.FirstField,
ValueMode: ascendingFieldOrder,
},
{
FieldPath: tuple.SecondField,
ValueMode: secondFieldOrder,
},
}
l.Infof("%v", fields)
operation, err := adminSvc.CreateIndex(ctx, &adminpb.CreateIndexRequest{
Parent: indexParent,
Index: &adminpb.Index{
QueryScope: adminpb.Index_COLLECTION,
Fields: fields,
Fields: tuple,
},
})
if err != nil && status.Code(err) != codes.AlreadyExists {
Expand Down Expand Up @@ -857,7 +845,7 @@ func waitOnIndexCreation(ctx context.Context, l *log.Entry, task indexTask) erro
if err != nil {
return trace.Wrap(err)
}
l.Infof("Creating index for tuple %s-%s with name %s.", task.tuple.FirstField, task.tuple.SecondField, meta.Index)
l.Infof("Creating index for tuple %v with name %s.", task.tuple, meta.Index)

_, err = task.operation.Wait(ctx)
if err != nil {
Expand Down
165 changes: 65 additions & 100 deletions lib/events/firestoreevents/firestoreevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ package firestoreevents
import (
"context"
"encoding/json"
"fmt"
"net/url"
"sort"
"strconv"
"strings"
"time"

"cloud.google.com/go/firestore"
Expand Down Expand Up @@ -123,9 +123,6 @@ const (
// sessionIDDocProperty is used internally to query for records and matches the key in the event struct tag
sessionIDDocProperty = "sessionID"

// eventIndexDocProperty is used internally to query for records and matches the key in the event struct tag
eventIndexDocProperty = "eventIndex"

// createdAtDocProperty is used internally to query for records and matches the key in the event struct tag
createdAtDocProperty = "createdAt"

Expand Down Expand Up @@ -475,54 +472,25 @@ func (l *Log) SearchEvents(fromUTC, toUTC time.Time, namespace string, eventType
return l.searchEventsWithFilter(fromUTC, toUTC, namespace, limit, order, startKey, searchEventsFilter{eventTypes: eventTypes}, "")
}

func (l *Log) searchEventsWithFilter(fromUTC, toUTC time.Time, namespace string, limit int, order types.EventOrder, startKey string, filter searchEventsFilter, sessionID string) ([]apievents.AuditEvent, string, error) {
var eventsArr []apievents.AuditEvent
var estimatedSize int
checkpoint := startKey
left := limit

for {
gotEvents, withSize, withCheckpoint, err := l.searchEventsOnce(fromUTC, toUTC, namespace, left, order, checkpoint, filter, events.MaxEventBytesInResponse-estimatedSize, sessionID)
if nil != err {
return nil, "", trace.Wrap(err)
}

eventsArr = append(eventsArr, gotEvents...)
estimatedSize += withSize
left -= len(gotEvents)
checkpoint = withCheckpoint

if len(checkpoint) == 0 || left <= 0 || estimatedSize >= events.MaxEventBytesInResponse {
break
}
}

return eventsArr, checkpoint, nil
}

func (l *Log) searchEventsOnce(fromUTC, toUTC time.Time, namespace string, limit int, order types.EventOrder, startKey string, filter searchEventsFilter, spaceRemaining int, sessionID string) ([]apievents.AuditEvent, int, string, error) {
g := l.WithFields(log.Fields{"From": fromUTC, "To": toUTC, "Namespace": namespace, "Filter": filter, "Limit": limit, "StartKey": startKey})
func (l *Log) searchEventsWithFilter(fromUTC, toUTC time.Time, namespace string, limit int, order types.EventOrder, lastKey string, filter searchEventsFilter, sessionID string) ([]apievents.AuditEvent, string, error) {
g := l.WithFields(log.Fields{"From": fromUTC, "To": toUTC, "Namespace": namespace, "Filter": filter, "Limit": limit, "StartKey": lastKey})

var lastKey int64
var values []events.EventFields
var parsedStartKey int64
var err error
reachedEnd := false
totalSize := 0
var checkpointParts []string
var checkpointTime int

if startKey != "" {
parsedStartKey, err = strconv.ParseInt(startKey, 10, 64)
if err != nil {
return nil, 0, "", trace.WrapWithMessage(err, "failed to parse startKey, expected integer but found: %q", startKey)
if lastKey != "" {
checkpointParts = strings.Split(lastKey, ":")
if len(checkpointParts) != 2 {
return nil, "", trace.BadParameter("invalid checkpoint key: %q", lastKey)
}
}

modifyquery := func(query firestore.Query) firestore.Query {
if startKey != "" {
return query.StartAfter(parsedStartKey)
checkpointTime, err = strconv.Atoi(checkpointParts[0])
if err != nil {
return nil, "", trace.BadParameter("invalid checkpoint key: %q", lastKey)
}

return query
}

var firestoreOrdering firestore.Direction
Expand All @@ -532,96 +500,95 @@ func (l *Log) searchEventsOnce(fromUTC, toUTC time.Time, namespace string, limit
case types.EventOrderDescending:
firestoreOrdering = firestore.Desc
default:
return nil, 0, "", trace.BadParameter("invalid event order: %v", order)
return nil, "", trace.BadParameter("invalid event order: %v", order)
}

query := modifyquery(l.svc.Collection(l.CollectionName).
query := l.svc.Collection(l.CollectionName).
Where(eventNamespaceDocProperty, "==", apidefaults.Namespace).
Where(createdAtDocProperty, ">=", fromUTC.Unix()).
Where(createdAtDocProperty, "<=", toUTC.Unix()).
OrderBy(createdAtDocProperty, firestoreOrdering)).
Limit(limit)
Where(createdAtDocProperty, "<=", toUTC.Unix())

if sessionID != "" {
query = query.Where(sessionIDDocProperty, "==", sessionID)
}

if len(filter.eventTypes) > 0 {
query = query.Where(eventTypeDocProperty, "in", filter.eventTypes)
}
if sessionID != "" {
query = query.Where(sessionIDDocProperty, "==", sessionID)

query = query.OrderBy(createdAtDocProperty, firestoreOrdering).
OrderBy(firestore.DocumentID, firestore.Asc)

if lastKey != "" {
query = query.StartAfter(checkpointTime, checkpointParts[1])
}

start := time.Now()
docSnaps, err := query.Documents(l.svcContext).GetAll()
batchReadLatencies.Observe(time.Since(start).Seconds())
batchReadRequests.Inc()
if err != nil {
return nil, 0, "", firestorebk.ConvertGRPCError(err)
}

// Correctly detecting if you've reached the end of a query in firestore is
// tricky since it doesn't set any flag when it finds that there are no further events.
// This solution here seems to be the most common, but I haven't been able to find
// any documented hard guarantees on firestore not early returning for some reason like response
// size like DynamoDB does. In short, this should work in all cases for lack of a better solution.
if len(docSnaps) < limit {
reachedEnd = true
return nil, "", firestorebk.ConvertGRPCError(err)
}

g.WithFields(log.Fields{"duration": time.Since(start)}).Debugf("Query completed.")
for _, docSnap := range docSnaps {
var e event
err = docSnap.DataTo(&e)
if err != nil {
return nil, 0, "", firestorebk.ConvertGRPCError(err)
return nil, "", firestorebk.ConvertGRPCError(err)
}

data := []byte(e.Fields)
if totalSize+len(data) >= spaceRemaining {
if totalSize+len(data) >= events.MaxEventBytesInResponse {
break
}

var fields events.EventFields
if err := json.Unmarshal(data, &fields); err != nil {
return nil, 0, "", trace.Errorf("failed to unmarshal event %v", err)
return nil, "", trace.Errorf("failed to unmarshal event %v", err)
}

time := docSnap.Data()[createdAtDocProperty].(int64)
lastKey = strconv.Itoa(int(time)) + ":" + docSnap.Ref.ID

// Check that the filter condition is satisfied.
if filter.condition != nil && !filter.condition(utils.Fields(fields)) {
continue
}

lastKey = docSnap.Data()[createdAtDocProperty].(int64)
values = append(values, fields)
totalSize += len(data)
if limit > 0 && len(values) >= limit {
break
}
}

if len(docSnaps) < limit {
lastKey = ""
}

var toSort sort.Interface
switch order {
case types.EventOrderAscending:
toSort = events.ByTimeAndIndex(values)
case types.EventOrderDescending:
toSort = sort.Reverse(events.ByTimeAndIndex(values))
default:
return nil, 0, "", trace.BadParameter("invalid event order: %v", order)
return nil, "", trace.BadParameter("invalid event order: %v", order)
}
sort.Sort(toSort)

sort.Sort(toSort)
eventArr := make([]apievents.AuditEvent, 0, len(values))
for _, fields := range values {
event, err := events.FromEventFields(fields)
if err != nil {
return nil, 0, "", trace.Wrap(err)
return nil, "", trace.Wrap(err)
}
eventArr = append(eventArr, event)
}

var lastKeyString string
if lastKey != 0 && !reachedEnd {
lastKeyString = fmt.Sprintf("%d", lastKey)
}

return eventArr, totalSize, lastKeyString, nil
return eventArr, lastKey, nil
}

// SearchSessionEvents returns session related events only. This is used to
Expand Down Expand Up @@ -654,32 +621,30 @@ func (l *Log) getIndexParent() string {
}

func (l *Log) ensureIndexes(adminSvc *apiv1.FirestoreAdminClient) error {
tuples := make([]*firestorebk.IndexTuple, 0)
tuples = append(tuples, &firestorebk.IndexTuple{
FirstField: eventNamespaceDocProperty,
SecondField: createdAtDocProperty,
SecondFieldOrder: admin.Index_IndexField_ASCENDING,
})
tuples = append(tuples, &firestorebk.IndexTuple{
FirstField: eventNamespaceDocProperty,
SecondField: createdAtDocProperty,
SecondFieldOrder: admin.Index_IndexField_DESCENDING,
})
tuples = append(tuples, &firestorebk.IndexTuple{
FirstField: eventTypeDocProperty,
SecondField: createdAtDocProperty,
SecondFieldOrder: admin.Index_IndexField_ASCENDING,
})
tuples = append(tuples, &firestorebk.IndexTuple{
FirstField: eventTypeDocProperty,
SecondField: createdAtDocProperty,
SecondFieldOrder: admin.Index_IndexField_DESCENDING,
})
tuples = append(tuples, &firestorebk.IndexTuple{
FirstField: sessionIDDocProperty,
SecondField: eventIndexDocProperty,
SecondFieldOrder: admin.Index_IndexField_ASCENDING,
})
tuples := firestorebk.IndexList{}
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
)
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_DESCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
)
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventTypeDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_DESCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
)
tuples.Index(
firestorebk.Field(eventNamespaceDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(eventTypeDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(sessionIDDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(createdAtDocProperty, admin.Index_IndexField_ASCENDING),
firestorebk.Field(firestore.DocumentID, admin.Index_IndexField_ASCENDING),
)
err := firestorebk.EnsureIndexes(l.svcContext, adminSvc, tuples, l.getIndexParent())
return trace.Wrap(err)
}
Expand Down