From 19e529284b2d27cf5767657f4ef588b6eedde75b Mon Sep 17 00:00:00 2001 From: Liang Zheng Date: Thu, 18 Apr 2024 15:56:26 +0800 Subject: [PATCH] add concurrency limits for tag lookup and untag Harbor is using the distribution for it's (harbor-registry) registry component. The harbor GC will call into the registry to delete the manifest, which in turn then does a lookup for all tags that reference the deleted manifest. To find the tag references, the registry will iterate every tag in the repository and read it's link file to check if it matches the deleted manifest (i.e. to see if uses the same sha256 digest). So, the more tags in repository, the worse the performance will be (as there will be more s3 API calls occurring for the tag directory lookups and tag file reads). Therefore, we can use concurrent lookup and untag to optimize performance as described in https://github.com/goharbor/harbor/issues/12948. This optimization was originally contributed by @Antiarchitect, now I would like to take it over. Thanks @Antiarchitect's efforts with PR https://github.com/distribution/distribution/pull/3890. Signed-off-by: Liang Zheng --- cmd/registry/config-dev.yml | 2 ++ configuration/configuration.go | 17 +++++++++ configuration/configuration_test.go | 8 +++++ registry/handlers/app.go | 12 +++++++ registry/handlers/manifests.go | 18 +++++++--- registry/storage/registry.go | 21 ++++++++++-- registry/storage/tagstore.go | 53 +++++++++++++++++++---------- 7 files changed, 107 insertions(+), 24 deletions(-) diff --git a/cmd/registry/config-dev.yml b/cmd/registry/config-dev.yml index 9bf36583ea..a72c001236 100644 --- a/cmd/registry/config-dev.yml +++ b/cmd/registry/config-dev.yml @@ -14,6 +14,8 @@ storage: maintenance: uploadpurging: enabled: false + tag: + concurrency_limit: 10 http: addr: :5000 debug: diff --git a/configuration/configuration.go b/configuration/configuration.go index 427081977c..aa74e3cb4e 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -441,6 +441,8 @@ func (storage Storage) Type() string { // allow configuration of delete case "redirect": // allow configuration of redirect + case "tag": + // allow configuration of tag default: storageType = append(storageType, k) } @@ -454,6 +456,19 @@ func (storage Storage) Type() string { return "" } +// TagParameters returns the Parameters map for a Storage tag configuration +func (storage Storage) TagParameters() Parameters { + return storage["tag"] +} + +// setTagParameter changes the parameter at the provided key to the new value +func (storage Storage) setTagParameter(key string, value interface{}) { + if _, ok := storage["tag"]; !ok { + storage["tag"] = make(Parameters) + } + storage["tag"][key] = value +} + // Parameters returns the Parameters map for a Storage configuration func (storage Storage) Parameters() Parameters { return storage[storage.Type()] @@ -482,6 +497,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error { // allow configuration of delete case "redirect": // allow configuration of redirect + case "tag": + // allow configuration of tag default: types = append(types, k) } diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 2139f8f1a9..39520bbd7d 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -39,6 +39,9 @@ var configStruct = Configuration{ "url1": "https://foo.example.com", "path1": "/some-path", }, + "tag": Parameters{ + "concurrency_limit": 10, + }, }, Auth: Auth{ "silly": Parameters{ @@ -167,6 +170,8 @@ storage: int1: 42 url1: "https://foo.example.com" path1: "/some-path" + tag: + concurrency_limit: 10 auth: silly: realm: silly @@ -542,6 +547,9 @@ func copyConfig(config Configuration) *Configuration { for k, v := range config.Storage.Parameters() { configCopy.Storage.setParameter(k, v) } + for k, v := range config.Storage.TagParameters() { + configCopy.Storage.setTagParameter(k, v) + } configCopy.Auth = Auth{config.Auth.Type(): Parameters{}} for k, v := range config.Auth.Parameters() { diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 2983176b85..9b033c6e3c 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -184,6 +184,18 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App { } } + // configure tag lookup concurrency limit + if p := config.Storage.TagParameters(); p != nil { + l, ok := p["concurrency_limit"] + if ok { + limit, ok := l.(int) + if !ok { + panic("tag lookup concurrency limit config key must have a integer value") + } + options = append(options, storage.TagLookupConcurrencyLimit(limit)) + } + } + // configure redirects var redirectDisabled bool if redirectConfig, ok := config.Storage["redirect"]; ok { diff --git a/registry/handlers/manifests.go b/registry/handlers/manifests.go index 704a8ab7f2..bde4f212b1 100644 --- a/registry/handlers/manifests.go +++ b/registry/handlers/manifests.go @@ -13,11 +13,13 @@ import ( "github.com/distribution/distribution/v3/manifest/ocischema" "github.com/distribution/distribution/v3/manifest/schema2" "github.com/distribution/distribution/v3/registry/api/errcode" + "github.com/distribution/distribution/v3/registry/storage" "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/reference" "github.com/gorilla/handlers" "github.com/opencontainers/go-digest" v1 "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/errgroup" ) const ( @@ -481,12 +483,20 @@ func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Reques return } + g := errgroup.Group{} + g.SetLimit(storage.DefaultConcurrencyLimit) for _, tag := range referencedTags { - if err := tagService.Untag(imh, tag); err != nil { - imh.Errors = append(imh.Errors, err) - return - } + tag := tag + + g.Go(func() error { + if err := tagService.Untag(imh, tag); err != nil { + imh.Errors = append(imh.Errors, err) + } + return nil + }) } + // imh already records all errors, so ignore the error of Wait() + g.Wait() //nolint:errcheck w.WriteHeader(http.StatusAccepted) } diff --git a/registry/storage/registry.go b/registry/storage/registry.go index ecf483bf9c..224acd31b3 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -10,6 +10,10 @@ import ( "github.com/distribution/reference" ) +const ( + DefaultConcurrencyLimit = 8 +) + // registry is the top-level implementation of Registry for use in the storage // package. All instances should descend from this object. type registry struct { @@ -18,6 +22,7 @@ type registry struct { statter *blobStatter // global statter service. blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider deleteEnabled bool + tagLookupConcurrencyLimit int resumableDigestEnabled bool blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory manifestURLs manifestURLs @@ -40,6 +45,13 @@ func EnableRedirect(registry *registry) error { return nil } +func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption { + return func(registry *registry) error { + registry.tagLookupConcurrencyLimit = concurrencyLimit + return nil + } +} + // EnableDelete is a functional option for NewRegistry. It enables deletion on // the registry. func EnableDelete(registry *registry) error { @@ -184,9 +196,14 @@ func (repo *repository) Named() reference.Named { } func (repo *repository) Tags(ctx context.Context) distribution.TagService { + limit := DefaultConcurrencyLimit + if repo.tagLookupConcurrencyLimit > 0 { + limit = repo.tagLookupConcurrencyLimit + } tags := &tagStore{ - repository: repo, - blobStore: repo.registry.blobStore, + repository: repo, + blobStore: repo.registry.blobStore, + concurrencyLimit: limit, } return tags diff --git a/registry/storage/tagstore.go b/registry/storage/tagstore.go index 29dcf4e3f1..d54fbf762f 100644 --- a/registry/storage/tagstore.go +++ b/registry/storage/tagstore.go @@ -5,9 +5,11 @@ import ( "path" "sort" + "github.com/opencontainers/go-digest" + "golang.org/x/sync/errgroup" + "github.com/distribution/distribution/v3" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" - "github.com/opencontainers/go-digest" ) var _ distribution.TagService = &tagStore{} @@ -18,8 +20,9 @@ var _ distribution.TagService = &tagStore{} // which only makes use of the Digest field of the returned distribution.Descriptor // but does not enable full roundtripping of Descriptor objects type tagStore struct { - repository *repository - blobStore *blobStore + repository *repository + blobStore *blobStore + concurrencyLimit int } // All returns all tags @@ -145,26 +148,40 @@ func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([ return nil, err } + g := errgroup.Group{} + g.SetLimit(ts.concurrencyLimit) + var tags []string for _, tag := range allTags { - tagLinkPathSpec := manifestTagCurrentPathSpec{ - name: ts.repository.Named().Name(), - tag: tag, - } + tag := tag - tagLinkPath, _ := pathFor(tagLinkPathSpec) - tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) - if err != nil { - switch err.(type) { - case storagedriver.PathNotFoundError: - continue + g.Go(func() error { + tagLinkPathSpec := manifestTagCurrentPathSpec{ + name: ts.repository.Named().Name(), + tag: tag, } - return nil, err - } - if tagDigest == desc.Digest { - tags = append(tags, tag) - } + tagLinkPath, _ := pathFor(tagLinkPathSpec) + tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) + if err != nil { + switch err.(type) { + case storagedriver.PathNotFoundError: + return nil + } + return err + } + + if tagDigest == desc.Digest { + tags = append(tags, tag) + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + return nil, err } return tags, nil