Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trim large events in Athena querier #35402

Merged
merged 3 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/events/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,3 +635,7 @@ func newAthenaMetrics(cfg athenaMetricsConfig) (*athenaMetrics, error) {
m.consumerNumberOfErrorsFromSQSCollect,
))
}

type trimmableEvent interface {
TrimToMaxSize(int) apievents.AuditEvent
}
27 changes: 24 additions & 3 deletions lib/events/athena/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ const (
// maxSNSMessageSize defines maximum size of SNS message. AWS allows 256KB
// however it counts also headers. We round it to 250KB, just to be sure.
maxSNSMessageSize = 250 * 1024
// maxS3BasedSize defines some resonable threshold for S3 based messages (2GB).
maxS3BasedSize uint64 = 2 * 1024 * 1024 * 1024
)

var (
// maxS3BasedSize defines some resonable threshold for S3 based messages
// (almost 2GiB but fits in an int).
//
// It's a var instead of const so tests can override it instead of casually
// allocating 2GiB.
maxS3BasedSize = 2*1024*1024*1024 - 1
)

// publisher is a SNS based events publisher.
Expand Down Expand Up @@ -115,6 +122,20 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
in.SetTime(time.Now().UTC().Round(time.Millisecond))
}

// Attempt to trim the event to maxS3BasedSize. This is a no-op if the event
// is already small enough. If it can not be trimmed or the event is still
// too large after marshaling then we may fail to emit the event below.
//
// This limit is much larger than events.MaxEventBytesInResponse and the
// event may need to be trimmed again on the querier side, but this is an
// attempt to preserve as much of the event as possible in case we add the
// ability to query very large events in the future.
if t, ok := in.(trimmableEvent); ok {
// Trim to 3/4 the max size because base64 has 33% overhead.
// The TrimToMaxSize implementations have a 10% buffer already.
in = t.TrimToMaxSize(maxS3BasedSize - maxS3BasedSize/4)
}

oneOf, err := apievents.ToOneOf(in)
if err != nil {
return trace.Wrap(err)
Expand All @@ -126,7 +147,7 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)

b64Encoded := base64.StdEncoding.EncodeToString(marshaledProto)
if len(b64Encoded) > maxSNSMessageSize {
if uint64(len(b64Encoded)) > maxS3BasedSize {
if len(b64Encoded) > maxS3BasedSize {
return trace.BadParameter("message too large to publish, size %d", len(b64Encoded))
}
return trace.Wrap(p.emitViaS3(ctx, in.GetID(), marshaledProto))
Expand Down
39 changes: 39 additions & 0 deletions lib/events/athena/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,23 @@ import (
apievents "github.com/gravitational/teleport/api/types/events"
)

func init() {
// Override maxS3BasedSize so we don't have to allocate 2GiB to test it.
// Do this in init to avoid any race.
maxS3BasedSize = maxSNSMessageSize * 4
}

// TODO(tobiaszheller): Those UT just cover basic stuff. When we will have consumer
// there will be UT which will cover whole flow of message with encoding/decoding.
func Test_EmitAuditEvent(t *testing.T) {
veryLongString := strings.Repeat("t", maxS3BasedSize+1)
tests := []struct {
name string
in apievents.AuditEvent
publishErrors []error
uploader s3uploader
wantCheck func(t *testing.T, out []fakeQueueMessage)
wantErrorMsg string
}{
{
name: "valid publish",
Expand Down Expand Up @@ -87,6 +95,33 @@ func Test_EmitAuditEvent(t *testing.T) {
require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
},
},
{
name: "very big untrimmable event",
in: &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Now().UTC(),
Code: veryLongString,
},
},
uploader: mockUploader{},
wantErrorMsg: "message too large to publish",
},
{
name: "very big trimmable event",
in: &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Now().UTC(),
},
DatabaseQuery: veryLongString,
},
uploader: mockUploader{},
wantCheck: func(t *testing.T, out []fakeQueueMessage) {
require.Len(t, out, 1)
require.Contains(t, *out[0].attributes[payloadTypeAttr].StringValue, payloadTypeS3Based)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -98,6 +133,10 @@ func Test_EmitAuditEvent(t *testing.T) {
},
}
err := p.EmitAuditEvent(context.Background(), tt.in)
if tt.wantErrorMsg != "" {
require.ErrorContains(t, err, tt.wantErrorMsg)
return
}
require.NoError(t, err)
out := fq.dequeue()
tt.wantCheck(t, out)
Expand Down
52 changes: 50 additions & 2 deletions lib/events/athena/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/athena"
athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -592,7 +593,9 @@ func (q *querier) fetchResults(ctx context.Context, queryId string, limit int, c
),
)
defer span.End()
rb := &responseBuilder{}
rb := &responseBuilder{
logger: q.logger,
}
// nextToken is used as offset to next calls for GetQueryResults.
var nextToken string
for {
Expand Down Expand Up @@ -648,6 +651,8 @@ type responseBuilder struct {
output []apievents.AuditEvent
// totalSize is used to track size of output
totalSize int

logger log.FieldLogger
}

func (r *responseBuilder) endKeyset() (*keyset, error) {
Expand Down Expand Up @@ -706,7 +711,50 @@ func (rb *responseBuilder) appendUntilSizeLimit(resultResp *athena.GetQueryResul
}

if len(eventData)+rb.totalSize > events.MaxEventBytesInResponse {
return true, nil
// Encountered an event that would push the total page over the size
// limit.
if len(rb.output) > 0 {
// There are already one or more full events to return, just
// return them and the next event will be picked up on the next
// page.
return true, nil
}
// A single event is larger than the max page size - the best we can
// do is try to trim it.
if t, ok := event.(trimmableEvent); ok {
event = t.TrimToMaxSize(events.MaxEventBytesInResponse)
// Exact rb.totalSize doesn't really matter since the response is
// already size limited.
rb.totalSize += events.MaxEventBytesInResponse
rb.output = append(rb.output, event)
return true, nil
}
// Failed to trim the event to size. The only options are to return
// a response with 0 events, skip this event, or return an error.
//
// Silently skipping events is a terrible option, it's better for
// the client to get an error.
//
// Returning 0 events amounts to either skipping the event or
// getting the client stuck in a paging loop depending on what would
// be returned for the next page token.
//
// Returning a descriptive error should at least give the client a
// hint as to what has gone wrong so that an attempt can be made to
// fix it.
//
// If this condition is reached it should be considered a bug, any
// event that can possibly exceed the maximum size should implement
// TrimToMaxSize (until we can one day implement an API for storing
// and retrieving large events).
rb.logger.WithFields(log.Fields{
"event_type": event.GetType(),
"event_id": event.GetID(),
"event_size": len(eventData),
}).Error("Failed to query event exceeding maximum response size.")
return true, trace.Errorf(
"%s event %s is %s and cannot be returned because it exceeds the maximum response size of %s",
event.GetType(), event.GetID(), humanize.IBytes(uint64(len(eventData))), humanize.IBytes(events.MaxEventBytesInResponse))
}
rb.totalSize += len(eventData)
rb.output = append(rb.output, event)
Expand Down
63 changes: 50 additions & 13 deletions lib/events/athena/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/athena"
athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types"
"github.com/dustin/go-humanize"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
Expand Down Expand Up @@ -731,7 +732,7 @@ func Test_querier_fetchResults(t *testing.T) {
AppName: "app-4",
},
}
veryBigEvent := &apievents.AppCreate{
bigUntrimmableEvent := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Now().UTC(),
Expand All @@ -741,16 +742,26 @@ func Test_querier_fetchResults(t *testing.T) {
AppName: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
},
}
bigTrimmableEvent := &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Now().UTC(),
Type: events.DatabaseSessionQueryEvent,
},
DatabaseQuery: strings.Repeat("aaaaa", events.MaxEventBytesInResponse),
}
bigTrimmedEvent := bigTrimmableEvent.TrimToMaxSize(events.MaxEventBytesInResponse)
tests := []struct {
name string
limit int
condition utils.FieldsCondition
// fakeResp defines responses which will be returned based on given
// input token to GetQueryResults. Note that due to limit of GetQueryResults
// we are doing multiple calls, first always with empty token.
fakeResp map[string]eventsWithToken
wantEvents []apievents.AuditEvent
wantKeyset string
fakeResp map[string]eventsWithToken
wantEvents []apievents.AuditEvent
wantKeyset string
wantErrorMsg string
}{
{
name: "no data returned from query, return empty results",
Expand All @@ -767,25 +778,45 @@ func Test_querier_fetchResults(t *testing.T) {
wantEvents: []apievents.AuditEvent{event1, event2, event3, event4},
},
{
name: "events with veryBigEvent exceeding > MaxEventBytesInResponse",
name: "events with untrimmable event exceeding > MaxEventBytesInResponse",
fakeResp: map[string]eventsWithToken{
"": {returnToken: "token1", events: []apievents.AuditEvent{event1}},
"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, veryBigEvent}},
"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigUntrimmableEvent}},
},
limit: 10,
// we don't expect veryBigEvent because it should go to next batch
// we don't expect bigUntrimmableEvent because it should go to next batch
wantEvents: []apievents.AuditEvent{event1, event2, event3},
wantKeyset: mustEventToKey(t, event3),
},
{
// TODO(tobiaszheller): right now if we have event that's > 1 MiB, it will be silently ignored (due to gRPC unary limit).
// Come back later when we have decision what to do with it.
name: "only 1 very big event",
name: "only 1 very big untrimmable event",
fakeResp: map[string]eventsWithToken{
"": {returnToken: "", events: []apievents.AuditEvent{veryBigEvent}},
"": {returnToken: "", events: []apievents.AuditEvent{bigUntrimmableEvent}},
},
limit: 10,
wantErrorMsg: fmt.Sprintf(
"app.create event %s is 5.0 MiB and cannot be returned because it exceeds the maximum response size of %s",
bigUntrimmableEvent.Metadata.ID, humanize.IBytes(events.MaxEventBytesInResponse)),
},
{
name: "events with trimmable event exceeding > MaxEventBytesInResponse",
fakeResp: map[string]eventsWithToken{
"": {returnToken: "token1", events: []apievents.AuditEvent{event1}},
"token1": {returnToken: "", events: []apievents.AuditEvent{event2, event3, bigTrimmableEvent}},
},
limit: 10,
// we don't expect bigTrimmableEvent because it should go to next batch
wantEvents: []apievents.AuditEvent{event1, event2, event3},
wantKeyset: mustEventToKey(t, event3),
},
{
name: "only 1 very big trimmable event",
fakeResp: map[string]eventsWithToken{
"": {returnToken: "", events: []apievents.AuditEvent{bigTrimmableEvent}},
},
limit: 10,
wantEvents: []apievents.AuditEvent{},
wantEvents: []apievents.AuditEvent{bigTrimmedEvent},
wantKeyset: mustEventToKey(t, bigTrimmableEvent),
},
{
name: "number of events equals limit in req, make sure that pagination keyset is returned",
Expand Down Expand Up @@ -822,8 +853,14 @@ func Test_querier_fetchResults(t *testing.T) {
},
}
gotEvents, gotKeyset, err := q.fetchResults(context.Background(), "queryid", tt.limit, tt.condition)
if tt.wantErrorMsg != "" {
require.ErrorContains(t, err, tt.wantErrorMsg)
return
}
require.NoError(t, err)
require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty()))
require.Empty(t, cmp.Diff(tt.wantEvents, gotEvents, cmpopts.EquateEmpty(),
// Expect the database query to be trimmed
cmpopts.IgnoreFields(apievents.DatabaseSessionQuery{}, "DatabaseQuery")))
require.Equal(t, tt.wantKeyset, gotKeyset)
})
}
Expand Down