Skip to content

Commit

Permalink
[v15] Add a fallback for EmitAuditEvents failure due to event conflic…
Browse files Browse the repository at this point in the history
…ts (DynamoDB backend) (#40913)

* refactor(dynamoevents): add a fallback for put item condition error

* refactor(dynamoevents): code review suggestions
  • Loading branch information
gabrielcorado committed Apr 25, 2024
1 parent 3e18844 commit 1136189
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 37 deletions.
95 changes: 77 additions & 18 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type Config struct {
UIDGenerator utils.UID
// Endpoint is an optional non-AWS endpoint
Endpoint string
// DisableConflictCheck disables conflict checks when emitting an event.
// Disabling it can cause events to be lost due to them being overwritten.
DisableConflictCheck bool

// ReadMaxCapacity is the maximum provisioned read capacity.
ReadMaxCapacity int64
Expand Down Expand Up @@ -146,6 +149,10 @@ func (cfg *Config) SetFromURL(in *url.URL) error {
cfg.Endpoint = endpoint
}

if disableConflictCheck := in.Query().Get("disable_conflict_check"); disableConflictCheck != "" {
cfg.DisableConflictCheck = true
}

const boolErrorTemplate = "failed to parse URI %q flag %q - %q, supported values are 'true', 'false', or any other" +
"supported boolean in https://pkg.go.dev/strconv#ParseBool"
if val := in.Query().Get(events.UseFIPSQueryParam); val != "" {
Expand Down Expand Up @@ -366,24 +373,19 @@ const (
func (l *Log) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error {
ctx = context.WithoutCancel(ctx)
sessionID := getSessionID(in)
if err := l.putAuditEvent(ctx, sessionID, in); err != nil {
switch {
case isAWSValidationError(err):
// In case of ValidationException: Item size has exceeded the maximum allowed size
// sanitize event length and retry upload operation.
return trace.Wrap(l.handleAWSValidationError(ctx, err, sessionID, in))
}
return trace.Wrap(err)
}
return nil
return trace.Wrap(l.putAuditEvent(ctx, sessionID, in))
}

func (l *Log) handleAWSValidationError(ctx context.Context, err error, sessionID string, in apievents.AuditEvent) error {
if alreadyTrimmed := ctx.Value(largeEventHandledContextKey); alreadyTrimmed != nil {
return err
}

se, ok := trimEventSize(in)
if !ok {
return trace.BadParameter(err.Error())
}
if err := l.putAuditEvent(ctx, sessionID, se); err != nil {
if err := l.putAuditEvent(context.WithValue(ctx, largeEventHandledContextKey, true), sessionID, se); err != nil {
return trace.BadParameter(err.Error())
}
fields := log.Fields{"event_id": in.GetID(), "event_type": in.GetType()}
Expand All @@ -392,6 +394,22 @@ func (l *Log) handleAWSValidationError(ctx context.Context, err error, sessionID
return nil
}

func (l *Log) handleConditionError(ctx context.Context, err error, sessionID string, in apievents.AuditEvent) error {
if alreadyUpdated := ctx.Value(conflictHandledContextKey); alreadyUpdated != nil {
return err
}

// Update index using the current system time instead of event time to
// ensure the value is always set.
in.SetIndex(l.Clock.Now().UnixNano())

if err := l.putAuditEvent(context.WithValue(ctx, conflictHandledContextKey, true), sessionID, in); err != nil {
return trace.Wrap(err)
}
l.WithFields(log.Fields{"event_id": in.GetID(), "event_type": in.GetType()}).Debug("Event index overwritten")
return nil
}

// getSessionID if set returns event ID obtained from metadata or generates a new one.
func getSessionID(in apievents.AuditEvent) string {
s, ok := in.(events.SessionMetadataGetter)
Expand All @@ -415,13 +433,48 @@ func trimEventSize(event apievents.AuditEvent) (apievents.AuditEvent, bool) {
return m.TrimToMaxSize(maxItemSize), true
}

// putAuditEventContextKey represents context keys of putAuditEvent.
type putAuditEventContextKey int

const (
// conflictHandledContextKey if present on the context, the conflict error
// was already handled.
conflictHandledContextKey putAuditEventContextKey = iota
// largeEventHandledContextKey if present on the context, the large event
// error was already handled.
largeEventHandledContextKey
)

func (l *Log) putAuditEvent(ctx context.Context, sessionID string, in apievents.AuditEvent) error {
input, err := l.createPutItem(sessionID, in)
if err != nil {
return trace.Wrap(err)
}
_, err = l.svc.PutItemWithContext(ctx, input)
return convertError(err)

if _, err = l.svc.PutItemWithContext(ctx, input); err != nil {
err = convertError(err)

switch {
case isAWSValidationError(err):
// In case of ValidationException: Item size has exceeded the maximum allowed size
// sanitize event length and retry upload operation.
return trace.Wrap(l.handleAWSValidationError(ctx, err, sessionID, in))
case trace.IsAlreadyExists(err):
// Condition errors are directly related to the uniqueness of the
// item event index/session id. Since we can't change the session
// id, update the event index with a new value and retry the put
// item.
l.
WithError(err).
WithFields(log.Fields{"event_type": in.GetType(), "session_id": sessionID, "event_index": in.GetIndex()}).
Error("Conflict on event session_id and event_index")
return trace.Wrap(l.handleConditionError(ctx, err, sessionID, in))
}

return err
}

return nil
}

func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamodb.PutItemInput, error) {
Expand All @@ -443,11 +496,17 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod
if err != nil {
return nil, trace.Wrap(err)
}
return &dynamodb.PutItemInput{
Item: av,
TableName: aws.String(l.Tablename),
ConditionExpression: aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)"),
}, nil

input := &dynamodb.PutItemInput{
Item: av,
TableName: aws.String(l.Tablename),
}

if !l.Config.DisableConflictCheck {
input.ConditionExpression = aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)")
}

return input, nil
}

type messageSizeTrimmer interface {
Expand Down
43 changes: 24 additions & 19 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -68,6 +67,7 @@ func setupDynamoContext(t *testing.T) *dynamoContext {
Tablename: fmt.Sprintf("teleport-test-%v", uuid.New().String()),
Clock: fakeClock,
UIDGenerator: utils.NewFakeUID(),
Endpoint: "http://localhost:8000",
})
require.NoError(t, err)

Expand Down Expand Up @@ -437,38 +437,43 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
}

// TestEmitSessionEventsSameIndex given events that share the same session ID
// and index, the emit should fail, avoiding any event to get overwritten.
// and index, the emit should succeed.
func TestEmitSessionEventsSameIndex(t *testing.T) {
ctx := context.Background()
tt := setupDynamoContext(t)
sessionID := session.NewID()

require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0)))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0, "")))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1, "")))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1, "")))
}

func generateEvent(sessionID session.ID, index int64) apievents.AuditEvent {
return &apievents.AppSessionChunk{
// TestValidationErrorsHandling given events that return validation
// errors (large event size and already exists), the emit should handle them
// and succeed on emitting the event when it does support trimming.
func TestValidationErrorsHandling(t *testing.T) {
ctx := context.Background()
tt := setupDynamoContext(t)
sessionID := session.NewID()
largeQuery := strings.Repeat("A", maxItemSize)

// First write should only trigger the large event size
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0, largeQuery)))
// Second should trigger both errors.
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0, largeQuery)))
}

func generateEvent(sessionID session.ID, index int64, query string) apievents.AuditEvent {
return &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
Type: events.AppSessionChunkEvent,
Code: events.AppSessionChunkCode,
Type: events.DatabaseSessionQueryEvent,
ClusterName: "root",
Index: index,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: uuid.New().String(),
ServerNamespace: apidefaults.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: sessionID.String(),
},
AppMetadata: apievents.AppMetadata{
AppURI: "nginx",
AppPublicAddr: "https://nginx",
AppName: "nginx",
},
SessionChunkID: uuid.New().String(),
DatabaseQuery: query,
}
}

Expand Down

0 comments on commit 1136189

Please sign in to comment.