Skip to content

Commit

Permalink
fix: s3_input integration tests (#39648)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkoutsovasilis committed May 24, 2024
1 parent 67ad3d0 commit 566d3bf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .ci/jobs/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ services:
image: busybox
depends_on:
localstack: { condition: service_healthy }

localstack:
container_name: "${localstack_integration_test_container}"
image: localstack/localstack:2.1.0 # Latest stable release
image: localstack/localstack:3.1.0 # Latest stable release
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
environment:
Expand Down
76 changes: 42 additions & 34 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,22 @@ file_selectors:
`, queueURL))
}

func createInput(t *testing.T, cfg *conf.C) *s3Input {
func createSQSInput(t *testing.T, cfg *conf.C) *sqsReaderInput {
inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg)
if err != nil {
t.Fatal(err)
}

return inputV2.(*s3Input)
return inputV2.(*sqsReaderInput)
}

func createS3Input(t *testing.T, cfg *conf.C) *s3PollerInput {
inputV2, err := Plugin(openTestStatestore()).Manager.Create(cfg)
if err != nil {
t.Fatal(err)
}

return inputV2.(*s3PollerInput)
}

func newV2Context() (v2.Context, func()) {
Expand Down Expand Up @@ -235,11 +244,7 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
})

// Initialize s3Input with the test config
s3Input := &s3Input{
config: config,
awsConfig: awsCfg,
store: openTestStatestore(),
}
s3Input := newSQSReaderInput(config, awsCfg)
// Run S3 Input with desired context
var errGroup errgroup.Group
errGroup.Go(func() error {
Expand Down Expand Up @@ -284,7 +289,7 @@ func TestInputRunSQS(t *testing.T) {
"testdata/log.txt", // Skipped (no match).
)

s3Input := createInput(t, makeTestConfigSQS(tfConfig.QueueURL))
sqsInput := createSQSInput(t, makeTestConfigSQS(tfConfig.QueueURL))

inputCtx, cancel := newV2Context()
t.Cleanup(cancel)
Expand All @@ -294,23 +299,23 @@ func TestInputRunSQS(t *testing.T) {

var errGroup errgroup.Group
errGroup.Go(func() error {
return s3Input.Run(inputCtx, &fakePipeline{})
return sqsInput.Run(inputCtx, &fakePipeline{})
})

if err := errGroup.Wait(); err != nil {
t.Fatal(err)
}

assert.EqualValues(t, s3Input.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications.
assert.EqualValues(t, s3Input.metrics.sqsMessagesInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications.
assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0)
assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0)
assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}

func TestInputRunS3(t *testing.T) {
Expand All @@ -332,7 +337,7 @@ func TestInputRunS3(t *testing.T) {
"testdata/log.txt", // Skipped (no match).
)

s3Input := createInput(t, makeTestConfigS3(tfConfig.BucketName))
s3Input := createS3Input(t, makeTestConfigS3(tfConfig.BucketName))

inputCtx, cancel := newV2Context()
t.Cleanup(cancel)
Expand All @@ -353,7 +358,7 @@ func TestInputRunS3(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3ObjectsListedTotal.Get(), 8)
assert.EqualValues(t, s3Input.metrics.s3ObjectsProcessedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 6)
assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
}

Expand Down Expand Up @@ -402,7 +407,10 @@ func makeAWSConfig(t *testing.T, region string) aws.Config {

func drainSQS(t *testing.T, region string, queueURL string, cfg aws.Config) {
sqs := &awsSQSAPI{
client: sqs.NewFromConfig(cfg),
client: sqs.NewFromConfig(cfg, func(options *sqs.Options) {
//options.ClientLogMode = aws.LogResponseWithBody
options.Region = region
}),
queueURL: queueURL,
apiTimeout: 1 * time.Minute,
visibilityTimeout: 30 * time.Second,
Expand Down Expand Up @@ -523,7 +531,7 @@ func TestInputRunSNS(t *testing.T) {
"testdata/log.txt", // Skipped (no match).
)

s3Input := createInput(t, makeTestConfigSQS(tfConfig.QueueURLForSNS))
sqsInput := createSQSInput(t, makeTestConfigSQS(tfConfig.QueueURLForSNS))

inputCtx, cancel := newV2Context()
t.Cleanup(cancel)
Expand All @@ -533,21 +541,21 @@ func TestInputRunSNS(t *testing.T) {

var errGroup errgroup.Group
errGroup.Go(func() error {
return s3Input.Run(inputCtx, &fakePipeline{})
return sqsInput.Run(inputCtx, &fakePipeline{})
})

if err := errGroup.Wait(); err != nil {
t.Fatal(err)
}

assert.EqualValues(t, s3Input.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications.
assert.EqualValues(t, s3Input.metrics.sqsMessagesInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications.
assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0)
assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0)
assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}

0 comments on commit 566d3bf

Please sign in to comment.