Skip to content

Commit

Permalink
[8.4](backport #32921) Refactor endpoint resolver for AWS services (#…
Browse files Browse the repository at this point in the history
…33014)

* Refactor endpoint resolver for AWS services (#32921)

* Add signing region

* Add changelog entry

* remove awscommon.EnrichAWSConfigWithEndpoint

* add nonAWSBucketResolver when recreating the s3 client for new region

* remove unused functions

* changelog

* fix changelog

* fix docs

Co-authored-by: Andrea Spacca <andrea.spacca@elastic.co>
(cherry picked from commit d0bc413)

# Conflicts:
#	x-pack/libbeat/common/aws/credentials.go
#	x-pack/libbeat/common/aws/credentials_test.go

* fix merge

Co-authored-by: Mario Castro <mariocaster@gmail.com>
Co-authored-by: Andrea Spacca <andrea.spacca@elastic.co>
  • Loading branch information
3 people committed Sep 8, 2022
1 parent b821413 commit 4d8b390
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 236 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Expand Up @@ -35,7 +35,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*

- Fix in AWS related services initialisation relying on custom endpoint resolver. {issue}32888[32888] {pull}32921[32921]

*Auditbeat*

Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Expand Up @@ -65,8 +65,7 @@ Listing of the S3 bucket will be polled according the time interval defined by
The `aws-s3` input can also poll 3rd party S3 compatible services such as the self hosted Minio.
Using non-AWS S3 compatible buckets requires the use of `access_key_id` and `secret_access_key` for authentication.
To specify the S3 bucket name, use the `non_aws_bucket_name` config and the `endpoint` must be set to replace the default API endpoint.
`endpoint` should be a full URI in the form of `https(s)://<s3 endpoint>`, that will be used as the API endpoint of the service, or a single domain.
If a domain is provided, the full endpoint URI will be constructed with the region name in the standard form of `https://s3.<region>.<domain>` supported by AWS and several 3rd party providers.
`endpoint` should be a full URI in the form of `https(s)://<s3 endpoint>` in the case of `non_aws_bucket_name`, that will be used as the API endpoint of the service.
No `endpoint` is needed if using the native AWS S3 service hosted at `amazonaws.com`.
Please see <<aws-credentials-config,Configuration parameters>> for alternate AWS domains that require a different endpoint.

Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/input/awscloudwatch/input.go
Expand Up @@ -120,9 +120,11 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
}
defer client.Close()

logsServiceName := awscommon.CreateServiceName("logs", in.config.AWSConfig.FIPSEnabled, in.config.RegionName)
cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig)
svc := cloudwatchlogs.NewFromConfig(cwConfig)
svc := cloudwatchlogs.NewFromConfig(in.awsConfig, func(o *cloudwatchlogs.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})

logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName)
if err != nil {
Expand Down
44 changes: 34 additions & 10 deletions x-pack/filebeat/input/awss3/input.go
Expand Up @@ -155,27 +155,31 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}

func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsReader, error) {
s3ServiceName := awscommon.CreateServiceName("s3", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)
sqsServiceName := awscommon.CreateServiceName("sqs", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)

sqsAPI := &awsSQSAPI{
client: sqs.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, sqsServiceName, in.awsConfig.Region, in.awsConfig)),
client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}),
queueURL: in.config.QueueURL,
apiTimeout: in.config.APITimeout,
visibilityTimeout: in.config.VisibilityTimeout,
longPollWaitTime: in.config.SQSWaitTime,
}

s3API := &awsS3API{
client: s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)),
client: s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}),
}

log := ctx.Logger.With("queue_url", in.config.QueueURL)
log.Infof("AWS api_timeout is set to %v.", in.config.APITimeout)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout)
log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)
Expand All @@ -195,8 +199,15 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
return sqsReader, nil
}

type nonAWSBucketResolver struct {
endpoint string
}

func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) {
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
}

func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
s3ServiceName := awscommon.CreateServiceName("s3", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)
var bucketName string
var bucketID string
if in.config.NonAWSBucketName != "" {
Expand All @@ -207,7 +218,14 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
bucketID = in.config.BucketARN
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig), func(o *s3.Options) {
s3Client := s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint}
}

if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle
})
regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName)
Expand All @@ -220,7 +238,14 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
in.awsConfig.Region = regionName

if regionName != originalAwsConfigRegion {
s3Client = s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig), func(o *s3.Options) {
s3Client = s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint}
}

if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle
})
}
Expand All @@ -234,7 +259,6 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Expand Up @@ -398,7 +398,7 @@ func TestGetRegionForBucketARN(t *testing.T) {
t.Fatal(err)
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", cfg))
s3Client := s3.NewFromConfig(cfg)

regionName, err := getRegionForBucket(context.Background(), s3Client, getBucketNameFromARN(tfConfig.BucketName))
assert.NoError(t, err)
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestPaginatorListPrefix(t *testing.T) {
t.Fatal(err)
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", cfg))
s3Client := s3.NewFromConfig(cfg)

s3API := &awsS3API{
client: s3Client,
Expand Down
18 changes: 12 additions & 6 deletions x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go
Expand Up @@ -7,6 +7,8 @@ package ec2
import (
"fmt"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/gofrs/uuid"

Expand Down Expand Up @@ -63,9 +65,11 @@ func AutodiscoverBuilder(
if config.Regions == nil {
// set default region to make initial aws api call
awsCfg.Region = "us-west-1"
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, awsCfg.Region)
svcEC2 := ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, awsCfg.Region, awsCfg))
svcEC2 := ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})

completeRegionsList, err := awsauto.GetRegions(svcEC2)
if err != nil {
Expand All @@ -81,9 +85,11 @@ func AutodiscoverBuilder(
logp.Error(fmt.Errorf("error loading AWS config for aws_ec2 autodiscover provider: %w", err))
}
awsCfg.Region = region
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, region)
clients = append(clients, ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, region, awsCfg)))
clients = append(clients, ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}))
}

return internalBuilder(uuid, bus, config, newAPIFetcher(clients), keystore)
Expand Down
19 changes: 13 additions & 6 deletions x-pack/libbeat/autodiscover/providers/aws/elb/provider.go
Expand Up @@ -5,6 +5,7 @@
package elb

import (
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
"github.com/gofrs/uuid"
Expand Down Expand Up @@ -64,9 +65,12 @@ func AutodiscoverBuilder(

// Construct MetricSet with a full regions list if there is no region specified.
if config.Regions == nil {
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, awsCfg.Region)
svcEC2 := ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, awsCfg.Region, awsCfg))
svcEC2 := ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}

})

completeRegionsList, err := awsauto.GetRegions(svcEC2)
if err != nil {
Expand All @@ -88,9 +92,12 @@ func AutodiscoverBuilder(
logp.Err("error loading AWS config for aws_elb autodiscover provider: %s", err)
}
awsCfg.Region = region
elbServiceName := awscommon.CreateServiceName("elasticloadbalancing", config.AWSConfig.FIPSEnabled, region)
clients = append(clients, elasticloadbalancingv2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, elbServiceName, region, awsCfg)))
clients = append(clients, elasticloadbalancingv2.NewFromConfig(awsCfg, func(o *elasticloadbalancingv2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}

}))
}

return internalBuilder(uuid, bus, config, newAPIFetcher(clients), keystore)
Expand Down
38 changes: 0 additions & 38 deletions x-pack/libbeat/common/aws/credentials.go
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"net/http"
"net/url"
"strings"

"github.com/aws/aws-sdk-go-v2/service/sts"

Expand Down Expand Up @@ -164,40 +163,3 @@ func addStaticCredentialsProviderToAwsConfig(beatsConfig ConfigAWS, awsConfig *a

awsConfig.Credentials = staticCredentialsProvider
}

// EnrichAWSConfigWithEndpoint function enabled endpoint resolver for AWS service clients when endpoint is given in config.
func EnrichAWSConfigWithEndpoint(endpoint string, serviceName string, regionName string, beatsConfig awssdk.Config) awssdk.Config {
var eurl string
if endpoint != "" {
parsedEndpoint, _ := url.Parse(endpoint)

// Beats uses the provided endpoint if the scheme is present or...
if parsedEndpoint.Scheme != "" {
eurl = endpoint
} else {
// ...build one by using the scheme, service and region names.
if regionName == "" {
eurl = "https://" + serviceName + "." + endpoint
} else {
eurl = "https://" + serviceName + "." + regionName + "." + endpoint
}
}

beatsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(
func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{URL: eurl}, nil
})
}
return beatsConfig
}

// CreateServiceName based on Service name, Region and FIPS. Returns service name if Fips is not enabled.
func CreateServiceName(serviceName string, fipsEnabled bool, region string) string {
if fipsEnabled {
_, found := OptionalGovCloudFIPS[serviceName]
if !strings.HasPrefix(region, "us-gov-") || found {
return serviceName + "-fips"
}
}
return serviceName
}

0 comments on commit 4d8b390

Please sign in to comment.