Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] Add a fallback for EmitAuditEvents failure due to event conflicts (DynamoDB backend) #40913

Merged
merged 3 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 77 additions & 18 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type Config struct {
UIDGenerator utils.UID
// Endpoint is an optional non-AWS endpoint
Endpoint string
// DisableConflictCheck disables conflict checks when emitting an event.
// Disabling it can cause events to be lost due to them being overwritten.
DisableConflictCheck bool

// ReadMaxCapacity is the maximum provisioned read capacity.
ReadMaxCapacity int64
Expand Down Expand Up @@ -146,6 +149,10 @@ func (cfg *Config) SetFromURL(in *url.URL) error {
cfg.Endpoint = endpoint
}

if disableConflictCheck := in.Query().Get("disable_conflict_check"); disableConflictCheck != "" {
cfg.DisableConflictCheck = true
}

const boolErrorTemplate = "failed to parse URI %q flag %q - %q, supported values are 'true', 'false', or any other" +
"supported boolean in https://pkg.go.dev/strconv#ParseBool"
if val := in.Query().Get(events.UseFIPSQueryParam); val != "" {
Expand Down Expand Up @@ -366,24 +373,19 @@ const (
func (l *Log) EmitAuditEvent(ctx context.Context, in apievents.AuditEvent) error {
// Detach from the caller's cancellation so the write below is not abandoned
// when the incoming context is canceled.
ctx = context.WithoutCancel(ctx)
sessionID := getSessionID(in)
// NOTE(review): this body appears to contain both the previous
// implementation (the if/switch block through "return nil") and its one-line
// replacement (the final return). As written, the final return is
// unreachable - confirm which version is meant to remain.
if err := l.putAuditEvent(ctx, sessionID, in); err != nil {
switch {
case isAWSValidationError(err):
// In case of ValidationException: Item size has exceeded the maximum allowed size
// sanitize event length and retry upload operation.
return trace.Wrap(l.handleAWSValidationError(ctx, err, sessionID, in))
}
return trace.Wrap(err)
}
return nil
return trace.Wrap(l.putAuditEvent(ctx, sessionID, in))
}

func (l *Log) handleAWSValidationError(ctx context.Context, err error, sessionID string, in apievents.AuditEvent) error {
if alreadyTrimmed := ctx.Value(largeEventHandledContextKey); alreadyTrimmed != nil {
return err
}

se, ok := trimEventSize(in)
if !ok {
return trace.BadParameter(err.Error())
}
if err := l.putAuditEvent(ctx, sessionID, se); err != nil {
if err := l.putAuditEvent(context.WithValue(ctx, largeEventHandledContextKey, true), sessionID, se); err != nil {
return trace.BadParameter(err.Error())
}
fields := log.Fields{"event_id": in.GetID(), "event_type": in.GetType()}
Expand All @@ -392,6 +394,22 @@ func (l *Log) handleAWSValidationError(ctx context.Context, err error, sessionID
return nil
}

// handleConditionError retries a put that failed with err by giving the event
// a new index and writing it once more. If the context already carries
// conflictHandledContextKey, no further retry is attempted and err is returned
// unchanged. A failure of the retry is returned wrapped; on success a debug
// message is logged and nil is returned.
func (l *Log) handleConditionError(ctx context.Context, err error, sessionID string, in apievents.AuditEvent) error {
	// The marker is set by the retry below, so seeing it here means this
	// handler already ran for the event.
	if ctx.Value(conflictHandledContextKey) != nil {
		return err
	}

	// Use the current clock time in nanoseconds (not the event time) as the
	// new index so that a value is always set.
	in.SetIndex(l.Clock.Now().UnixNano())

	retryCtx := context.WithValue(ctx, conflictHandledContextKey, true)
	if retryErr := l.putAuditEvent(retryCtx, sessionID, in); retryErr != nil {
		return trace.Wrap(retryErr)
	}

	fields := log.Fields{"event_id": in.GetID(), "event_type": in.GetType()}
	l.WithFields(fields).Debug("Event index overwritten")
	return nil
}

// getSessionID if set returns event ID obtained from metadata or generates a new one.
func getSessionID(in apievents.AuditEvent) string {
s, ok := in.(events.SessionMetadataGetter)
Expand All @@ -415,13 +433,48 @@ func trimEventSize(event apievents.AuditEvent) (apievents.AuditEvent, bool) {
return m.TrimToMaxSize(maxItemSize), true
}

// putAuditEventContextKey is the type of the context keys used while putting
// an audit event. Using a dedicated unexported type keeps these keys from
// colliding with context keys defined elsewhere.
type putAuditEventContextKey int

const (
// conflictHandledContextKey, when present on the context, marks that the
// conflict error was already handled once, so the conflict handler returns
// the error instead of retrying again.
conflictHandledContextKey putAuditEventContextKey = iota
// largeEventHandledContextKey, when present on the context, marks that the
// large event error was already handled once, so the validation-error
// handler returns the error instead of trimming and retrying again.
largeEventHandledContextKey
)

// putAuditEvent builds the PutItem request for the event with createPutItem
// and sends it with PutItemWithContext. A failure to build the request is
// returned wrapped.
func (l *Log) putAuditEvent(ctx context.Context, sessionID string, in apievents.AuditEvent) error {
input, err := l.createPutItem(sessionID, in)
if err != nil {
return trace.Wrap(err)
}
// NOTE(review): the next two statements look like the previous body of this
// function. As written they send the request and return unconditionally, so
// the error handling below never runs - confirm they should be removed.
_, err = l.svc.PutItemWithContext(ctx, input)
return convertError(err)

if _, err = l.svc.PutItemWithContext(ctx, input); err != nil {
// Classification below works on the converted error (presumably
// convertError maps AWS errors to trace errors - TODO confirm).
err = convertError(err)

switch {
case isAWSValidationError(err):
// In case of ValidationException: Item size has exceeded the maximum allowed size
// sanitize event length and retry upload operation.
return trace.Wrap(l.handleAWSValidationError(ctx, err, sessionID, in))
case trace.IsAlreadyExists(err):
// Condition errors are directly related to the uniqueness of the
// item event index/session id. Since we can't change the session
// id, update the event index with a new value and retry the put
// item.
l.
WithError(err).
WithFields(log.Fields{"event_type": in.GetType(), "session_id": sessionID, "event_index": in.GetIndex()}).
Error("Conflict on event session_id and event_index")
return trace.Wrap(l.handleConditionError(ctx, err, sessionID, in))
}

// Any other error is returned as converted, without a retry.
return err
}

return nil
}

func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamodb.PutItemInput, error) {
Expand All @@ -443,11 +496,17 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod
if err != nil {
return nil, trace.Wrap(err)
}
return &dynamodb.PutItemInput{
Item: av,
TableName: aws.String(l.Tablename),
ConditionExpression: aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)"),
}, nil

input := &dynamodb.PutItemInput{
Item: av,
TableName: aws.String(l.Tablename),
}

if !l.Config.DisableConflictCheck {
input.ConditionExpression = aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)")
}

return input, nil
}

type messageSizeTrimmer interface {
Expand Down
43 changes: 24 additions & 19 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
Expand Down Expand Up @@ -68,6 +67,7 @@ func setupDynamoContext(t *testing.T) *dynamoContext {
Tablename: fmt.Sprintf("teleport-test-%v", uuid.New().String()),
Clock: fakeClock,
UIDGenerator: utils.NewFakeUID(),
Endpoint: "http://localhost:8000",
})
require.NoError(t, err)

Expand Down Expand Up @@ -437,38 +437,43 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
}

// TestEmitSessionEventsSameIndex emits events that share the same session ID
// and index.
//
// NOTE(review): the body holds two variants of the same three emits. The
// two-argument generateEvent calls expect the duplicate index to fail; the
// three-argument calls expect it to succeed. Only one variant can match the
// generateEvent signature and the emit behavior - confirm which one remains.
func TestEmitSessionEventsSameIndex(t *testing.T) {
ctx := context.Background()
tt := setupDynamoContext(t)
sessionID := session.NewID()

require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0)))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0, "")))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1, "")))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1, "")))
}

func generateEvent(sessionID session.ID, index int64) apievents.AuditEvent {
return &apievents.AppSessionChunk{
// TestValidationErrorsHandling emits the same oversized event twice for one
// session ID and index. Both emits must succeed: the emitter is expected to
// handle the large event size error and the already exists error for events
// that support trimming.
func TestValidationErrorsHandling(t *testing.T) {
	var (
		ctx        = context.Background()
		tt         = setupDynamoContext(t)
		sessionID  = session.NewID()
		largeQuery = strings.Repeat("A", maxItemSize)
	)

	// The first emit should only trigger the large event size error. The
	// second reuses the same session ID and index, so it should trigger both
	// errors.
	for attempt := 0; attempt < 2; attempt++ {
		event := generateEvent(sessionID, 0, largeQuery)
		require.NoError(t, tt.log.EmitAuditEvent(ctx, event))
	}
}

// generateEvent builds a DatabaseSessionQuery audit event for the given
// session ID and event index, with query stored in DatabaseQuery.
//
// NOTE(review): the literal below sets Metadata.Type twice (an app session
// chunk type, then the database session query type) and also carries
// App-named fields (AppMetadata, SessionChunkID). These look like leftovers
// from a previous AppSessionChunk-based version - confirm which fields
// should remain.
func generateEvent(sessionID session.ID, index int64, query string) apievents.AuditEvent {
return &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
Type: events.AppSessionChunkEvent,
Code: events.AppSessionChunkCode,
Type: events.DatabaseSessionQueryEvent,
ClusterName: "root",
Index: index,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: uuid.New().String(),
ServerNamespace: apidefaults.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: sessionID.String(),
},
AppMetadata: apievents.AppMetadata{
AppURI: "nginx",
AppPublicAddr: "https://nginx",
AppName: "nginx",
},
SessionChunkID: uuid.New().String(),
DatabaseQuery: query,
}
}

Expand Down