Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 2 additions & 68 deletions pkg/storage/chunk/client/aws/config.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,12 @@
// Provenance-includes-location: https://github.com/weaveworks/common/blob/main/aws/config.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: Weaveworks Ltd.

package aws

import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

// DynamoConfigFromURL returns AWS config from given URL. It expects escaped
// AWS Access key ID & Secret Access Key to be encoded in the URL. It
// also expects region specified as a host (letting AWS generate full
// endpoint) or fully valid endpoint with dummy region assumed (e.g
// for URLs to emulated services).
func DynamoConfigFromURL(awsURL *url.URL) (*dynamodb.Options, error) {
httpClient := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 100,
TLSHandshakeTimeout: 3 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
config := dynamodb.Options{HTTPClient: httpClient}

// Use a custom http.Client with the golang defaults but also specifying
// MaxIdleConnsPerHost because of a bug in golang https://github.com/golang/go/issues/13801
// where MaxIdleConnsPerHost does not work as expected.

if awsURL.User != nil {
username := awsURL.User.Username()
password, _ := awsURL.User.Password()

// We request at least the username or password being set to enable the static credentials.
if username != "" || password != "" {
config.Credentials = credentials.NewStaticCredentialsProvider(username, password, "")
}
}

if strings.Contains(awsURL.Host, ".") {
region := os.Getenv("AWS_REGION")
if region == "" {
region = "dummy"
}
config.Region = region
if awsURL.Scheme == "https" {
config.BaseEndpoint = aws.String(fmt.Sprintf("https://%s", awsURL.Host))
}
config.BaseEndpoint = aws.String(fmt.Sprintf("http://%s", awsURL.Host))
} else {
config.Region = awsURL.Host
}
// Let AWS generate default endpoint based on region passed as a host in URL.
return &config, nil
}
const InvalidAWSRegion = "dummy"

func CredentialsFromURL(awsURL *url.URL) (key, secret string) {
func credentialsFromURL(awsURL *url.URL) (key, secret string) {
if awsURL.User != nil {
username := awsURL.User.Username()
password, _ := awsURL.User.Password()
Expand Down
135 changes: 134 additions & 1 deletion pkg/storage/chunk/client/aws/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import (
"net/url"
"testing"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)

func TestCredentialsFromURL(t *testing.T) {
Expand Down Expand Up @@ -44,16 +48,145 @@ func TestCredentialsFromURL(t *testing.T) {
expectedKey: "key",
expectedSecret: "secret",
},
{
name: "URL with credentials and bucket",
urlStr: "s3://key:secret@us-east-1/bucket",
expectedKey: "key",
expectedSecret: "secret",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
u, err := url.Parse(tt.urlStr)
require.NoError(t, err)

key, secret := CredentialsFromURL(u)
key, secret := credentialsFromURL(u)
require.Equal(t, tt.expectedKey, key)
require.Equal(t, tt.expectedSecret, secret)
})
}
}

func urlValue(s string) flagext.URLValue {
url := &flagext.URLValue{}
_ = url.Set(s)
return *url
}

func TestS3ClientOptions(t *testing.T) {

t.Run("s3 schema with region as hostname", func(t *testing.T) {
cfg := S3Config{
S3: urlValue("s3://us-east-0/bucket"),
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

require.Equal(t, "us-east-0", opts.Region)
require.Nil(t, opts.BaseEndpoint)
})

t.Run("https schema with region as hostname", func(t *testing.T) {
cfg := S3Config{
S3: urlValue("https://us-east-0/bucket"),
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

require.Equal(t, "us-east-0", opts.Region)
require.Nil(t, opts.BaseEndpoint)
})

t.Run("s3 schema with endpoint hostname", func(t *testing.T) {
cfg := S3Config{
S3: urlValue("s3://s3.us-east-0.amazonaws.com/bucket"),
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

require.Equal(t, "", opts.Region)
require.Equal(t, "http://s3.us-east-0.amazonaws.com", *opts.BaseEndpoint)
})

t.Run("https schema with endpoint hostname", func(t *testing.T) {
cfg := S3Config{
S3: urlValue("https://s3.us-east-0.amazonaws.com/bucket"),
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

require.Equal(t, "", opts.Region)
require.Equal(t, "https://s3.us-east-0.amazonaws.com", *opts.BaseEndpoint)
})

t.Run("access key and secret in url", func(t *testing.T) {
cfg := S3Config{
S3: urlValue("s3://accesskey:secret@us-east-0/bucket"),
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

cred, err := opts.Credentials.Retrieve(t.Context())
require.NoError(t, err)
require.Equal(t, "accesskey", cred.AccessKeyID)
require.Equal(t, "secret", cred.SecretAccessKey)
})

t.Run("fields override url config", func(t *testing.T) {
cfg := S3Config{
S3: urlValue("s3://accesskey:secret@us-east-0/bucket"),
AccessKeyID: "loki",
SecretAccessKey: flagext.SecretWithValue("s3cr3t"),
Endpoint: "s3.eu-south-0.amazonaws.com",
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

cred, err := opts.Credentials.Retrieve(t.Context())
require.NoError(t, err)
require.Equal(t, "loki", cred.AccessKeyID)
require.Equal(t, "s3cr3t", cred.SecretAccessKey)
require.Equal(t, "https://s3.eu-south-0.amazonaws.com", *opts.BaseEndpoint)
})

t.Run("insecure=false flag uses https for endpoint", func(t *testing.T) {
cfg := S3Config{
Endpoint: "minio:9100",
Insecure: false,
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

require.Equal(t, "https://minio:9100", *opts.BaseEndpoint)
})

t.Run("insecure=true flag uses http for endpoint", func(t *testing.T) {
cfg := S3Config{
Endpoint: "minio:9100",
Insecure: true,
}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

require.Equal(t, "http://minio:9100", *opts.BaseEndpoint)
})

t.Run("region is set to invalid when url is not present", func(t *testing.T) {
cfg := S3Config{}
fn, _ := s3ClientConfigFunc(cfg, hedging.Config{}, false)
opts := s3.Options{}
fn(&opts)

require.Equal(t, InvalidAWSRegion, opts.Region)
})

}
71 changes: 54 additions & 17 deletions pkg/storage/chunk/client/aws/dynamodb_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"net"
"net/http"
"net/url"
"os"
"reflect"
"strings"
"time"

"github.com/IBM/ibm-cos-sdk-go/aws"
"github.com/IBM/ibm-cos-sdk-go/aws/request"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/smithy-go"
Expand Down Expand Up @@ -922,28 +924,63 @@ func dynamoOptionsFromURL(awsURL *url.URL) (dynamodb.Options, error) {
if len(path) > 0 {
level.Warn(log.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path)
}
config, err := DynamoConfigFromURL(awsURL)
config, err := dynamodbOptionsFromURL(awsURL)
if err != nil {
return dynamodb.Options{}, err
}
config.RetryMaxAttempts = 0 // We do our own retries, so we can monitor them
config.HTTPClient = &http.Client{Transport: defaultTransport}
return *config, nil
}

// Copy-pasted http.DefaultTransport
var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
// We will connect many times in parallel to the same DynamoDB server,
// see https://github.com/golang/go/issues/13801
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// Deprecated: Has been copied from ConfigFromURL which was used for generating the configuration for both S3 and DynamoDB
// when the Amazon AWS SKD v1 was still used.
func dynamodbOptionsFromURL(awsURL *url.URL) (*dynamodb.Options, error) {
httpClient := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
// We will connect many times in parallel to the same DynamoDB server,
// see https://github.com/golang/go/issues/13801
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
config := dynamodb.Options{HTTPClient: httpClient}

// Use a custom http.Client with the golang defaults but also specifying
// MaxIdleConnsPerHost because of a bug in golang https://github.com/golang/go/issues/13801
// where MaxIdleConnsPerHost does not work as expected.

if awsURL.User != nil {
key, secret := credentialsFromURL(awsURL)

// We request at least the username or password being set to enable the static credentials.
if key != "" || secret != "" {
config.Credentials = credentials.NewStaticCredentialsProvider(key, secret, "")
}
}

if strings.Contains(awsURL.Host, ".") {
region := os.Getenv("AWS_REGION")
if region == "" {
region = InvalidAWSRegion
}
config.Region = region
if awsURL.Scheme == "https" {
config.BaseEndpoint = aws.String(fmt.Sprintf("%s://%s", awsURL.Scheme, awsURL.Host))
} else {
config.BaseEndpoint = aws.String(fmt.Sprintf("http://%s", awsURL.Host))
}
} else {
config.Region = awsURL.Host
}
// Let AWS generate default endpoint based on region passed as a host in URL.
return &config, nil
}
Loading