forked from ppenguin/goofys
/
backend_gcs.go
469 lines (398 loc) · 14 KB
/
backend_gcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
package internal
import (
"github.com/djmaze/goofys/api/common"
"bytes"
"context"
"fmt"
"io"
"net"
"path"
"strings"
"syscall"
"cloud.google.com/go/storage"
"github.com/jacobsa/fuse"
"golang.org/x/sync/errgroup"
syncsem "golang.org/x/sync/semaphore"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
// GCSBackend implements a storage backend for Google Cloud Storage.
// It wraps a bucket handle from the cloud.google.com/go/storage SDK.
type GCSBackend struct {
	bucketName string
	config     *common.GCSConfig     // stores user and bucket configuration
	cap        Capabilities          // static capability set advertised via Capabilities()
	bucket     *storage.BucketHandle // provides set of methods to operate on a bucket
	logger     *common.LogHandle     // logger for GCS backend
}
const (
	// maxListKeys is the max limit for number of elements during listObjects.
	maxListKeys int = 1000
)
// GCSMultipartBlobCommitInput carries the per-upload state used to emulate a
// multipart upload on GCS, which streams everything through one storage.Writer.
type GCSMultipartBlobCommitInput struct {
	cancel context.CancelFunc // useful to abort a multipart upload in GCS
	writer *storage.Writer    // used to emulate mpu under GCS, which currently used a single gcsWriter
}
// NewGCS initializes a GCS Backend.
// It creates an authenticated client or unauthenticated client based on existing
// credentials in the environment.
func NewGCS(bucket string, config *common.GCSConfig) (*GCSBackend, error) {
	// TODO: storage.NewClient has automated mechanisms to set up credentials together with HTTP settings.
	// Currently, we are using config.Credentials only to differentiate between creating an authenticated or
	// unauthenticated client, not to initialize the client itself.
	var opts []option.ClientOption
	if config.Credentials == nil {
		// Without credentials the SDK errors out unless authentication is
		// explicitly opted out of, so request an anonymous client.
		opts = append(opts, option.WithoutAuthentication())
	}
	client, err := storage.NewClient(context.Background(), opts...)
	if err != nil {
		return nil, err
	}
	backend := &GCSBackend{
		config:     config,
		bucketName: bucket,
		bucket:     client.Bucket(bucket),
		cap: Capabilities{
			MaxMultipartSize: 5 * 1024 * 1024 * 1024,
			Name:             "gcs",
			// parallel multipart upload is not supported in GCS
			NoParallelMultipart: true,
		},
		logger: common.GetLogger("gcs"),
	}
	return backend, nil
}
// Init checks user's access to bucket.
// The mount succeeds if the user can list on the bucket. This differs from
// other backends because GCS does not differentiate between "object not found"
// and "bucket not found".
func (g *GCSBackend) Init(key string) error {
	dir, _ := path.Split(key)
	_, err := g.ListBlobs(&ListBlobsInput{
		MaxKeys: PUInt32(1),
		Prefix:  PString(dir),
	})
	g.logger.Debugf("INIT GCS: ListStatus = %s", getDebugResponseStatus(err))
	if err == syscall.ENXIO {
		return fmt.Errorf("bucket %v does not exist", g.bucketName)
	}
	// ListBlobs already converts errors to syscall errors, so err can be
	// returned directly.
	return err
}
// Capabilities returns the backend's static capability set.
func (g *GCSBackend) Capabilities() *Capabilities {
	return &g.cap
}
// Bucket returns the GCSBackend's bucket name.
func (g *GCSBackend) Bucket() string {
	return g.bucketName
}
func getDebugResponseStatus(err error) string {
if err != nil {
return fmt.Sprintf("ERROR: %v", err)
}
return "SUCCESS"
}
// HeadBlob gets the file object metadata.
func (g *GCSBackend) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) {
	objAttrs, err := g.bucket.Object(param.Key).Attrs(context.Background())
	g.logger.Debugf("HEAD %v = %v", param.Key, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	item := BlobItemOutput{
		Key:          &objAttrs.Name,
		ETag:         &objAttrs.Etag,
		LastModified: &objAttrs.Updated,
		Size:         uint64(objAttrs.Size),
		StorageClass: &objAttrs.StorageClass,
	}
	return &HeadBlobOutput{
		BlobItemOutput: item,
		ContentType:    &objAttrs.ContentType,
		// A trailing "/" in the key marks a directory blob.
		IsDirBlob: strings.HasSuffix(param.Key, "/"),
		Metadata:  PMetadata(objAttrs.Metadata),
	}, nil
}
// ListBlobs lists objects and common prefixes for the given prefix/delimiter,
// returning one page of results plus a continuation token when truncated.
func (g *GCSBackend) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
	query := storage.Query{
		Prefix:      NilStr(param.Prefix),
		Delimiter:   NilStr(param.Delimiter),
		StartOffset: NilStr(param.StartAfter),
	}
	it := g.bucket.Objects(context.Background(), &query)
	// The SDK pager requires a page size > 0; mirror the JSON API default of
	// returning 1000 items when MaxKeys is not set.
	pageSize := int(NilUint32(param.MaxKeys))
	if pageSize == 0 {
		pageSize = maxListKeys
	}
	pager := iterator.NewPager(it, pageSize, NilStr(param.ContinuationToken))
	var page []*storage.ObjectAttrs
	nextToken, err := pager.NextPage(&page)
	g.logger.Debugf("LIST %s : %s", param, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	var nextContToken *string
	if nextToken != "" {
		nextContToken = &nextToken
	}
	var prefixes []BlobPrefixOutput
	var items []BlobItemOutput
	for _, attrs := range page {
		switch {
		case attrs.Prefix != "":
			// A synthetic "directory" entry: only the Prefix field is set.
			prefixes = append(prefixes, BlobPrefixOutput{&attrs.Prefix})
		case attrs.Name != "":
			// An actual blob: the Name field is set.
			items = append(items, BlobItemOutput{
				Key:          &attrs.Name,
				ETag:         &attrs.Etag,
				LastModified: &attrs.Updated,
				Size:         uint64(attrs.Size),
				StorageClass: &attrs.StorageClass,
			})
		default:
			log.Errorf("LIST Unknown object: %v", attrs)
		}
	}
	return &ListBlobsOutput{
		Prefixes:              prefixes,
		Items:                 items,
		NextContinuationToken: nextContToken,
		IsTruncated:           nextContToken != nil,
	}, nil
}
// DeleteBlob removes a single object from the bucket.
func (g *GCSBackend) DeleteBlob(param *DeleteBlobInput) (*DeleteBlobOutput, error) {
	obj := g.bucket.Object(param.Key)
	err := obj.Delete(context.Background())
	g.logger.Debugf("DELETE Object %v = %s ", param.Key, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	return &DeleteBlobOutput{}, nil
}
// DeleteBlobs deletes multiple GCS blobs concurrently.
// The Go SDK does not support batch requests
// (https://issuetracker.google.com/issues/142641783), so deletes are fanned
// out over goroutines, bounded by a weighted semaphore.
func (g *GCSBackend) DeleteBlobs(param *DeleteBlobsInput) (*DeleteBlobsOutput, error) {
	eg, rootCtx := errgroup.WithContext(context.Background())
	// Cap the number of concurrent delete requests at 100.
	sem := syncsem.NewWeighted(100)
	for _, item := range param.Items {
		if err := sem.Acquire(rootCtx, 1); err != nil {
			// rootCtx is canceled only when one of the delete goroutines has
			// already failed; surface that underlying (mapped) error instead
			// of the bare context-cancellation error.
			if werr := eg.Wait(); werr != nil {
				return nil, mapGCSError(werr)
			}
			return nil, err
		}
		// Capture the loop variable for the goroutine (pre-Go 1.22 semantics).
		curItem := item
		eg.Go(func() error {
			defer sem.Release(1)
			return g.bucket.Object(curItem).Delete(rootCtx)
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, mapGCSError(err)
	}
	return &DeleteBlobsOutput{}, nil
}
// RenameBlob is not supported for GCS backend. So Goofys will do a CopyBlob
// followed by DeleteBlob for renames.
func (g *GCSBackend) RenameBlob(param *RenameBlobInput) (*RenameBlobOutput, error) {
	return nil, syscall.ENOTSUP
}
// CopyBlob copies a source object to another destination object under the same bucket.
func (g *GCSBackend) CopyBlob(param *CopyBlobInput) (*CopyBlobOutput, error) {
	srcObj := g.bucket.Object(param.Source)
	copier := g.bucket.Object(param.Destination).CopierFrom(srcObj)
	copier.StorageClass = NilStr(param.StorageClass)
	copier.Etag = NilStr(param.ETag)
	copier.Metadata = NilMetadata(param.Metadata)
	_, err := copier.Run(context.Background())
	g.logger.Debugf("Copy object %s = %s ", param, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	return &CopyBlobOutput{}, nil
}
// GetBlob returns a file reader for a GCS object.
// Caveat: the SDK's reader does not provide ETag, StorageClass, and Metadata
// attributes within a single API call, so those fields are omitted from the
// output. Relevant GitHub issue:
// https://github.com/googleapis/google-cloud-go/issues/2740
func (g *GCSBackend) GetBlob(param *GetBlobInput) (*GetBlobOutput, error) {
	obj := g.bucket.Object(param.Key).ReadCompressed(true)
	ctx := context.Background()
	var reader *storage.Reader
	var err error
	switch {
	case param.Count != 0:
		reader, err = obj.NewRangeReader(ctx, int64(param.Start), int64(param.Count))
	case param.Start != 0:
		// No count given: read from Start to the end of the object.
		reader, err = obj.NewRangeReader(ctx, int64(param.Start), -1)
	default:
		// No range limits at all: read the full object.
		reader, err = obj.NewReader(ctx)
	}
	g.logger.Debugf("GET Blob %s = %v", param, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	return &GetBlobOutput{
		HeadBlobOutput: HeadBlobOutput{
			BlobItemOutput: BlobItemOutput{
				Key:          PString(param.Key),
				LastModified: &reader.Attrs.LastModified,
				Size:         uint64(reader.Attrs.Size),
			},
			ContentType: &reader.Attrs.ContentType,
		},
		Body: reader,
	}, nil
}
// PutBlob writes a file to GCS.
func (g *GCSBackend) PutBlob(param *PutBlobInput) (*PutBlobOutput, error) {
	// Substitute an empty reader when the body is nil so io.Copy does not
	// dereference a nil pointer.
	var src io.Reader = param.Body
	if src == nil {
		src = bytes.NewReader(nil)
	}
	writer := g.bucket.Object(param.Key).NewWriter(context.Background())
	writer.ContentType = NilStr(param.ContentType)
	writer.Metadata = NilMetadata(param.Metadata)
	// A chunk size equal to the file size turns this into a single-request upload.
	writer.ChunkSize = int(NilUint64(param.Size))
	_, err := io.Copy(writer, src)
	g.logger.Debugf("PUT Blob (to writer) %s = %s ", param, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	err = writer.Close()
	g.logger.Debugf("PUT Blob (Flush) %v = %s ", param.Key, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	attrs := writer.Attrs()
	out := &PutBlobOutput{
		ETag: &attrs.Etag,
		//LastModified: &attrs.Updated, // this field exists in the upstream open source goofys repo
		StorageClass: &attrs.StorageClass,
	}
	return out, nil
}
// MultipartBlobBegin begins a multi part blob request.
// Under GCS backend, we'll initialize the gcsWriter object and the context for the multipart blob request here.
func (g *GCSBackend) MultipartBlobBegin(param *MultipartBlobBeginInput) (*MultipartBlobCommitInput, error) {
ctx, cancel := context.WithCancel(context.Background())
writer := g.bucket.Object(param.Key).NewWriter(ctx)
writer.ChunkSize = g.config.ChunkSize
writer.ContentType = NilStr(param.ContentType)
writer.Metadata = NilMetadata(param.Metadata)
g.logger.Debugf("Multipart Blob BEGIN: %s", param)
return &MultipartBlobCommitInput{
Key: ¶m.Key,
Metadata: param.Metadata,
backendData: &GCSMultipartBlobCommitInput{
writer: writer,
cancel: cancel,
},
}, nil
}
// MultipartBlobAdd adds part of blob to the upload request.
// Under GCS backend, we'll write that blob part into the gcsWriter.
// TODO(deka): This is a temporary implementation to allow most tests to run.
// We might change this implementation in the future.
func (g *GCSBackend) MultipartBlobAdd(param *MultipartBlobAddInput) (*MultipartBlobAddOutput, error) {
	commitData, ok := param.Commit.backendData.(*GCSMultipartBlobCommitInput)
	if !ok {
		panic("Incorrect commit data type")
	}
	// Substitute an empty reader when the body is nil so io.Copy does not
	// dereference a nil pointer.
	var src io.Reader = param.Body
	if src == nil {
		src = bytes.NewReader(nil)
	}
	written, err := io.Copy(commitData.writer, src)
	g.logger.Debugf("Multipart Blob ADD %s bytesWritten: %v = %s", param, written, getDebugResponseStatus(err))
	if err != nil {
		// Cancel the upload context so GCS discards the partial data.
		commitData.cancel()
		return nil, err
	}
	return &MultipartBlobAddOutput{}, nil
}
// MultipartBlobAbort aborts an in-progress multipart upload by canceling the
// writer's context created in MultipartBlobBegin.
func (g *GCSBackend) MultipartBlobAbort(param *MultipartBlobCommitInput) (*MultipartBlobAbortOutput, error) {
	data, ok := param.backendData.(*GCSMultipartBlobCommitInput)
	if !ok {
		panic("Incorrect commit data type")
	}
	g.logger.Debugf("Multipart Blob ABORT %v", param.Key)
	data.cancel()
	return &MultipartBlobAbortOutput{}, nil
}
// MultipartBlobCommit finalizes a multipart upload by closing the writer,
// which makes GCS fully upload the buffered data.
func (g *GCSBackend) MultipartBlobCommit(param *MultipartBlobCommitInput) (*MultipartBlobCommitOutput, error) {
	commitData, ok := param.backendData.(*GCSMultipartBlobCommitInput)
	if !ok {
		panic("Incorrect commit data type")
	}
	// Release the context created in MultipartBlobBegin on every exit path.
	// Previously cancel was only called on the error path, leaking the
	// cancelable context on successful commits; canceling after a successful
	// Close only frees resources.
	defer commitData.cancel()
	// Flushing a writer will make GCS fully upload the buffer.
	err := commitData.writer.Close()
	g.logger.Debugf("Multipart Blob COMMIT %v = %s ", param.Key, getDebugResponseStatus(err))
	if err != nil {
		return nil, mapGCSError(err)
	}
	attrs := commitData.writer.Attrs()
	return &MultipartBlobCommitOutput{
		ETag: &attrs.Etag,
	}, nil
}
// MultipartExpire is a no-op: GCS expires a resumable session after 7 days
// automatically.
func (g *GCSBackend) MultipartExpire(param *MultipartExpireInput) (*MultipartExpireOutput, error) {
	return &MultipartExpireOutput{}, nil
}
// RemoveBucket deletes the backing GCS bucket.
func (g *GCSBackend) RemoveBucket(param *RemoveBucketInput) (*RemoveBucketOutput, error) {
	if err := g.bucket.Delete(context.Background()); err != nil {
		return nil, mapGCSError(err)
	}
	return &RemoveBucketOutput{}, nil
}
// MakeBucket creates the backing GCS bucket. Requires authenticated
// credentials (the project ID comes from them).
func (g *GCSBackend) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error) {
	// NewGCS permits a nil-credentials (anonymous) client; previously the
	// dereference below would panic in that case. Fail with a permission
	// error instead.
	if g.config.Credentials == nil {
		return nil, syscall.EACCES
	}
	err := g.bucket.Create(context.Background(), g.config.Credentials.ProjectID, nil)
	if err != nil {
		return nil, mapGCSError(err)
	}
	return &MakeBucketOutput{}, nil
}
// Delegate returns the backend itself as the delegation target.
func (g *GCSBackend) Delegate() interface{} {
	return g
}
// mapGCSError maps an error to syscall / fuse errors.
// A nil input maps to nil; errors that match no known case pass through
// unchanged. Sentinel comparisons and type assertions are deliberate (no
// unwrapping), matching how the SDK surfaces these errors.
func mapGCSError(err error) error {
	switch err {
	case nil:
		return nil
	case storage.ErrObjectNotExist:
		return fuse.ENOENT
	case storage.ErrBucketNotExist:
		// Returned during list operations when the bucket does not exist.
		return syscall.ENXIO
	}
	if apiErr, ok := err.(*googleapi.Error); ok {
		switch apiErr.Code {
		case 409:
			return fuse.EEXIST
		case 404:
			return fuse.ENOENT
		// Retryable errors:
		// https://cloud.google.com/storage/docs/json_api/v1/status-codes#429_Too_Many_Requests
		// https://cloud.google.com/storage/docs/json_api/v1/status-codes#500_Internal_Server_Error
		case 429, 500, 502, 503, 504:
			return syscall.EAGAIN
		default:
			// Fall back to the generic HTTP status mapping when it yields
			// a non-nil error.
			if fuseErr := mapHttpError(apiErr.Code); fuseErr != nil {
				return fuseErr
			}
		}
	}
	if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
		return syscall.ETIMEDOUT
	}
	return err
}