-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delete S3 keys incrementally in batches #3635
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ import ( | |
"bytes" | ||
"context" | ||
"crypto/tls" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
|
@@ -900,54 +901,71 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e | |
return err | ||
} | ||
|
||
func min(a, b int) int { | ||
if a < b { | ||
return a | ||
} | ||
return b | ||
} | ||
|
||
// Delete recursively deletes all objects stored at "path" and its subpaths. | ||
// We must be careful since S3 does not guarantee read after delete consistency | ||
func (d *driver) Delete(ctx context.Context, path string) error { | ||
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax) | ||
|
||
// manually add the given path if it's a file | ||
stat, err := d.Stat(ctx, path) | ||
if err != nil { | ||
return err | ||
} | ||
if stat != nil && !stat.IsDir() { | ||
path := d.s3Path(path) | ||
s3Objects = append(s3Objects, &s3.ObjectIdentifier{ | ||
Key: &path, | ||
}) | ||
} | ||
|
||
// list objects under the given path as a subpath (suffix with slash "/") | ||
s3Path := d.s3Path(path) + "/" | ||
s3Path := d.s3Path(path) | ||
listObjectsInput := &s3.ListObjectsV2Input{ | ||
Bucket: aws.String(d.Bucket), | ||
Prefix: aws.String(s3Path), | ||
} | ||
ListLoop: | ||
|
||
for { | ||
// list all the objects | ||
resp, err := d.S3.ListObjectsV2(listObjectsInput) | ||
|
||
// resp.Contents can only be empty on the first call | ||
// if there were no more results to return after the first call, resp.IsTruncated would have been false | ||
// and the loop would be exited without recalling ListObjects | ||
// and the loop would exit without recalling ListObjects | ||
if err != nil || len(resp.Contents) == 0 { | ||
break ListLoop | ||
return storagedriver.PathNotFoundError{Path: path} | ||
} | ||
|
||
for _, key := range resp.Contents { | ||
// Skip if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab"). | ||
if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' { | ||
continue | ||
} | ||
s3Objects = append(s3Objects, &s3.ObjectIdentifier{ | ||
Key: key.Key, | ||
}) | ||
} | ||
|
||
// Delete objects only if the list is not empty, otherwise S3 API returns a cryptic error | ||
if len(s3Objects) > 0 { | ||
// NOTE: according to AWS docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html | ||
// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more. | ||
// 1,000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack | ||
// Delete here straight away and reset the object slice when successful. | ||
resp, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ | ||
Bucket: aws.String(d.Bucket), | ||
Delete: &s3.Delete{ | ||
Objects: s3Objects, | ||
Quiet: aws.Bool(false), | ||
}, | ||
}) | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this API, you need to go through the response and see if there are any errors. The status code can still be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is, that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's possible our comments are wrong, but I'm not really sure 🤔 @joaodrp you worked on this more closely, do you remember the specifics? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The sample in the AWS API doc is this:
And the Go code to invoke the delete is this, which is fairly generic and I'm not sure it would parse for DeleteErrors: To me, it looks like there doesn't have to be an err object even if there are DeleteErrors. It could be something to do with adding the middleware, but I didn't think so having a quick look. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @milosgajdos I had a go at this one here: Jamstah@60bec85 Testing by removing the delete permission from the user, the code in this PR currently misses the error, and the code in my branch does not, so I think it is needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, you are right. I did make the same test a few weeks ago, but somehow forgot to update the comment -- actually not quite the same test but similar. I shall update this PR soon. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've just pushed this e3d3e15 which makes me rather sad about AWS SDK still (I thought I had reached the bottom, but there are clearly new depths). @deleteriousEffect can you please have another look? 🙇 |
||
return err | ||
} | ||
|
||
if len(resp.Errors) > 0 { | ||
// NOTE: AWS SDK s3.Error does not implement error interface which | ||
// is pretty intensely sad, so we have to make do with this for now. | ||
errs := make([]error, 0, len(resp.Errors)) | ||
for _, err := range resp.Errors { | ||
errs = append(errs, errors.New(err.String())) | ||
} | ||
return storagedriver.Errors{ | ||
DriverName: driverName, | ||
Errs: errs, | ||
} | ||
} | ||
} | ||
// NOTE: we don't want to reallocate | ||
// the slice so we simply "reset" it | ||
s3Objects = s3Objects[:0] | ||
|
||
// resp.Contents must have at least one element or we would have returned not found | ||
listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key | ||
|
||
|
@@ -958,24 +976,6 @@ ListLoop: | |
} | ||
} | ||
|
||
total := len(s3Objects) | ||
if total == 0 { | ||
return storagedriver.PathNotFoundError{Path: path} | ||
} | ||
|
||
// need to chunk objects into groups of 1000 per s3 restrictions | ||
for i := 0; i < total; i += 1000 { | ||
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ | ||
Bucket: aws.String(d.Bucket), | ||
Delete: &s3.Delete{ | ||
Objects: s3Objects[i:min(i+1000, total)], | ||
Quiet: aws.Bool(false), | ||
}, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the resp.IsTruncated check be placed here?