Skip to content

Commit

Permalink
Merge pull request #3480 from CollinShoop/optimize-s3-walk
Browse files Browse the repository at this point in the history
Optimize storagedriver/s3 Walk (up to ~500x) + small bugfix
  • Loading branch information
milosgajdos committed Sep 26, 2021
2 parents a60a3f6 + cf81f67 commit 1563384
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 92 deletions.
2 changes: 1 addition & 1 deletion registry/storage/driver/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
Expand Down
2 changes: 1 addition & 1 deletion registry/storage/driver/filesystem/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
Expand Down
2 changes: 1 addition & 1 deletion registry/storage/driver/inmemory/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (d *driver) URLFor(ctx context.Context, path string, options map[string]int
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file
// from the given path, calling f on each file and directory
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return storagedriver.WalkFallback(ctx, d, path, f)
}
Expand Down
169 changes: 91 additions & 78 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io/ioutil"
"math"
"net/http"
"path/filepath"
"reflect"
"sort"
"strconv"
Expand Down Expand Up @@ -941,111 +942,85 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn)
return nil
}

type walkInfoContainer struct {
storagedriver.FileInfoFields
prefix *string
}

// Path provides the full path of the target of this file info.
func (wi walkInfoContainer) Path() string {
return wi.FileInfoFields.Path
}

// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
func (wi walkInfoContainer) Size() int64 {
return wi.FileInfoFields.Size
}

// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
func (wi walkInfoContainer) ModTime() time.Time {
return wi.FileInfoFields.ModTime
}

// IsDir returns true if the path is a directory.
func (wi walkInfoContainer) IsDir() bool {
return wi.FileInfoFields.IsDir
}

func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, prefix string, f storagedriver.WalkFn) error {
var retError error
var (
retError error
// the most recent directory walked for de-duping
prevDir string
// the most recent skip directory to avoid walking over undesirable files
prevSkipDir string
)
prevDir = prefix + path

listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
Delimiter: aws.String("/"),
MaxKeys: aws.Int64(listMax),
Bucket: aws.String(d.Bucket),
Prefix: aws.String(path),
MaxKeys: aws.Int64(listMax),
}

ctx, done := dcontext.WithTrace(parentCtx)
defer done("s3aws.ListObjectsV2Pages(%s)", path)
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {

var count int64
// KeyCount was introduced with version 2 of the GET Bucket operation in S3.
// Some S3 implementations don't support V2 now, so we fall back to manual
// calculation of the key count if required
if objects.KeyCount != nil {
count = *objects.KeyCount
*objectCount += *objects.KeyCount
} else {
count = int64(len(objects.Contents) + len(objects.CommonPrefixes))
*objectCount += count
}

walkInfos := make([]walkInfoContainer, 0, count)
// When the "delimiter" argument is omitted, the S3 list API will list all objects in the bucket
// recursively, omitting directory paths. Objects are listed in sorted, depth-first order so we
// can infer all the directories by comparing each object path to the last one we saw.
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html

for _, dir := range objects.CommonPrefixes {
commonPrefix := *dir.Prefix
walkInfos = append(walkInfos, walkInfoContainer{
prefix: dir.Prefix,
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: strings.Replace(commonPrefix[:len(commonPrefix)-1], d.s3Path(""), prefix, 1),
},
})
}
// With files returned in sorted depth-first order, directories are inferred in the same order.
// ErrSkipDir is handled by explicitly skipping over any files under the skipped directory. This may be sub-optimal
// for extreme edge cases but for the general use case in a registry, this is orders of magnitude
// faster than a more explicit recursive implementation.
listObjectErr := d.S3.ListObjectsV2PagesWithContext(ctx, listObjectsInput, func(objects *s3.ListObjectsV2Output, lastPage bool) bool {
walkInfos := make([]storagedriver.FileInfoInternal, 0, len(objects.Contents))

for _, file := range objects.Contents {
// empty prefixes are listed as objects inside its own prefix.
// https://docs.aws.amazon.com/AmazonS3/latest/user-guide/using-folders.html
if strings.HasSuffix(*file.Key, "/") {
continue
filePath := strings.Replace(*file.Key, d.s3Path(""), prefix, 1)

// get a list of all inferred directories between the previous directory and this file
dirs := directoryDiff(prevDir, filePath)
if len(dirs) > 0 {
for _, dir := range dirs {
walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: true,
Path: dir,
},
})
prevDir = dir
}
}
walkInfos = append(walkInfos, walkInfoContainer{

walkInfos = append(walkInfos, storagedriver.FileInfoInternal{
FileInfoFields: storagedriver.FileInfoFields{
IsDir: false,
Size: *file.Size,
ModTime: *file.LastModified,
Path: strings.Replace(*file.Key, d.s3Path(""), prefix, 1),
Path: filePath,
},
})
}

sort.SliceStable(walkInfos, func(i, j int) bool { return walkInfos[i].FileInfoFields.Path < walkInfos[j].FileInfoFields.Path })

for _, walkInfo := range walkInfos {
// skip any results under the last skip directory
if prevSkipDir != "" && strings.HasPrefix(walkInfo.Path(), prevSkipDir) {
continue
}

err := f(walkInfo)
*objectCount++

if err == storagedriver.ErrSkipDir {
if walkInfo.IsDir() {
continue
} else {
break
if err != nil {
if err == storagedriver.ErrSkipDir {
if walkInfo.IsDir() {
prevSkipDir = walkInfo.Path()
continue
}
// is file, stop gracefully
return false
}
} else if err != nil {
retError = err
return false
}

if walkInfo.IsDir() {
if err := d.doWalk(ctx, objectCount, *walkInfo.prefix, prefix, f); err != nil {
retError = err
return false
}
}
}
return true
})
Expand All @@ -1061,6 +1036,44 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
return nil
}

// directoryDiff finds all directories that are not in common between
// the previous and current paths in sorted order.
//
// Eg 1 directoryDiff("/path/to/folder", "/path/to/folder/folder/file")
// => [ "/path/to/folder/folder" ],
// Eg 2 directoryDiff("/path/to/folder/folder1", "/path/to/folder/folder2/file")
// => [ "/path/to/folder/folder2" ]
// Eg 3 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/file")
// => [ "/path/to/folder/folder2" ]
// Eg 4 directoryDiff("/path/to/folder/folder1/file", "/path/to/folder/folder2/folder1/file")
// => [ "/path/to/folder/folder2", "/path/to/folder/folder2/folder1" ]
// Eg 5 directoryDiff("/", "/path/to/folder/folder/file")
// => [ "/path", "/path/to", "/path/to/folder", "/path/to/folder/folder" ],
func directoryDiff(prev, current string) []string {
var paths []string

if prev == "" || current == "" {
return paths
}

parent := current
for {
parent = filepath.Dir(parent)
if parent == "/" || parent == prev || strings.HasPrefix(prev, parent) {
break
}
paths = append(paths, parent)
}
reverse(paths)
return paths
}

func reverse(s []string) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}

func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
Expand Down
Loading

0 comments on commit 1563384

Please sign in to comment.