/
sse_bucket_client.go
168 lines (136 loc) · 5.12 KB
/
sse_bucket_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package bucket
import (
"context"
"io"
"github.com/gogo/status"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/s3"
"google.golang.org/grpc/codes"
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
)
// TenantConfigProvider defines a per-tenant config provider.
type TenantConfigProvider interface {
	// S3SSEType returns the per-tenant S3 SSE type. SSEBucketClient treats an
	// empty string as "no per-tenant SSE override".
	S3SSEType(userID string) string
	// S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id or an empty string if not set.
	S3SSEKMSKeyID(userID string) string
	// S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE encryption context
	// or an empty string if not set.
	S3SSEKMSEncryptionContext(userID string) string
}
// SSEBucketClient is a wrapper around an objstore.Bucket that configures the object
// storage server-side encryption (SSE) for a given user.
type SSEBucketClient struct {
	// userID is the tenant passed to cfgProvider when looking up SSE settings.
	userID string
	// bucket is the wrapped bucket that operations are delegated to.
	bucket objstore.Bucket
	// cfgProvider supplies the per-tenant SSE overrides; when nil, no override is applied.
	cfgProvider TenantConfigProvider
}
// NewSSEBucketClient returns an SSEBucketClient that wraps bucket on behalf of
// the tenant identified by userID. A nil cfgProvider is accepted, in which case
// no per-tenant SSE override is applied.
func NewSSEBucketClient(userID string, bucket objstore.Bucket, cfgProvider TenantConfigProvider) *SSEBucketClient {
	client := new(SSEBucketClient)
	client.userID = userID
	client.bucket = bucket
	client.cfgProvider = cfgProvider
	return client
}
// Close implements objstore.Bucket. It closes the wrapped bucket and returns
// whatever error that call produces.
func (b *SSEBucketClient) Close() error {
	return b.bucket.Close()
}
// Upload stores the contents of r as the object called name in the wrapped
// bucket. When the tenant has a custom S3 SSE configuration, it is attached to
// the context before delegating. An error building that configuration is
// returned as-is and nothing is uploaded.
func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) error {
	sseCfg, err := b.getCustomS3SSEConfig()
	if err != nil {
		return err
	}

	// If the underlying bucket client is not S3 and a custom S3 SSE config has been
	// provided, the config option will be ignored.
	if sseCfg != nil {
		ctx = s3.ContextWithSSEConfig(ctx, sseCfg)
	}

	return b.bucket.Upload(ctx, name, r)
}
// Delete implements objstore.Bucket. The call is forwarded unchanged to the
// wrapped bucket.
func (b *SSEBucketClient) Delete(ctx context.Context, name string) error {
	return b.bucket.Delete(ctx, name)
}
// Name implements objstore.Bucket. It returns the name reported by the wrapped
// bucket.
func (b *SSEBucketClient) Name() string {
	return b.bucket.Name()
}
// getCustomS3SSEConfig builds the tenant-specific S3 SSE configuration.
//
// It returns (nil, nil) when there is no config provider or when the provider
// reports an empty SSE type for the tenant. Otherwise it builds a minio
// encrypt.ServerSide from the tenant's SSE type, KMS key id and KMS encryption
// context. A build failure is returned wrapped with the tenant ID.
func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) {
	if b.cfgProvider == nil {
		return nil, nil
	}

	// The SSE type acts as the switch: without a type override there is no
	// S3 SSE override at all, so the other settings are not even looked up.
	kind := b.cfgProvider.S3SSEType(b.userID)
	if kind == "" {
		return nil, nil
	}

	tenantCfg := cortex_s3.SSEConfig{
		Type:                 kind,
		KMSKeyID:             b.cfgProvider.S3SSEKMSKeyID(b.userID),
		KMSEncryptionContext: b.cfgProvider.S3SSEKMSEncryptionContext(b.userID),
	}

	built, err := tenantCfg.BuildMinioConfig()
	if err == nil {
		return built, nil
	}
	return nil, errors.Wrapf(err, "unable to customise S3 SSE config for tenant %s", b.userID)
}
// Iter implements objstore.Bucket. The directory, callback and options are
// forwarded unchanged to the wrapped bucket.
func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
	return b.bucket.Iter(ctx, dir, f, options...)
}
// Get implements objstore.Bucket. It fetches the object from the wrapped bucket
// and returns the reader and error unchanged, except when the error is an
// access-denied error: then it returns a nil reader and the error combined
// (via cortex_errors.WithCause) with a gRPC PermissionDenied status.
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	reader, err := b.bucket.Get(ctx, name)
	if err == nil || !b.IsAccessDeniedErr(err) {
		return reader, err
	}

	// The store gateway returns the status to the caller when the returned
	// error is a `status.Error`, so expose the denial as PermissionDenied.
	denied := status.Error(codes.PermissionDenied, err.Error())
	return nil, cortex_errors.WithCause(err, denied)
}
// GetRange implements objstore.Bucket. It forwards name, off and length to the
// wrapped bucket's GetRange and returns the reader and error unchanged, except
// when the error is an access-denied error: then it returns a nil reader and
// the error combined (via cortex_errors.WithCause) with a gRPC
// PermissionDenied status.
func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
	reader, err := b.bucket.GetRange(ctx, name, off, length)
	if err == nil || !b.IsAccessDeniedErr(err) {
		return reader, err
	}

	// Surface the denial as a gRPC status error carrying the original message.
	denied := status.Error(codes.PermissionDenied, err.Error())
	return nil, cortex_errors.WithCause(err, denied)
}
// Exists implements objstore.Bucket. The check is forwarded unchanged to the
// wrapped bucket.
func (b *SSEBucketClient) Exists(ctx context.Context, name string) (bool, error) {
	return b.bucket.Exists(ctx, name)
}
// IsObjNotFoundErr implements objstore.Bucket. The error is classified by the
// wrapped bucket; unlike IsAccessDeniedErr, no unwrapping is attempted here.
func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
	return b.bucket.IsObjNotFoundErr(err)
}
// IsAccessDeniedErr implements objstore.Bucket. It reports whether the wrapped
// bucket classifies err as an access-denied error. If err exposes an
// `Err() error` method, the error returned by that method is checked first and
// err itself is checked only if the inner one is not a match.
func (b *SSEBucketClient) IsAccessDeniedErr(err error) bool {
	// errCarrier matches errors that expose an inner error through Err().
	type errCarrier interface{ Err() error }

	if carrier, ok := err.(errCarrier); ok && b.bucket.IsAccessDeniedErr(carrier.Err()) {
		return true
	}
	return b.bucket.IsAccessDeniedErr(err)
}
// Attributes implements objstore.Bucket. The lookup is forwarded unchanged to
// the wrapped bucket.
func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
	return b.bucket.Attributes(ctx, name)
}
// ReaderWithExpectedErrs implements objstore.Bucket. It returns the result of
// WithExpectedErrs(fn) as an objstore.BucketReader.
func (b *SSEBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
	return b.WithExpectedErrs(fn)
}
// WithExpectedErrs implements objstore.Bucket. If the wrapped bucket is an
// objstore.InstrumentedBucket, it returns a new SSEBucketClient for the same
// tenant and config provider, wrapping the result of the instrumented bucket's
// WithExpectedErrs(fn). Otherwise fn is ignored and the receiver itself is
// returned.
func (b *SSEBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket {
	instrumented, ok := b.bucket.(objstore.InstrumentedBucket)
	if !ok {
		return b
	}

	return &SSEBucketClient{
		userID:      b.userID,
		bucket:      instrumented.WithExpectedErrs(fn),
		cfgProvider: b.cfgProvider,
	}
}