From da18d630550b52e6433f4759ee6e5da0818748e7 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 24 May 2023 09:31:32 +0930 Subject: [PATCH] address pr comment --- .../docs/inputs/input-aws-s3.asciidoc | 3 +- x-pack/filebeat/input/awss3/config.go | 16 +++--- x-pack/filebeat/input/awss3/config_test.go | 49 ++++++++++++++++++- x-pack/filebeat/input/awss3/input.go | 8 +-- 4 files changed, 61 insertions(+), 15 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 11d9e1b98cfb..7c65bbeb53c2 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -261,7 +261,8 @@ URL of the AWS SQS queue that messages will be received from. (Required when `bu ==== `region_name` The name of the AWS region of the end point for testing purposes. This option -must not be set when using amazonaws.com end points. +must not be set when using amazonaws end points that provide a region, and must +be set for end points that do not. [float] ==== `visibility_timeout` diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 6e76c21283d4..4f1512739dda 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -7,8 +7,6 @@ package awss3 import ( "errors" "fmt" - "net/url" - "strings" "time" "github.com/dustin/go-humanize" @@ -81,13 +79,13 @@ func (c *config) Validate() error { return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers) } - if c.QueueURL != "" && c.RegionName != "" { - u, err := url.Parse(c.QueueURL) - if err != nil { - return fmt.Errorf("invalid queue_url: %w", err) - } - if strings.HasSuffix(u.Host, ".amazonaws.com") { - return fmt.Errorf("region_name <%s> must not be set with an amazonaws.com queue_url", c.RegionName) + if c.QueueURL != "" { + region, _ := getRegionFromQueueURL(c.QueueURL, c.AWSConfig.Endpoint) + switch { + case region != "" && c.RegionName != "": + return fmt.Errorf("region_name <%s> must not be set with a queue_url containing a region name", c.RegionName) + case region == "" && c.RegionName == "": + return errors.New("region_name must be set for a queue_url not containing a region name") } } diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index 7212d034aa0e..d3dc078254fe 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -120,26 +120,71 @@ func TestConfig(t *testing.T) { config: mapstr.M{ "queue_url": queueURL, "region_name": "region", + "endpoint": "ep", }, expectedErr: "", expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { c := makeConfig(queueURL, "", "") c.RegionName = "region" + c.AWSConfig.Endpoint = "ep" return c }, }, { - name: "AWS endpoint with explicit region", + name: "explicit AWS endpoint with explicit region", queueURL: queueURL, s3Bucket: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", "region_name": "region", + "endpoint": "amazonaws.com", }, - expectedErr: "region_name must not be set with an amazonaws.com queue_url accessing config", + expectedErr: "region_name must not be set with a queue_url containing a region name", expectedCfg: nil, }, + { + name: "inferred AWS endpoint with explicit region", + queueURL: queueURL, + s3Bucket: "", + nonAWSS3Bucket: "", + config: mapstr.M{ + "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", + "region_name": "region", + }, + expectedErr: "region_name must not be set with a queue_url containing a region name", + expectedCfg: nil, + }, + { + name: "localstack_with_region_name", + queueURL: "http://localhost:4566/000000000000/sample-queue", + s3Bucket: "", + nonAWSS3Bucket: "", + config: mapstr.M{ + "queue_url": "http://localhost:4566/000000000000/sample-queue", + "region_name": "myregion", + }, + expectedErr: "", + expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "") + c.RegionName = "myregion" + return c + }, + }, + { + name: "localstack_no_region_name", + queueURL: "http://localhost:4566/000000000000/sample-queue", + s3Bucket: "", + nonAWSS3Bucket: "", + config: mapstr.M{ + "queue_url": "http://localhost:4566/000000000000/sample-queue", + }, + expectedErr: "region_name must be set for a queue_url not containing a region name accessing config", + expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "") + return c + }, + }, { name: "error on no queueURL and s3Bucket and nonAWSS3Bucket", queueURL: "", diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index ec05f0a1945a..c290d13140d3 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -311,9 +311,11 @@ func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) { return "", fmt.Errorf(queueURL + " is not a valid URL") } if url.Scheme == "https" && url.Host != "" { - queueHostSplit := strings.Split(url.Host, ".") - if len(queueHostSplit) > 2 && (strings.Join(queueHostSplit[2:], ".") == endpoint || (endpoint == "" && queueHostSplit[2] == "amazonaws")) { - return queueHostSplit[1], nil + queueHostSplit := strings.SplitN(url.Host, ".", 3) + if len(queueHostSplit) == 3 { + if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { + return queueHostSplit[1], nil + } } } return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")