Skip to content

Commit a4e336c

Browse files
authored
[AWS] Skip s3 test events in s3-sqs filebeat input (#47635)
* Skip s3 test events * add changelog * update changelog * keep only event field in s3TestEvent * add Event into s3EventsV2 * add comment and change variable name for processing s3-sns-sqs
1 parent 3b8b996 commit a4e336c

File tree

3 files changed

+85
-1
lines changed

3 files changed

+85
-1
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: Skip s3 test events in filebeat s3 input
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: filebeat
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234

x-pack/filebeat/input/awss3/sqs_s3_event.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ func nonRetryableErrorWrap(err error) error {
6060
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
6161
// If the notification message is sent from SNS to SQS, then Records will be
6262
// replaced by TopicArn and Message fields.
63+
// The Event field is present in test event notifications (s3:TestEvent) but not in regular events.
6364
type s3EventsV2 struct {
65+
Event string `json:"Event"` // Present in test events (s3:TestEvent), empty in regular events
6466
TopicArn string `json:"TopicArn"`
6567
Message string `json:"Message"`
6668
Records []s3EventV2 `json:"Records"`
@@ -323,13 +325,26 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro
323325
return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err)
324326
}
325327

328+
// Check if this is a test event and skip it
329+
if events.Event == "s3:TestEvent" {
330+
p.log.Debugw("Skipping S3 test event notification", "sqs_message_body", body)
331+
return nil, nil
332+
}
333+
326334
// Check if the notification is from S3 -> SNS -> SQS
327335
if events.TopicArn != "" {
336+
// Check if the inner message is a test event before unmarshaling
337+
var innerEvents s3EventsV2
328338
dec := json.NewDecoder(strings.NewReader(events.Message))
329-
if err := dec.Decode(&events); err != nil {
339+
if err := dec.Decode(&innerEvents); err != nil {
330340
p.log.Debugw("Invalid SQS message body.", "sqs_message_body", body)
331341
return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err)
332342
}
343+
if innerEvents.Event == "s3:TestEvent" {
344+
p.log.Debugw("Skipping S3 test event notification (via SNS)", "sqs_message_body", body)
345+
return nil, nil
346+
}
347+
events = innerEvents
333348
}
334349

335350
// Check if the notification is from S3 -> EventBridge -> SQS

x-pack/filebeat/input/awss3/sqs_s3_event_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,30 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) {
282282
require.NoError(t, err)
283283
assert.Equal(t, 0, len(events))
284284
})
285+
286+
t.Run("s3:TestEvent messages are skipped", func(t *testing.T) {
287+
testEventMsg := `{
288+
"Service":"Amazon S3",
289+
"Event":"s3:TestEvent",
290+
"Time":"2014-10-13T15:57:02.089Z",
291+
"Bucket":"amzn-s3-demo-bucket",
292+
"RequestId":"5582815E1AEA5ADF",
293+
"HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
294+
}`
295+
events, err := p.getS3Notifications(testEventMsg)
296+
require.NoError(t, err)
297+
assert.Equal(t, 0, len(events), "Test events should be skipped and return no events")
298+
})
299+
300+
t.Run("s3:TestEvent messages via SNS are skipped", func(t *testing.T) {
301+
testEventViaSNS := `{
302+
"TopicArn":"arn:aws:sns:us-east-1:123456789012:test-topic",
303+
"Message":"{\"Service\":\"Amazon S3\",\"Event\":\"s3:TestEvent\",\"Time\":\"2014-10-13T15:57:02.089Z\",\"Bucket\":\"amzn-s3-demo-bucket\",\"RequestId\":\"5582815E1AEA5ADF\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\"}"
304+
}`
305+
events, err := p.getS3Notifications(testEventViaSNS)
306+
require.NoError(t, err)
307+
assert.Equal(t, 0, len(events), "Test events via SNS should be skipped and return no events")
308+
})
285309
}
286310

287311
func TestNonRecoverableError(t *testing.T) {

0 commit comments

Comments
 (0)