Skip to content

Commit

Permalink
Delete S3 keys incrementally in batches
Browse files Browse the repository at this point in the history
Instead of first collecting all keys and then batch deleting them,
we will do the incremental delete _online_ per max allowed batch.
Doing this prevents frequent allocations for large S3 keyspaces
and OOM-kills that might happen as a result of those.

Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
  • Loading branch information
milosgajdos committed May 27, 2022
1 parent 5fe6934 commit ac6c07b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 43 deletions.
73 changes: 30 additions & 43 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,54 +900,59 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
return err
}

// min reports the smaller of the two integers a and b.
// When both are equal, that shared value is returned.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}

// Delete recursively deletes all objects stored at "path" and its subpaths.
// We must be careful since S3 does not guarantee read after delete consistency
func (d *driver) Delete(ctx context.Context, path string) error {
s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)

// manually add the given path if it's a file
stat, err := d.Stat(ctx, path)
if err != nil {
return err
}
if stat != nil && !stat.IsDir() {
path := d.s3Path(path)
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
Key: &path,
})
}

// list objects under the given path as a subpath (suffix with slash "/")
s3Path := d.s3Path(path) + "/"
s3Path := d.s3Path(path)
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(s3Path),
}
ListLoop:

for {
// list all the objects
resp, err := d.S3.ListObjectsV2(listObjectsInput)

// resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
// and the loop would be exited without recalling ListObjects
// and the loop would exit without recalling ListObjects
if err != nil || len(resp.Contents) == 0 {
break ListLoop
return storagedriver.PathNotFoundError{Path: path}
}

for _, key := range resp.Contents {
// Skip if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' {
continue
}
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
Key: key.Key,
})
}

// Delete objects only if the list is not empty, otherwise S3 API returns a cryptic error
if len(s3Objects) > 0 {
// NOTE: according to AWS docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more.
// 1000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack
// Delete here straight away and reset the object slice when successful.
_, err = d.S3.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects,
Quiet: aws.Bool(false),
},
})
if err != nil {
return err
}

}
// NOTE: we don't want to reallocate
// the slice so we simply "reset" it
s3Objects = s3Objects[:0]

// resp.Contents must have at least one element or we would have returned not found
listObjectsInput.StartAfter = resp.Contents[len(resp.Contents)-1].Key

Expand All @@ -958,24 +963,6 @@ ListLoop:
}
}

total := len(s3Objects)
if total == 0 {
return storagedriver.PathNotFoundError{Path: path}
}

// need to chunk objects into groups of 1000 per s3 restrictions
for i := 0; i < total; i += 1000 {
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects[i:min(i+1000, total)],
Quiet: aws.Bool(false),
},
})
if err != nil {
return err
}
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions registry/storage/driver/s3-aws/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,10 @@ func TestMoveWithMultipartCopy(t *testing.T) {
}

func TestListObjectsV2(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
}

rootDir, err := ioutil.TempDir("", "driver-")
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
Expand Down

0 comments on commit ac6c07b

Please sign in to comment.