Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b

// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
// is updated accordingly.
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
for blockID, blockErr := range partials {
// We can safely delete only blocks which are partial because the meta.json is missing.
if !errors.Is(blockErr, bucketindex.ErrBlockMetaNotFound) {
Expand Down Expand Up @@ -411,7 +411,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
}

// applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger) {
// The retention period of zero is a special value indicating to never delete.
if retention <= 0 {
return
Expand Down
143 changes: 143 additions & 0 deletions pkg/storage/bucket/sse_bucket_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package bucket

import (
"context"
"io"

"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/s3"

cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
)

// TenantConfigProvider defines a per-tenant config provider.
type TenantConfigProvider interface {
// S3SSEType returns the per-tenant S3 SSE type.
S3SSEType(userID string) string

// S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id or an empty string if not set.
S3SSEKMSKeyID(userID string) string

// S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE key id or an empty string if not set.
S3SSEKMSEncryptionContext(userID string) string
}

// SSEBucketClient is a wrapper around a objstore.BucketReader that configures the object
// storage server-side encryption (SSE) for a given user.
type SSEBucketClient struct {
userID string
bucket objstore.Bucket
cfgProvider TenantConfigProvider
}

// NewSSEBucketClient makes a new SSEBucketClient. The cfgProvider can be nil.
func NewSSEBucketClient(userID string, bucket objstore.Bucket, cfgProvider TenantConfigProvider) *SSEBucketClient {
return &SSEBucketClient{
userID: userID,
bucket: bucket,
cfgProvider: cfgProvider,
}
}

// Close implements objstore.Bucket.
func (b *SSEBucketClient) Close() error {
return b.bucket.Close()
}

// Upload the contents of the reader as an object into the bucket.
func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) error {
if sse, err := b.getCustomS3SSEConfig(); err != nil {
return err
} else if sse != nil {
// If the underlying bucket client is not S3 and a custom S3 SSE config has been
// provided, the config option will be ignored.
ctx = s3.ContextWithSSEConfig(ctx, sse)
}

return b.bucket.Upload(ctx, name, r)
}

// Delete implements objstore.Bucket.
func (b *SSEBucketClient) Delete(ctx context.Context, name string) error {
return b.bucket.Delete(ctx, name)
}

// Name implements objstore.Bucket.
func (b *SSEBucketClient) Name() string {
return b.bucket.Name()
}

func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) {
if b.cfgProvider == nil {
return nil, nil
}

// No S3 SSE override if the type override hasn't been provided.
sseType := b.cfgProvider.S3SSEType(b.userID)
if sseType == "" {
return nil, nil
}

cfg := cortex_s3.SSEConfig{
Type: sseType,
KMSKeyID: b.cfgProvider.S3SSEKMSKeyID(b.userID),
KMSEncryptionContext: b.cfgProvider.S3SSEKMSEncryptionContext(b.userID),
}

sse, err := cfg.BuildMinioConfig()
if err != nil {
return nil, errors.Wrapf(err, "unable to customise S3 SSE config for tenant %s", b.userID)
}

return sse, nil
}

// Iter implements objstore.Bucket.
func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
return b.bucket.Iter(ctx, dir, f, options...)
}

// Get implements objstore.Bucket.
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bucket.Get(ctx, name)
}

// GetRange implements objstore.Bucket.
func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bucket.GetRange(ctx, name, off, length)
}

// Exists implements objstore.Bucket.
func (b *SSEBucketClient) Exists(ctx context.Context, name string) (bool, error) {
return b.bucket.Exists(ctx, name)
}

// IsObjNotFoundErr implements objstore.Bucket.
func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
return b.bucket.IsObjNotFoundErr(err)
}

// Attributes implements objstore.Bucket.
func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.bucket.Attributes(ctx, name)
}

// ReaderWithExpectedErrs implements objstore.Bucket.
func (b *SSEBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return b.WithExpectedErrs(fn)
}

// WithExpectedErrs implements objstore.Bucket.
func (b *SSEBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket {
if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok {
return &SSEBucketClient{
userID: b.userID,
bucket: ib.WithExpectedErrs(fn),
cfgProvider: b.cfgProvider,
}
}

return b
}
124 changes: 124 additions & 0 deletions pkg/storage/bucket/sse_bucket_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package bucket

import (
"context"
"encoding/base64"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
tests := map[string]struct {
withExpectedErrs bool
}{
"default client": {
withExpectedErrs: false,
},
"client with expected errors": {
withExpectedErrs: true,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
const (
kmsKeyID = "ABC"
kmsEncryptionContext = "{\"department\":\"10103.0\"}"
)

var req *http.Request

// Start a fake HTTP server which simulate S3.
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Keep track of the received request.
req = r

w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

s3Cfg := s3.Config{
Endpoint: srv.Listener.Addr().String(),
Region: "test",
BucketName: "test-bucket",
SecretAccessKey: flagext.Secret{Value: "test"},
AccessKeyID: "test",
Insecure: true,
}

s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger())
require.NoError(t, err)

// Configure the config provider with NO KMS key ID.
cfgProvider := &mockTenantConfigProvider{}

var sseBkt objstore.Bucket
if testData.withExpectedErrs {
sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider).WithExpectedErrs(s3Client.IsObjNotFoundErr)
} else {
sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider)
}

err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
require.NoError(t, err)

// Ensure NO KMS header has been injected.
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption"))
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context"))

// Configure the config provider with a KMS key ID and without encryption context.
cfgProvider.s3SseType = s3.SSEKMS
cfgProvider.s3KmsKeyID = kmsKeyID

err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
require.NoError(t, err)

// Ensure the KMS header has been injected.
assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption"))
assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context"))

// Configure the config provider with a KMS key ID and encryption context.
cfgProvider.s3SseType = s3.SSEKMS
cfgProvider.s3KmsKeyID = kmsKeyID
cfgProvider.s3KmsEncryptionContext = kmsEncryptionContext

err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
require.NoError(t, err)

// Ensure the KMS header has been injected.
assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption"))
assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
assert.Equal(t, base64.StdEncoding.EncodeToString([]byte(kmsEncryptionContext)), req.Header.Get("x-amz-server-side-encryption-context"))
})
}
}

type mockTenantConfigProvider struct {
s3SseType string
s3KmsKeyID string
s3KmsEncryptionContext string
}

func (m *mockTenantConfigProvider) S3SSEType(_ string) string {
return m.s3SseType
}

func (m *mockTenantConfigProvider) S3SSEKMSKeyID(_ string) string {
return m.s3KmsKeyID
}

func (m *mockTenantConfigProvider) S3SSEKMSEncryptionContext(_ string) string {
return m.s3KmsEncryptionContext
}
Loading