Skip to content

Commit

Permalink
feat: add metrics for event sizes (#35440) (#35505)
Browse files Browse the repository at this point in the history
Fixes #35161

Large events queried from the Athena audit backend will now be trimmed
before they are stored and before they are returned from a query
according to the existing TrimToMaxSize implementations for each event
type already used by the Dynamo and File backends.

The other backends typically trim the event before storing it: for
Dynamo this is due to the 400 KB item size limit, and for the file backend
it's due to the 64 KiB bufio.MaxScanTokenSize.

There is no hard limit to events stored in Parquet files in S3, but
we've been using a 2 GiB limit in the publisher so far.
With this change we will attempt to trim events to 2 GiB before writing
them (if we haven't already run out of memory) instead of just failing.

We've also been using a 1 MiB limit in the querier and just returning an
empty result when an event larger than that is encountered.
With this change we will attempt to trim the event to 1 MiB before
returning it.
The 1 MiB limit ultimately stems from the 4 MB maximum gRPC message size.

We could just trim to 1 MiB in the publisher, but I'd prefer to preserve
as much of the event data as possible in case we improve the querying
story for large events in the future (and in case the user wants to
query the events directly from S3).

* feat: add metrics for event sizes

Co-authored-by: Nic Klaassen <nic@goteleport.com>
  • Loading branch information
rosstimothy and nklaassen committed Dec 7, 2023
1 parent a565c90 commit a3c6ee7
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 1 deletion.
5 changes: 5 additions & 0 deletions lib/events/athena/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/gravitational/trace"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/internal/context121"
)

Expand Down Expand Up @@ -128,9 +129,13 @@ func (p *publisher) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent)
// attempt to preserve as much of the event as possible in case we add the
// ability to query very large events in the future.
if t, ok := in.(trimmableEvent); ok {
prevSize := in.Size()
// Trim to 3/4 the max size because base64 has 33% overhead.
// The TrimToMaxSize implementations have a 10% buffer already.
in = t.TrimToMaxSize(maxS3BasedSize - maxS3BasedSize/4)
if in.Size() != prevSize {
events.MetricStoredTrimmedEvents.Inc()
}
}

oneOf, err := apievents.ToOneOf(in)
Expand Down
1 change: 1 addition & 0 deletions lib/events/athena/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func (rb *responseBuilder) appendUntilSizeLimit(resultResp *athena.GetQueryResul
// do is try to trim it.
if t, ok := event.(trimmableEvent); ok {
event = t.TrimToMaxSize(events.MaxEventBytesInResponse)
events.MetricQueriedTrimmedEvents.Inc()
// Exact rb.totalSize doesn't really matter since the response is
// already size limited.
rb.totalSize += events.MaxEventBytesInResponse
Expand Down
28 changes: 27 additions & 1 deletion lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,33 @@ var (
},
)

prometheusCollectors = []prometheus.Collector{auditOpenFiles, auditDiskUsed, auditFailedDisk, AuditFailedEmit, auditEmitEvent}
auditEmitEventSizes = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Name: "audit_emitted_event_sizes",
Help: "Size of single events emitted",
Buckets: prometheus.ExponentialBucketsRange(64, 2*1024*1024*1024 /*2GiB*/, 16),
})

// MetricStoredTrimmedEvents counts the number of events that were trimmed
// before being stored.
MetricStoredTrimmedEvents = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: teleport.MetricNamespace,
Name: "audit_stored_trimmed_events",
Help: "Number of events that were trimmed before being stored",
})

// MetricQueriedTrimmedEvents counts the number of events that were trimmed
// before being returned from a query.
MetricQueriedTrimmedEvents = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: teleport.MetricNamespace,
Name: "audit_queried_trimmed_events",
Help: "Number of events that were trimmed before being returned from a query",
})

prometheusCollectors = []prometheus.Collector{auditOpenFiles, auditDiskUsed, auditFailedDisk, AuditFailedEmit, auditEmitEvent, auditEmitEventSizes, MetricStoredTrimmedEvents, MetricQueriedTrimmedEvents}
)

// AuditLog is a new combined facility to record Teleport events and
Expand Down
1 change: 1 addition & 0 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ func (l *Log) handleAWSValidationError(ctx context.Context, err error, sessionID
}
fields := log.Fields{"event_id": in.GetID(), "event_type": in.GetType()}
l.WithFields(fields).Info("Uploaded trimmed event to DynamoDB backend.")
events.MetricStoredTrimmedEvents.Inc()
return nil
}

Expand Down
1 change: 1 addition & 0 deletions lib/events/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (w *CheckingEmitterConfig) CheckAndSetDefaults() error {
func (r *CheckingEmitter) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
ctx = context121.WithoutCancel(ctx)
auditEmitEvent.Inc()
auditEmitEventSizes.Observe(float64(event.Size()))
if err := checkAndSetEventFields(event, r.Clock, r.UIDGenerator, r.ClusterName); err != nil {
log.WithError(err).Errorf("Failed to emit audit event.")
AuditFailedEmit.Inc()
Expand Down
1 change: 1 addition & 0 deletions lib/events/filelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (l *FileLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent
if err != nil {
return trace.Wrap(err)
}
MetricStoredTrimmedEvents.Inc()
default:
fields := log.Fields{"event_type": event.GetType(), "event_size": len(line)}
l.WithFields(fields).Warnf("Got a event that exceeded max allowed size.")
Expand Down

0 comments on commit a3c6ee7

Please sign in to comment.