forked from asonawalla/gazette
/
s3_fs.go
410 lines (358 loc) · 12.3 KB
/
s3_fs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package cloudstore
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
gzip "github.com/youtube/vitess/go/cgzip"
"github.com/LiveRamp/gazette/pkg/keepalive"
)
// Property keys understood by s3Fs, supplied through the Properties
// passed to newS3FS (see S3Properties).
const (
	// AWSAccessKeyID is the static AWS access key ID. May be empty, in which
	// case the SDK's default credential chain is used instead (see svc).
	AWSAccessKeyID = "AWSAccessKeyID"
	// AWSSecretAccessKey is the static secret key paired with AWSAccessKeyID.
	AWSSecretAccessKey = "AWSSecretAccessKey"
	// S3Region names the AWS region of the S3 endpoint.
	S3Region = "S3Region"
	// S3GlobalCannedACL, if non-empty, is the canned ACL applied to all
	// written objects (otherwise bucket-owner-full-control; see OpenFile).
	S3GlobalCannedACL = "S3GlobalCannedACL"
	// S3SSEAlgorithm, if non-empty, enables server-side encryption with the
	// named algorithm on written objects (see sseAlgorithm).
	S3SSEAlgorithm = "S3SSEAlgorithm"
)
// Maps Amazon S3 into an API compatible with cloudstore.FileSystem.
type s3Fs struct {
	// properties supplies configuration (credentials, region, canned ACL,
	// SSE algorithm) via the property keys declared above.
	properties Properties
	// Prefix roots all files within this filesystem.
	prefix string
	// compress, if true, routes written bytes through a gzip compressor
	// before spooling for upload (see OpenFile).
	compress bool
}
// S3Properties is a map-backed implementation of the Properties interface.
type S3Properties map[string]string

// Get returns the value stored under |key|, or the empty string when the
// key is absent (including on a nil map).
func (p S3Properties) Get(key string) string {
	value, ok := p[key]
	if !ok {
		return ""
	}
	return value
}
// newS3FS returns an s3Fs rooted at |prefix|, configured from |properties|.
// Writes are gzip-compressed when |compress| is set. The returned error is
// always nil; the signature matches sibling filesystem constructors.
func newS3FS(properties Properties, prefix string, compress bool) (*s3Fs, error) {
	var fs = s3Fs{
		properties: properties,
		prefix:     prefix,
		compress:   compress,
	}
	return &fs, nil
}
// Opens a S3 file for reading or for writing (O_RDWR is not supported).
// O_CREATE is enforced, and O_CREATE|O_EXCL is emulated (best effort) by
// checking for file existence prior to open. Files opened for O_RDONLY are
// not actually opened for reading by this call (they're only stat'd): rather,
// read opens happen lazily, on the first Read() call.
func (fs *s3Fs) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
	// TODO(johnny): |perm| is currently ignored. Should these be mapped
	// into owner / group / everyone ACL's?
	bucket, path := pathToBucketAndSubpath(fs.prefix, name)
	var svc = fs.svc()
	// Whether |path| already names a directory-style key, per isBucketStoreDir.
	var isDir = isBucketStoreDir(path)
	// Stat result for an existing regular object, if any.
	var statObject *s3.Object
	var exists bool
	// Check the current status of |path| on cloud storage. First determine if
	// |path| is a regular file via StatObject(). If not, determine if |path|
	// should be treated as a directory by quering for subordinate files.
	if !isDir {
		var headParams = s3.HeadObjectInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(path),
		}
		resp, err := svc.HeadObject(&headParams)
		if isAWSNotFound(err) {
			// File does not exist, not a fatal error.
		} else if err != nil {
			// Unrelated error, treat as fatal.
			return nil, fmt.Errorf("s3 head file object: %s", err)
		} else {
			// File exists; make a *s3.Object from the HeadObject result.
			statObject = &s3.Object{
				ETag:         resp.ETag,
				Key:          aws.String(path),
				LastModified: resp.LastModified,
				// Owner field not set by this implementation.
				Size:         resp.ContentLength,
				StorageClass: resp.StorageClass,
			}
			exists = true
		}
		if !exists {
			// |path| doesn't have a '/' suffix, but could still be a directory.
			// Check by querying with a suffix, and seeing if any files are returned.
			// TODO(joshk): This query could store the first N files in the prefix,
			// but instead we just repeat it later in Readdir().
			var listParams = s3.ListObjectsV2Input{
				Bucket:    aws.String(bucket),
				Delimiter: aws.String("/"),
				MaxKeys:   aws.Int64(1),
				Prefix:    aws.String(path + "/"),
			}
			listObjects, err := svc.ListObjectsV2(&listParams)
			if err != nil {
				return nil, fmt.Errorf("s3 list objects: %s", err)
			} else if len(listObjects.Contents) != 0 || len(listObjects.CommonPrefixes) != 0 {
				// Rewrite |path| as a directory.
				path = path + "/"
				isDir = true
			}
		}
	}
	// Is this a directory? Opens of directories for reading always succeed.
	// Otherwise fail.
	if isDir {
		if flag != os.O_RDONLY {
			return nil, errors.New("unsupported directory flags")
		}
		return &s3File{
			svc:    svc,
			bucket: aws.String(bucket),
			key:    aws.String(path),
		}, nil
	}
	// Walk through each supported flag combination, and emulate flag behaviors
	// by testing against the stat'd status of the file on cloud storage.
	if flag == os.O_RDONLY && !exists {
		return nil, os.ErrNotExist // Read which doesn't exist. Map to os error.
	} else if flag == os.O_RDONLY {
		// Read which exists. Return a file which will lazily open a reader.
		return &s3File{
			svc:    svc,
			bucket: aws.String(bucket),
			object: statObject,
			key:    aws.String(path),
		}, nil
	} else if flag == os.O_WRONLY|os.O_TRUNC && !exists {
		return nil, os.ErrNotExist // Write which doesn't exist. Map to os error.
	} else if flag == os.O_WRONLY|os.O_CREATE|os.O_EXCL && exists {
		return nil, os.ErrExist // Exclusive create which exists. Map to os error.
	} else if flag == os.O_WRONLY|os.O_TRUNC ||
		flag == os.O_WRONLY|os.O_CREATE|os.O_TRUNC ||
		flag == os.O_WRONLY|os.O_CREATE|os.O_EXCL {
		// Canned ACL takes effect if explicitly non-blank. Otherwise, defaults
		// to ObjectCannedACLBucketOwnerFullControl.
		var cannedACL *string
		if specACL := fs.properties.Get(S3GlobalCannedACL); specACL != "" {
			cannedACL = aws.String(specACL)
		} else {
			cannedACL = aws.String(s3.ObjectCannedACLBucketOwnerFullControl)
		}
		// Begin a multipart upload.
		var params = s3.CreateMultipartUploadInput{
			ACL:                  cannedACL,
			Bucket:               aws.String(bucket),
			Key:                  aws.String(path),
			ServerSideEncryption: fs.sseAlgorithm(),
		}
		resp, err := svc.CreateMultipartUpload(&params)
		if err != nil {
			return nil, fmt.Errorf("s3 create multipart upload: %s", err)
		} else if resp.UploadId == nil {
			return nil, errors.New("expected UploadId in MultipartUpload creation")
		}
		var buf = new(bytes.Buffer)
		// NOTE(review): |spool| is a value copy of an empty bytes.Buffer. The
		// gzip compressor below captures &spool — the address of this local —
		// while the s3File returned receives a separate copy of the value. If
		// s3File expects compressed output to appear in its own |spool| field,
		// it would not — confirm against the s3File implementation.
		var spool = *buf
		var compressor io.WriteCloser
		if fs.compress {
			compressor = gzip.NewWriter(&spool)
		}
		return &s3File{
			svc:        svc,
			bucket:     aws.String(bucket),
			key:        aws.String(path),
			uploadId:   resp.UploadId,
			spool:      spool,
			compressor: compressor,
		}, nil
	} else {
		return nil, errors.New("unsupported file flags")
	}
}
// Open opens |name| read-only, satisfying the http.FileSystem interface.
func (fs *s3Fs) Open(name string) (http.File, error) {
	var file, err = fs.OpenFile(name, os.O_RDONLY, 0)
	return file, err
}
// MkdirAll verifies that |name| is usable as a directory prefix. S3 has no
// real directories, so this only checks that no regular object occupies the
// exact path — simulating the POSIX rule that a regular file and a directory
// cannot share a name. |perm| is ignored.
func (fs *s3Fs) MkdirAll(name string, perm os.FileMode) error {
	bucket, subPath := pathToBucketAndSubpath(fs.prefix, name)
	subPath = strings.TrimRight(subPath, "/")

	// List at most one object under the would-be directory path; this also
	// confirms that we can list files under |bucket| at all.
	objects, err := fs.svc().ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket:  aws.String(bucket),
		Prefix:  aws.String(subPath),
		MaxKeys: aws.Int64(1),
	})
	if err != nil {
		return err
	}
	// An object merely prefixed by |subPath| is allowed; an object whose key
	// is exactly |subPath| is a regular file colliding with the directory.
	if len(objects.Contents) > 0 && *objects.Contents[0].Key == subPath {
		return &os.PathError{Err: os.ErrExist, Path: subPath}
	}
	return nil
}
// Remove deletes the object at |name|, returning os.ErrNotExist if it does
// not exist.
func (fs *s3Fs) Remove(name string) error {
	bucket, path := pathToBucketAndSubpath(fs.prefix, name)
	var svc = fs.svc()
	// DeleteObject won't tell you if the file didn't exist. HeadObject first.
	var headParams = s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(path),
	}
	if _, err := svc.HeadObject(&headParams); isAWSNotFound(err) {
		return os.ErrNotExist // Map to os error.
	} else if err != nil {
		// Fix: previously a non-NotFound HeadObject failure was silently
		// ignored and the delete attempted anyway. Surface it instead,
		// matching OpenFile's error style.
		return fmt.Errorf("s3 head file object: %s", err)
	}
	// The file exists. Attempt to delete it.
	var deleteParams = s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(path),
	}
	_, err := svc.DeleteObject(&deleteParams)
	return err
}
// CopyAtomic copies files contents, while being sensitive to the consistency
// guarantees provided by Amazon S3. In particular, |to| is aborted
// (s3File.uploadId is aborted explicitly) if a write *or read* error occurs.
// Returns the number of bytes copied and the first error encountered.
func (fs *s3Fs) CopyAtomic(to File, from io.Reader) (n int64, err error) {
	if n, err = io.Copy(to, from); err == nil {
		// Completes the multipart upload. Fix: the Close error was previously
		// discarded, so a failed multipart completion was reported as success;
		// propagate it to the caller.
		err = to.Close()
	}
	return
}
// For the specified |path|, generates a signed URL which makes it so that the
// credential keypair is not required to access the path, for the amount of
// time specified by |validFor|.
func (fs *s3Fs) ToURL(path, method string, validFor time.Duration) (*url.URL, error) {
var bucket, subPath = pathToBucketAndSubpath(fs.prefix, path)
var params = s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(subPath),
}
req, _ := fs.svc().GetObjectRequest(¶ms)
if urlStr, err := req.Presign(validFor); err != nil {
return nil, fmt.Errorf("s3 presign: %s", err)
} else if urlObj, err := url.Parse(urlStr); err != nil {
return nil, fmt.Errorf("s3 url parse: %s", err)
} else {
return urlObj, nil
}
}
// ProducesAuthorizedURL reports that URLs returned by ToURL are pre-signed,
// and so require no further authorization to use.
func (fs *s3Fs) ProducesAuthorizedURL() bool {
	return true
}
// Close releases the filesystem. s3Fs holds no persistent handles, so this
// is a no-op that always succeeds.
func (fs *s3Fs) Close() error {
	return nil // No-op.
}
// Emits all files underneath |root| to |walkFn|.
//
// Takes advantage of the fact that there are no real directories in S3, only
// prefixes. Initiates an object listing underneath the root (a prefix) but
// without any Delimiter specified. As a result, all files under the prefix,
// regardless of directory-depth, are returned in a small number of API calls
// (typically just one, but subject to continuation if the number of files is
// large.)
//
// As this scan doesn't know about even virtual directory boundaries, the
// |filepath.SkipDir| API allowing walk-functions to skip an entire directory
// isn't implemented and should not be used.
func (fs *s3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
	var bucket, subPath = pathToBucketAndSubpath(fs.prefix, root)
	var svc = fs.svc()
	var continuation *string
	for {
		var listParams = s3.ListObjectsV2Input{
			Bucket:            aws.String(bucket),
			ContinuationToken: continuation,
			// No Delimiter set.
			MaxKeys: s3MaxListObjectsKeys,
			Prefix:  aws.String(subPath),
		}
		var objects, err = svc.ListObjectsV2(&listParams)
		if err != nil {
			return fmt.Errorf("s3 list objects: %s", err.Error())
		}
		for i := range objects.Contents {
			var fp = &s3File{
				svc:    svc,
				bucket: aws.String(bucket),
				key:    objects.Contents[i].Key,
				object: objects.Contents[i],
			}
			// Strip the full prefix. |rel| is now relative to |subPath|.
			var rel, err = filepath.Rel(subPath, *objects.Contents[i].Key)
			if err != nil {
				return fmt.Errorf("s3 relative path: %s", err.Error())
			}
			if werr := walkFn(filepath.Join(root, rel), fp, nil); werr == filepath.SkipDir {
				// The underlying API call no longer has a concept of directories,
				// so we'd have to work really hard to enable SkipDir. Until
				// there is such a use-case, punt.
				return errors.New("SkipDir not implemented for s3Fs")
			} else if werr != nil {
				// Allow caller to abort Walk operation.
				return fmt.Errorf("s3 walking directory: %s", werr.Error())
			}
		}
		// Fix: treat an absent IsTruncated as "not truncated". Previously a
		// nil IsTruncated (possible with non-AWS S3-compatible endpoints, as
		// supported via AWS_ENDPOINT in svc) would loop forever re-issuing the
		// listing with a nil continuation token.
		if objects.IsTruncated == nil || !*objects.IsTruncated {
			return nil
		}
		continuation = objects.NextContinuationToken
	}
}
// credentials builds static AWS credentials from the configured properties.
// TODO(joshk): Support IAM roles. Only matters if we are actually deployed on AWS.
func (fs *s3Fs) credentials() *credentials.Credentials {
	var accessKey = fs.properties.Get(AWSAccessKeyID)
	var secretKey = fs.properties.Get(AWSSecretAccessKey)
	return credentials.NewStaticCredentials(accessKey, secretKey, "")
}
// region returns the configured S3 region property.
func (fs *s3Fs) region() string {
	return fs.properties.Get(S3Region)
}
// sseAlgorithm returns the configured server-side encryption algorithm,
// or nil when none is configured.
func (fs *s3Fs) sseAlgorithm() *string {
	var algorithm = fs.properties.Get(S3SSEAlgorithm)
	if algorithm == "" {
		return nil
	}
	return aws.String(algorithm)
}
// svc constructs an S3 client from the filesystem's configuration and the
// AWS_DISABLE_SSL / AWS_ENDPOINT environment variables. A new client is
// built on each call.
func (fs *s3Fs) svc() *s3.S3 {
	// S3 files with Content-Encoding: gzip will get transparently decompressed
	// with the default http transport, a behavior that we have to manually disable.
	var transport = &http.Transport{
		Dial:               keepalive.Dialer.Dial,
		DisableCompression: true,
	}
	// If environment variables are nil, the aws client will look for them in
	// ~/.aws/credentials automatically.
	var config = aws.NewConfig().
		WithRegion(fs.region()).
		WithHTTPClient(&http.Client{Transport: transport})

	if v := os.Getenv("AWS_DISABLE_SSL"); v == "1" || v == "true" {
		config.DisableSSL = aws.Bool(true)
	}
	if endpoint := os.Getenv("AWS_ENDPOINT"); endpoint != "" {
		// Custom (S3-compatible) endpoints require path-style addressing.
		config.Endpoint = aws.String(endpoint)
		config.S3ForcePathStyle = aws.Bool(true)
	}
	// Overwrite credentials on aws client config to be from env vars if specified.
	if fs.properties.Get(AWSAccessKeyID) != "" ||
		fs.properties.Get(AWSSecretAccessKey) != "" {
		config = config.WithCredentials(fs.credentials())
	}
	return s3.New(session.Must(session.NewSession(config)))
}
// isAWSNotFound reports whether |err| is an AWS request failure carrying an
// HTTP 404 status. A nil or non-AWS error yields false.
func isAWSNotFound(err error) bool {
	requestFailure, ok := err.(awserr.RequestFailure)
	return ok && requestFailure.StatusCode() == http.StatusNotFound
}