Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed May 25, 2023
1 parent 4c58765 commit d639d7e
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 69 deletions.
1 change: 0 additions & 1 deletion CHANGELOG-developer.next.asciidoc
Expand Up @@ -154,7 +154,6 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add the file path of the instance lock on the error when it's is already locked {pull}33788[33788]
- Add DropFields processor to js API {pull}33458[33458]
- Add support for different folders when testing data {pull}34467[34467]
- Allow non-AWS endpoints for testing Filebeat awss3 input. {issue}35496[35496] {pull}35520[35520]

==== Deprecated

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -288,6 +288,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add execution budget to CEL input. {pull}35409[35409]
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
- Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507]
- Allow non-AWS endpoints for awss3 input. {issue}35496[35496] {pull}35520[35520]

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Expand Up @@ -258,10 +258,10 @@ configuring multiline options.
URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` and `non_aws_bucket_name` are not set).

[float]
==== `region_name`
==== `region`

The name of the AWS region of the end point for testing purposes. This option
must not be set when using amazonaws end points that provide a region.
The name of the AWS region of the end point. If this option is given it
takes precedence over the region name obtained from the `queue_url` value.

[float]
==== `visibility_timeout`
Expand Down
9 changes: 1 addition & 8 deletions x-pack/filebeat/input/awss3/config.go
Expand Up @@ -27,7 +27,7 @@ type config struct {
SQSScript *scriptConfig `config:"sqs.notification_parsing_script"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url"`
RegionName string `config:"region_name"`
RegionName string `config:"region"`
BucketARN string `config:"bucket_arn"`
NonAWSBucketName string `config:"non_aws_bucket_name"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
Expand Down Expand Up @@ -79,13 +79,6 @@ func (c *config) Validate() error {
return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers)
}

if c.QueueURL != "" {
region, _ := getRegionFromQueueURL(c.QueueURL, c.AWSConfig.Endpoint)
if region != "" && c.RegionName != "" {
return fmt.Errorf("region_name <%s> must not be set with a queue_url containing a region name", c.RegionName)
}
}

if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) {
return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+
"less than or equal to 12h", c.VisibilityTimeout)
Expand Down
61 changes: 36 additions & 25 deletions x-pack/filebeat/input/awss3/config_test.go
Expand Up @@ -57,7 +57,7 @@ func TestConfig(t *testing.T) {
nonAWSS3Bucket string
config mapstr.M
expectedErr string
expectedCfg func(queueURL, s3Bucket string, nonAWSS3Bucket string) config
expectedCfg func(queueURL, s3Bucket, nonAWSS3Bucket string) config
}{
{
name: "input with defaults for queueURL",
Expand All @@ -80,7 +80,7 @@ func TestConfig(t *testing.T) {
"number_of_workers": 5,
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig("", s3Bucket, "")
c.NumberOfWorkers = 5
return c
Expand All @@ -100,7 +100,7 @@ func TestConfig(t *testing.T) {
},
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig(queueURL, "", "")
regex := match.MustCompile("/CloudTrail/")
c.FileSelectors = []fileSelectorConfig{
Expand All @@ -113,59 +113,70 @@ func TestConfig(t *testing.T) {
},
},
{
name: "non-AWS endpoint with explicit region",
name: "non-AWS_endpoint_with_explicit_region",
queueURL: queueURL,
s3Bucket: "",
nonAWSS3Bucket: "",
config: mapstr.M{
"queue_url": queueURL,
"region_name": "region",
"endpoint": "ep",
"queue_url": queueURL,
"region": "region",
"endpoint": "ep",
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig(queueURL, "", "")
c.RegionName = "region"
c.AWSConfig.Endpoint = "ep"
return c
},
},
{
name: "explicit AWS endpoint with explicit region",
name: "explicit_AWS_endpoint_with_explicit_region",
queueURL: queueURL,
s3Bucket: "",
nonAWSS3Bucket: "",
config: mapstr.M{
"queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
"region_name": "region",
"endpoint": "amazonaws.com",
"queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
"region": "region",
"endpoint": "amazonaws.com",
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig(queueURL, "", "")
c.QueueURL = "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs"
c.AWSConfig.Endpoint = "amazonaws.com"
c.RegionName = "region"
return c
},
expectedErr: "region_name <region> must not be set with a queue_url containing a region name",
expectedCfg: nil,
},
{
name: "inferred AWS endpoint with explicit region",
name: "inferred_AWS_endpoint_with_explicit_region",
queueURL: queueURL,
s3Bucket: "",
nonAWSS3Bucket: "",
config: mapstr.M{
"queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
"region_name": "region",
"queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
"region": "region",
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig(queueURL, "", "")
c.QueueURL = "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs"
c.RegionName = "region"
return c
},
expectedErr: "region_name <region> must not be set with a queue_url containing a region name",
expectedCfg: nil,
},
{
name: "localstack_with_region_name",
queueURL: "http://localhost:4566/000000000000/sample-queue",
s3Bucket: "",
nonAWSS3Bucket: "",
config: mapstr.M{
"queue_url": "http://localhost:4566/000000000000/sample-queue",
"region_name": "myregion",
"queue_url": "http://localhost:4566/000000000000/sample-queue",
"region": "myregion",
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig(queueURL, "", "")
c.RegionName = "myregion"
return c
Expand Down Expand Up @@ -365,7 +376,7 @@ func TestConfig(t *testing.T) {
"number_of_workers": 5,
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig("", "", nonAWSS3Bucket)
c.NumberOfWorkers = 5
return c
Expand Down Expand Up @@ -448,7 +459,7 @@ func TestConfig(t *testing.T) {
"number_of_workers": 5,
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig("", s3Bucket, "")
c.BackupConfig.BackupToBucketArn = "arn:aws:s3:::bBucket"
c.BackupConfig.BackupToBucketPrefix = "backup"
Expand All @@ -468,7 +479,7 @@ func TestConfig(t *testing.T) {
"number_of_workers": 5,
},
expectedErr: "",
expectedCfg: func(queueURL, s3Bucket string, nonAWSS3Bucket string) config {
expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config {
c := makeConfig("", "", nonAWSS3Bucket)
c.NonAWSBucketName = nonAWSS3Bucket
c.BackupConfig.NonAWSBackupToBucketName = "bBucket"
Expand Down
18 changes: 11 additions & 7 deletions x-pack/filebeat/input/awss3/input.go
Expand Up @@ -115,12 +115,14 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
defer cancelInputCtx()

if in.config.QueueURL != "" {
regionName := in.config.RegionName
if regionName == "" {
regionName, err = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
if err != nil {
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
}
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
if err != nil && in.config.RegionName == "" {
return fmt.Errorf("failed to get AWS region from queue_url: %w", err)
}
if in.config.RegionName != "" && regionName != in.config.RegionName {
inputContext.Logger.Warnf("configured region disagrees with queue_url region: %q != %q: using %[1]q",
in.config.RegionName, regionName)
regionName = in.config.RegionName
}

in.awsConfig.Region = regionName
Expand Down Expand Up @@ -303,6 +305,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
return s3Poller, nil
}

var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")

func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) {
// get region from queueURL
// Example: https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs
Expand All @@ -318,7 +322,7 @@ func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) {
}
}
}
return "", fmt.Errorf("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")
return "", errBadQueueURL
}

func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) {
Expand Down
123 changes: 98 additions & 25 deletions x-pack/filebeat/input/awss3/input_test.go
Expand Up @@ -5,35 +5,108 @@
package awss3

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetProviderFromDomain(t *testing.T) {
assert.Equal(t, "aws", getProviderFromDomain("", ""))
assert.Equal(t, "aws", getProviderFromDomain("c2s.ic.gov", ""))
assert.Equal(t, "abc", getProviderFromDomain("abc.com", "abc"))
assert.Equal(t, "xyz", getProviderFromDomain("oraclecloud.com", "xyz"))
assert.Equal(t, "aws", getProviderFromDomain("amazonaws.com", ""))
assert.Equal(t, "aws", getProviderFromDomain("c2s.sgov.gov", ""))
assert.Equal(t, "aws", getProviderFromDomain("c2s.ic.gov", ""))
assert.Equal(t, "aws", getProviderFromDomain("amazonaws.com.cn", ""))
assert.Equal(t, "backblaze", getProviderFromDomain("https://backblazeb2.com", ""))
assert.Equal(t, "cloudflare", getProviderFromDomain("https://1234567890.r2.cloudflarestorage.com", ""))
assert.Equal(t, "wasabi", getProviderFromDomain("https://wasabisys.com", ""))
assert.Equal(t, "digitalocean", getProviderFromDomain("https://digitaloceanspaces.com", ""))
assert.Equal(t, "dreamhost", getProviderFromDomain("https://dream.io", ""))
assert.Equal(t, "scaleway", getProviderFromDomain("https://scw.cloud", ""))
assert.Equal(t, "gcp", getProviderFromDomain("https://googleapis.com", ""))
assert.Equal(t, "arubacloud", getProviderFromDomain("https://cloud.it", ""))
assert.Equal(t, "linode", getProviderFromDomain("https://linodeobjects.com", ""))
assert.Equal(t, "vultr", getProviderFromDomain("https://vultrobjects.com", ""))
assert.Equal(t, "ibm", getProviderFromDomain("https://appdomain.cloud", ""))
assert.Equal(t, "alibaba", getProviderFromDomain("https://aliyuncs.com", ""))
assert.Equal(t, "oracle", getProviderFromDomain("https://oraclecloud.com", ""))
assert.Equal(t, "exoscale", getProviderFromDomain("https://exo.io", ""))
assert.Equal(t, "upcloud", getProviderFromDomain("https://upcloudobjects.com", ""))
assert.Equal(t, "iland", getProviderFromDomain("https://ilandcloud.com", ""))
assert.Equal(t, "zadara", getProviderFromDomain("https://zadarazios.com", ""))
tests := []struct {
endpoint string
override string
want string
}{
{endpoint: "", override: "", want: "aws"},
{endpoint: "c2s.ic.gov", want: "aws"},
{endpoint: "abc.com", override: "abc", want: "abc"},
{endpoint: "oraclecloud.com", override: "xyz", want: "xyz"},
{endpoint: "amazonaws.com", want: "aws"},
{endpoint: "c2s.sgov.gov", want: "aws"},
{endpoint: "c2s.ic.gov", want: "aws"},
{endpoint: "amazonaws.com.cn", want: "aws"},
{endpoint: "https://backblazeb2.com", want: "backblaze"},
{endpoint: "https://1234567890.r2.cloudflarestorage.com", want: "cloudflare"},
{endpoint: "https://wasabisys.com", want: "wasabi"},
{endpoint: "https://digitaloceanspaces.com", want: "digitalocean"},
{endpoint: "https://dream.io", want: "dreamhost"},
{endpoint: "https://scw.cloud", want: "scaleway"},
{endpoint: "https://googleapis.com", want: "gcp"},
{endpoint: "https://cloud.it", want: "arubacloud"},
{endpoint: "https://linodeobjects.com", want: "linode"},
{endpoint: "https://vultrobjects.com", want: "vultr"},
{endpoint: "https://appdomain.cloud", want: "ibm"},
{endpoint: "https://aliyuncs.com", want: "alibaba"},
{endpoint: "https://oraclecloud.com", want: "oracle"},
{endpoint: "https://exo.io", want: "exoscale"},
{endpoint: "https://upcloudobjects.com", want: "upcloud"},
{endpoint: "https://ilandcloud.com", want: "iland"},
{endpoint: "https://zadarazios.com", want: "zadara"},
}

for _, test := range tests {
assert.Equal(t, test.want, getProviderFromDomain(test.endpoint, test.override),
"for endpoint=%q and override=%q", test.endpoint, test.override)
}
}

func TestGetRegionFromQueueURL(t *testing.T) {
tests := []struct {
name string
queueURL string
endpoint string
want string
wantErr error
}{
{
name: "amazonaws.com_domain_with_blank_endpoint",
queueURL: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_matching_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "abc.xyz",
want: "us-east-1",
},
{
name: "abc.xyz_and_domain_with_blank_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
wantErr: errBadQueueURL,
},
{
name: "abc.xyz_and_domain_with_different_endpoint",
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
endpoint: "googlecloud.com",
wantErr: errBadQueueURL,
},
{
name: "invalid_queue_url",
queueURL: ":foo",
wantErr: errors.New(":foo is not a valid URL"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := getRegionFromQueueURL(test.queueURL, test.endpoint)
if !sameError(err, test.wantErr) {
t.Errorf("unexpected error: got:%v want:%v", err, test.wantErr)
}
if got != test.want {
t.Errorf("unexpected result: got:%q want:%q", got, test.want)
}
})
}
}

func sameError(a, b error) bool {
switch {
case a == nil && b == nil:
return true
case a == nil, b == nil:
return false
default:
return a.Error() == b.Error()
}
}

0 comments on commit d639d7e

Please sign in to comment.