[v13] fix: trim large events in Athena querier
Backport #35402 to branch/v13
Fixes #35161

Large events queried from the Athena audit backend will now be trimmed
before they are stored and before they are returned from a query, using
the existing TrimToMaxSize implementations for each event type that the
Dynamo and File backends already rely on.

The other backends typically trim the event before storing it: for
Dynamo this is due to the 400 KB item size limit; for the file backend
it's due to the 64 KiB bufio.MaxScanTokenSize.

There is no hard limit on events stored in Parquet files in S3, but
we've been using a 2 GiB limit in the publisher so far.
With this change we will attempt to trim events to 2 GiB before writing
them (if we haven't already run out of memory) instead of just failing.

We've also been using a 1 MiB limit in the querier and just returning an
empty result when an event larger than that is encountered.
With this change we will attempt to trim the event to 1 MiB before
returning it.
The 1 MiB limit ultimately stems from the 4 MB max gRPC message size.

We could just trim to 1 MiB in the publisher, but I'd prefer to preserve
as much of the event data as possible in case we improve the querying
story for large events in the future (and in case the user wants to
query the events directly from S3).
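
A quick sanity check of the 3/4 trim factor used by the publisher below
(an illustrative sketch, not part of the commit): base64 encodes every
3 raw bytes as 4 characters, so trimming to 3/4 of a limit keeps the
encoded payload within it.

package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// Stand-in limit; the real publisher uses maxS3BasedSize (~2 GiB).
	const limit = 1024 * 1024
	// Same arithmetic as the publisher: trim to 3/4 of the limit to leave
	// room for base64's ~33% overhead.
	trimTarget := limit - limit/4

	raw := make([]byte, trimTarget)
	encoded := base64.StdEncoding.EncodeToString(raw)
	fmt.Println(len(encoded), len(encoded) <= limit) // 1048576 true
}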
nklaassen committed Jan 26, 2024
1 parent f4c5844 commit 663ae29
Showing 5 changed files with 167 additions and 18 deletions.
4 changes: 4 additions & 0 deletions lib/events/athena/athena.go
@@ -551,3 +551,7 @@ var (
		consumerNumberOfErrorsFromSQSCollect,
	}
)

type trimmableEvent interface {
	TrimToMaxSize(int) apievents.AuditEvent
}
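
Go satisfies interfaces implicitly, so every event type that already has a
matching TrimToMaxSize method is picked up by this interface via a plain
type assertion. A self-contained sketch with stand-in types (only the
trimmableEvent shape mirrors the code above; the rest is hypothetical):

package main

import "fmt"

// AuditEvent is a stand-in for apievents.AuditEvent.
type AuditEvent interface{ GetType() string }

type trimmableEvent interface {
	TrimToMaxSize(int) AuditEvent
}

type query struct{ text string }

func (q *query) GetType() string { return "db.session.query" }

// TrimToMaxSize makes *query satisfy trimmableEvent without any declaration.
func (q *query) TrimToMaxSize(maxSize int) AuditEvent {
	if len(q.text) <= maxSize {
		return q
	}
	return &query{text: q.text[:maxSize]}
}

func main() {
	var ev AuditEvent = &query{text: "select * from very_large_table"}
	if t, ok := ev.(trimmableEvent); ok {
		ev = t.TrimToMaxSize(8)
	}
	fmt.Println(ev.(*query).text) // "select *"
}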
27 changes: 24 additions & 3 deletions lib/events/athena/publisher.go
@@ -42,8 +42,15 @@ const (
	// maxSNSMessageSize defines maximum size of SNS message. AWS allows 256KB
	// however it counts also headers. We round it to 250KB, just to be sure.
	maxSNSMessageSize = 250 * 1024
	// maxS3BasedSize defines some reasonable threshold for S3 based messages (2GB).
	maxS3BasedSize uint64 = 2 * 1024 * 1024 * 1024
)

var (
	// maxS3BasedSize defines some reasonable threshold for S3 based messages
	// (almost 2GiB but fits in an int).
	//
	// It's a var instead of const so tests can override it instead of casually
	// allocating 2GiB.
	maxS3BasedSize = 2*1024*1024*1024 - 1
)

// publisher is a SNS based events publisher.
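
Why "almost 2GiB but fits in an int": the new value is exactly
math.MaxInt32, so the declaration still compiles where int is 32 bits
wide. A quick check (not part of the commit):

package main

import (
	"fmt"
	"math"
)

func main() {
	// 2*1024*1024*1024 would overflow a 32-bit int; subtracting 1 lands
	// exactly on math.MaxInt32 (2147483647).
	const maxS3BasedSize = 2*1024*1024*1024 - 1
	fmt.Println(maxS3BasedSize == math.MaxInt32) // true
}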
@@ -112,6 +119,20 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
		in.SetTime(time.Now().UTC().Round(time.Millisecond))
	}

	// Attempt to trim the event to maxS3BasedSize. This is a no-op if the event
	// is already small enough. If it can not be trimmed or the event is still
	// too large after marshaling then we may fail to emit the event below.
	//
	// This limit is much larger than events.MaxEventBytesInResponse and the
	// event may need to be trimmed again on the querier side, but this is an
	// attempt to preserve as much of the event as possible in case we add the
	// ability to query very large events in the future.
	if t, ok := in.(trimmableEvent); ok {
		// Trim to 3/4 the max size because base64 has 33% overhead.
		// The TrimToMaxSize implementations have a 10% buffer already.
		in = t.TrimToMaxSize(maxS3BasedSize - maxS3BasedSize/4)
	}

	oneOf, err := apievents.ToOneOf(in)
	if err != nil {
		return trace.Wrap(err)
@@ -123,7 +144,7 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)

	b64Encoded := base64.StdEncoding.EncodeToString(marshaledProto)
	if len(b64Encoded) > maxSNSMessageSize {
		if uint64(len(b64Encoded)) > maxS3BasedSize {
		if len(b64Encoded) > maxS3BasedSize {
			return trace.BadParameter("message too large to publish, size %d", len(b64Encoded))
		}
		return trace.Wrap(p.emitViaS3(ctx, in.GetID(), marshaledProto))
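
Condensed, EmitAuditEvent's routing now reduces to a size check on the
base64-encoded payload. A runnable sketch (the print statements stand in
for the real SNS and S3 clients, and the trim step shown in the diff
above happens before marshaling):

package main

import (
	"encoding/base64"
	"fmt"
)

const (
	maxSNSMessageSize = 250 * 1024
	maxS3BasedSize    = 2*1024*1024*1024 - 1
)

// publish routes an already-marshaled event by its encoded size: small
// payloads go inline via SNS, large ones via S3, oversized ones fail.
func publish(marshaled []byte) error {
	encoded := base64.StdEncoding.EncodeToString(marshaled)
	switch {
	case len(encoded) <= maxSNSMessageSize:
		fmt.Println("publish inline via SNS")
	case len(encoded) <= maxS3BasedSize:
		fmt.Println("upload to S3, publish an S3 pointer via SNS")
	default:
		return fmt.Errorf("message too large to publish, size %d", len(encoded))
	}
	return nil
}

func main() {
	_ = publish(make([]byte, 100))      // fits in an SNS message
	_ = publish(make([]byte, 300*1024)) // routed through S3
}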
39 changes: 39 additions & 0 deletions lib/events/athena/publisher_test.go
@@ -29,15 +29,23 @@ import (
	apievents "github.com/gravitational/teleport/api/types/events"
)

func init() {
	// Override maxS3BasedSize so we don't have to allocate 2GiB to test it.
	// Do this in init to avoid any race.
	maxS3BasedSize = maxSNSMessageSize * 4
}

// TODO(tobiaszheller): Those UT just cover basic stuff. When we will have consumer
// there will be UT which will cover whole flow of message with encoding/decoding.
func Test_EmitAuditEvent(t *testing.T) {
	veryLongString := strings.Repeat("t", maxS3BasedSize+1)
	tests := []struct {
		name          string
		in            apievents.AuditEvent
		publishErrors []error
		uploader      s3uploader
		wantCheck     func(t *testing.T, out []fakeQueueMessage)
		wantErrorMsg  string
	}{
		{
			name: "valid publish",
@@ -83,6 +91,33 @@ func Test_EmitAuditEvent(t *testing.T) {
				require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
			},
		},
		{
			name: "very big untrimmable event",
			in: &apievents.AppCreate{
				Metadata: apievents.Metadata{
					ID:   uuid.NewString(),
					Time: time.Now().UTC(),
					Code: veryLongString,
				},
			},
			uploader:     mockUploader{},
			wantErrorMsg: "message too large to publish",
		},
		{
			name: "very big trimmable event",
			in: &apievents.DatabaseSessionQuery{
				Metadata: apievents.Metadata{
					ID:   uuid.NewString(),
					Time: time.Now().UTC(),
				},
				DatabaseQuery: veryLongString,
			},
			uploader: mockUploader{},
			wantCheck: func(t *testing.T, out []fakeQueueMessage) {
				require.Len(t, out, 1)
				require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
@@ -94,6 +129,10 @@ func Test_EmitAuditEvent(t *testing.T) {
				},
			}
			err := p.EmitAuditEvent(context.Background(), tt.in)
			if tt.wantErrorMsg != "" {
				require.ErrorContains(t, err, tt.wantErrorMsg)
				return
			}
			require.NoError(t, err)
			out := fq.dequeue()
			tt.wantCheck(t, out)
52 changes: 50 additions & 2 deletions lib/events/athena/querier.go
@@ -26,6 +26,7 @@ import (
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/athena"
	athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
	"github.com/dustin/go-humanize"
	"github.com/google/uuid"
	"github.com/gravitational/trace"
	"github.com/jonboulle/clockwork"
@@ -588,7 +589,9 @@ func (q *querier) fetchResults(ctx context.Context, queryId string, limit int, c
		),
	)
	defer span.End()
	rb := &responseBuilder{}
	rb := &responseBuilder{
		logger: q.logger,
	}
	// nextToken is used as offset to next calls for GetQueryResults.
	var nextToken string
	for {
@@ -644,6 +647,8 @@ type responseBuilder struct {
	output []apievents.AuditEvent
	// totalSize is used to track size of output
	totalSize int

	logger log.FieldLogger
}

func (r *responseBuilder) endKeyset() (*keyset, error) {
@@ -702,7 +707,50 @@ func (rb *responseBuilder) appendUntilSizeLimit(resultResp *athena.GetQueryResul
	}

	if len(eventData)+rb.totalSize > events.MaxEventBytesInResponse {
		return true, nil
		// Encountered an event that would push the total page over the size
		// limit.
		if len(rb.output) > 0 {
			// There are already one or more full events to return, just
			// return them and the next event will be picked up on the next
			// page.
			return true, nil
		}
		// A single event is larger than the max page size - the best we can
		// do is try to trim it.
		if t, ok := event.(trimmableEvent); ok {
			event = t.TrimToMaxSize(events.MaxEventBytesInResponse)
			// Exact rb.totalSize doesn't really matter since the response is
			// already size limited.
			rb.totalSize += events.MaxEventBytesInResponse
			rb.output = append(rb.output, event)
			return true, nil
		}
		// Failed to trim the event to size. The only options are to return
		// a response with 0 events, skip this event, or return an error.
		//
		// Silently skipping events is a terrible option, it's better for
		// the client to get an error.
		//
		// Returning 0 events amounts to either skipping the event or
		// getting the client stuck in a paging loop depending on what would
		// be returned for the next page token.
		//
		// Returning a descriptive error should at least give the client a
		// hint as to what has gone wrong so that an attempt can be made to
		// fix it.
		//
		// If this condition is reached it should be considered a bug, any
		// event that can possibly exceed the maximum size should implement
		// TrimToMaxSize (until we can one day implement an API for storing
		// and retrieving large events).
		rb.logger.WithFields(log.Fields{
			"event_type": event.GetType(),
			"event_id":   event.GetID(),
			"event_size": len(eventData),
		}).Error("Failed to query event exceeding maximum response size.")
		return true, trace.Errorf(
			"%s event %s is %s and cannot be returned because it exceeds the maximum response size of %s",
			event.GetType(), event.GetID(), humanize.IBytes(uint64(len(eventData))), humanize.IBytes(events.MaxEventBytesInResponse))
	}
	rb.totalSize += len(eventData)
	rb.output = append(rb.output, event)
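
The error message formats sizes with humanize.IBytes from
github.com/dustin/go-humanize, which renders byte counts in IEC units;
this is where the "5.0 MiB" expected by the test below comes from:

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	fmt.Println(humanize.IBytes(1024 * 1024))     // "1.0 MiB"
	fmt.Println(humanize.IBytes(5 * 1024 * 1024)) // "5.0 MiB"
}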
63 changes: 50 additions & 13 deletions lib/events/athena/querier_test.go
@@ -25,6 +25,7 @@ import (
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/athena"
	athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
	"github.com/dustin/go-humanize"
	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"github.com/google/uuid"
@@ -727,7 +728,7 @@ func Test_querier_fetchResults(t *testing.T) {
			AppName: "app-4",
		},
	}
	veryBigEvent := &apievents.AppCreate{
	bigUntrimmableEvent := &apievents.AppCreate{
		Metadata: apievents.Metadata{
			ID:   uuid.NewString(),
			Time: time.Now().UTC(),
@@ -737,16 +738,26 @@ func Test_querier_fetchResults(t *testing.T) {
			AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
		},
	}
	bigTrimmableEvent := &apievents.DatabaseSessionQuery{
		Metadata: apievents.Metadata{
			ID:   uuid.NewString(),
			Time: time.Now().UTC(),
			Type: events.DatabaseSessionQueryEvent,
		},
		DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
	}
	bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse)
	tests := []struct {
		name      string
		limit     int
		condition utils.FieldsCondition
		// fakeResp defines responses which will be returned based on given
		// input token to GetQueryResults. Note that due to limit of GetQueryResults
		// we are doing multiple calls, first always with empty token.
		fakeResp map[string]eventsWithToken
		wantEvents []apievents.AuditEvent
		wantKeyset string
		fakeResp     map[string]eventsWithToken
		wantEvents   []apievents.AuditEvent
		wantKeyset   string
		wantErrorMsg string
	}{
		{
			name: "no data returned from query, return empty results",
@@ -763,25 +774,45 @@ func Test_querier_fetchResults(t *testing.T) {
			wantEvents: []apievents.AuditEvent{event1, event2, event3, event4},
		},
		{
			name: "events with veryBigEvent exceeding > MaxEventBytesInResponse",
			name: "events with untrimmable event exceeding > MaxEventBytesInResponse",
			fakeResp: map[string]eventsWithToken{
				"":       {returnToken: "token1", events: []apievents.AuditEvent{event1}},
				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, veryBigEvent}},
				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigUntrimmableEvent}},
			},
			limit: 10,
			// we don't expect veryBigEvent because it should go to next batch
			// we don't expect bigUntrimmableEvent because it should go to next batch
			wantEvents: []apievents.AuditEvent{event1, event2, event3},
			wantKeyset: mustEventToKey(t, event3),
		},
		{
			// TODO(tobiaszheller): right now if we have event that's > 1 MiB, it will be silently ignored (due to gRPC unary limit).
			// Come back later when we have decision what to do with it.
			name: "only 1 very big event",
			name: "only 1 very big untrimmable event",
			fakeResp: map[string]eventsWithToken{
				"": {returnToken: "", events: []apievents.AuditEvent{veryBigEvent}},
				"": {returnToken: "", events: []apievents.AuditEvent{bigUntrimmableEvent}},
			},
			limit: 10,
			wantErrorMsg: fmt.Sprintf(
				"app.create event %s is 5.0 MiB and cannot be returned because it exceeds the maximum response size of %s",
				bigUntrimmableEvent.Metadata.ID, humanize.IBytes(events.MaxEventBytesInResponse)),
		},
		{
			name: "events with trimmable event exceeding > MaxEventBytesInResponse",
			fakeResp: map[string]eventsWithToken{
				"":       {returnToken: "token1", events: []apievents.AuditEvent{event1}},
				"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigTrimmableEvent}},
			},
			limit: 10,
			// we don't expect bigTrimmableEvent because it should go to next batch
			wantEvents: []apievents.AuditEvent{event1, event2, event3},
			wantKeyset: mustEventToKey(t, event3),
		},
		{
			name: "only 1 very big trimmable event",
			fakeResp: map[string]eventsWithToken{
				"": {returnToken: "", events: []apievents.AuditEvent{bigTrimmableEvent}},
			},
			limit: 10,
			wantEvents: []apievents.AuditEvent{},
			wantEvents: []apievents.AuditEvent{bigTrimmedEvent},
			wantKeyset: mustEventToKey(t, bigTrimmableEvent),
		},
		{
			name: "number of events equals limit in req, make sure that pagination keyset is returned",
@@ -818,8 +849,14 @@ func Test_querier_fetchResults(t *testing.T) {
				},
			}
			gotEvents, gotKeyset, err := q.fetchResults(context.Background(), "queryid", tt.limit, tt.condition)
			if tt.wantErrorMsg != "" {
				require.ErrorContains(t, err, tt.wantErrorMsg)
				return
			}
			require.NoError(t, err)
			require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty()))
			require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty(),
				// Expect the database query to be trimmed
				cmpopts.IgnoreFields(apievents.DatabaseSessionQuery{}, "DatabaseQuery")))
			require.Equal(t, tt.wantKeyset, gotKeyset)
		})
	}
