From 363a5bc97cd8b18f7cbf330943427574ebb960f5 Mon Sep 17 00:00:00 2001 From: Andrey Voronkov Date: Wed, 17 Apr 2024 15:17:55 +0300 Subject: [PATCH] Concurrent Lookup in TagStore Based on #3589 Signed-off-by: Andrey Voronkov --- registry/storage/registry.go | 5 +- registry/storage/tagstore.go | 104 ++++++++++---- registry/storage/tagstore_test.go | 95 +++++++++---- vendor/golang.org/x/sync/errgroup/errgroup.go | 132 ++++++++++++++++++ 4 files changed, 276 insertions(+), 60 deletions(-) create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go diff --git a/registry/storage/registry.go b/registry/storage/registry.go index ecf483bf9c2..e6e14f2a995 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -184,10 +184,7 @@ func (repo *repository) Named() reference.Named { } func (repo *repository) Tags(ctx context.Context) distribution.TagService { - tags := &tagStore{ - repository: repo, - blobStore: repo.registry.blobStore, - } + tags := NewStore(ctx, repo, repo.registry.blobStore) return tags } diff --git a/registry/storage/tagstore.go b/registry/storage/tagstore.go index 29dcf4e3f19..c8cd931e4fa 100644 --- a/registry/storage/tagstore.go +++ b/registry/storage/tagstore.go @@ -2,28 +2,47 @@ package storage import ( "context" + "os" "path" "sort" + "strconv" "github.com/distribution/distribution/v3" + "github.com/distribution/distribution/v3/internal/dcontext" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/opencontainers/go-digest" + "golang.org/x/sync/errgroup" ) -var _ distribution.TagService = &tagStore{} +var _ distribution.TagService = &TagStore{} -// tagStore provides methods to manage manifest tags in a backend storage driver. +// TagStore provides methods to manage manifest tags in a backend storage driver. // This implementation uses the same on-disk layout as the (now deleted) tag // store. This provides backward compatibility with current registry deployments // which only makes use of the Digest field of the returned distribution.Descriptor // but does not enable full roundtripping of Descriptor objects -type tagStore struct { - repository *repository - blobStore *blobStore +type TagStore struct { + repository *repository + blobStore *blobStore + lookupConcurrencyFactor int +} + +func NewStore(ctx context.Context, repository *repository, blobStore *blobStore) *TagStore { + logger := dcontext.GetLogger(ctx) + lookupConcurrencyFactor, err := strconv.Atoi(os.Getenv("STORAGE_TAGSTORE_LOOKUP_CONCURRENCY")) + if err != nil { + lookupConcurrencyFactor = 64 + logger.Infof("TagStore: STORAGE_TAGSTORE_LOOKUP_CONCURRENCY is not set. Using default %d as lookup concurrency factor", lookupConcurrencyFactor) + } + return &TagStore{ + repository: repository, + blobStore: blobStore, + lookupConcurrencyFactor: lookupConcurrencyFactor, + } } // All returns all tags -func (ts *tagStore) All(ctx context.Context) ([]string, error) { +func (ts *TagStore) All(ctx context.Context) ([]string, error) { pathSpec, err := pathFor(manifestTagsPathSpec{ name: ts.repository.Named().Name(), }) @@ -56,7 +75,7 @@ func (ts *tagStore) All(ctx context.Context) ([]string, error) { // Tag tags the digest with the given tag, updating the store to point at // the current tag. The digest must point to a manifest. -func (ts *tagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error { +func (ts *TagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error { currentPath, err := pathFor(manifestTagCurrentPathSpec{ name: ts.repository.Named().Name(), tag: tag, @@ -77,7 +96,7 @@ func (ts *tagStore) Tag(ctx context.Context, tag string, desc distribution.Descr } // resolve the current revision for name and tag. -func (ts *tagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) { +func (ts *TagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) { currentPath, err := pathFor(manifestTagCurrentPathSpec{ name: ts.repository.Named().Name(), tag: tag, @@ -100,7 +119,7 @@ func (ts *tagStore) Get(ctx context.Context, tag string) (distribution.Descripto } // Untag removes the tag association -func (ts *tagStore) Untag(ctx context.Context, tag string) error { +func (ts *TagStore) Untag(ctx context.Context, tag string) error { tagPath, err := pathFor(manifestTagPathSpec{ name: ts.repository.Named().Name(), tag: tag, @@ -116,7 +135,7 @@ func (ts *tagStore) Untag(ctx context.Context, tag string) error { // to index manifest blobs by tag name. While the tag store doesn't map // precisely to the linked blob store, using this ensures the links are // managed via the same code path. -func (ts *tagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlobStore { +func (ts *TagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlobStore { return &linkedBlobStore{ blobStore: ts.blobStore, repository: ts.repository, @@ -133,44 +152,67 @@ func (ts *tagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlob // Lookup recovers a list of tags which refer to this digest. When a manifest is deleted by // digest, tag entries which point to it need to be recovered to avoid dangling tags. -func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error) { +func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error) { allTags, err := ts.All(ctx) switch err.(type) { + case nil: + break case distribution.ErrRepositoryUnknown: // This tag store has been initialized but not yet populated break - case nil: - break default: return nil, err } - var tags []string - for _, tag := range allTags { - tagLinkPathSpec := manifestTagCurrentPathSpec{ - name: ts.repository.Named().Name(), - tag: tag, + outputChan := make(chan string) + + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(ts.lookupConcurrencyFactor) + + go func() { + for _, tag := range allTags { + tag := tag // https://go.dev/doc/faq#closures_and_goroutines + group.Go(func() error { + tagLinkPathSpec := manifestTagCurrentPathSpec{ + name: ts.repository.Named().Name(), + tag: tag, + } + + tagLinkPath, _ := pathFor(tagLinkPathSpec) + tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) + + if err != nil { + switch err.(type) { + // PathNotFoundError shouldn't count as an error + case storagedriver.PathNotFoundError: + return nil + } + return err + } + + if tagDigest == desc.Digest { + outputChan <- tag + } + return nil + }) } + group.Wait() + close(outputChan) + }() - tagLinkPath, _ := pathFor(tagLinkPathSpec) - tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) - if err != nil { - switch err.(type) { - case storagedriver.PathNotFoundError: - continue - } - return nil, err - } + tags := make([]string, 0, len(allTags)) + for tag := range outputChan { + tags = append(tags, tag) + } - if tagDigest == desc.Digest { - tags = append(tags, tag) - } + if err := group.Wait(); err != nil { + return nil, err } return tags, nil } -func (ts *tagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) { +func (ts *TagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) { tagLinkPath := func(name string, dgst digest.Digest) (string, error) { return pathFor(manifestTagIndexEntryLinkPathSpec{ name: name, diff --git a/registry/storage/tagstore_test.go b/registry/storage/tagstore_test.go index 2659cfac274..2bff197abce 100644 --- a/registry/storage/tagstore_test.go +++ b/registry/storage/tagstore_test.go @@ -2,29 +2,45 @@ package storage import ( "context" + "errors" "reflect" + "strconv" "testing" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/manifest" "github.com/distribution/distribution/v3/manifest/schema2" + storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/distribution/v3/registry/storage/driver/inmemory" "github.com/distribution/reference" digest "github.com/opencontainers/go-digest" ) type tagsTestEnv struct { - ts distribution.TagService - bs distribution.BlobStore - ms distribution.ManifestService - gbs distribution.BlobStatter - ctx context.Context + ts distribution.TagService + bs distribution.BlobStore + ms distribution.ManifestService + gbs distribution.BlobStatter + ctx context.Context + mockDriver *mockInMemory +} + +type mockInMemory struct { + *inmemory.Driver + GetContentError error +} + +func (m *mockInMemory) GetContent(ctx context.Context, path string) ([]byte, error) { + if m.GetContentError != nil { + return nil, m.GetContentError + } + return m.Driver.GetContent(ctx, path) } func testTagStore(t *testing.T) *tagsTestEnv { ctx := context.Background() - d := inmemory.New() - reg, err := NewRegistry(ctx, d) + mockDriver := mockInMemory{inmemory.New(), nil} + reg, err := NewRegistry(ctx, &mockDriver) if err != nil { t.Fatal(err) } @@ -40,11 +56,12 @@ func testTagStore(t *testing.T) *tagsTestEnv { } return &tagsTestEnv{ - ctx: ctx, - ts: repo.Tags(ctx), - bs: repo.Blobs(ctx), - gbs: reg.BlobStatter(), - ms: ms, + ctx: ctx, + ts: repo.Tags(ctx), + bs: repo.Blobs(ctx), + gbs: reg.BlobStatter(), + ms: ms, + mockDriver: &mockDriver, } } @@ -167,13 +184,13 @@ func TestTagStoreAll(t *testing.T) { func TestTagLookup(t *testing.T) { env := testTagStore(t) - tagStore := env.ts + TagStore := env.ts ctx := env.ctx descA := distribution.Descriptor{Digest: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"} desc0 := distribution.Descriptor{Digest: "sha256:0000000000000000000000000000000000000000000000000000000000000000"} - tags, err := tagStore.Lookup(ctx, descA) + tags, err := TagStore.Lookup(ctx, descA) if err != nil { t.Fatal(err) } @@ -181,27 +198,27 @@ func TestTagLookup(t *testing.T) { t.Fatalf("Lookup returned > 0 tags from empty store") } - err = tagStore.Tag(ctx, "a", descA) + err = TagStore.Tag(ctx, "a", descA) if err != nil { t.Fatal(err) } - err = tagStore.Tag(ctx, "b", descA) + err = TagStore.Tag(ctx, "b", descA) if err != nil { t.Fatal(err) } - err = tagStore.Tag(ctx, "0", desc0) + err = TagStore.Tag(ctx, "0", desc0) if err != nil { t.Fatal(err) } - err = tagStore.Tag(ctx, "1", desc0) + err = TagStore.Tag(ctx, "1", desc0) if err != nil { t.Fatal(err) } - tags, err = tagStore.Lookup(ctx, descA) + tags, err = TagStore.Lookup(ctx, descA) if err != nil { t.Fatal(err) } @@ -210,7 +227,7 @@ func TestTagLookup(t *testing.T) { t.Errorf("Lookup of descA returned %d tags, expected 2", len(tags)) } - tags, err = tagStore.Lookup(ctx, desc0) + tags, err = TagStore.Lookup(ctx, desc0) if err != nil { t.Fatal(err) } @@ -218,16 +235,44 @@ func TestTagLookup(t *testing.T) { if len(tags) != 2 { t.Errorf("Lookup of descB returned %d tags, expected 2", len(tags)) } + /// Should handle error looking up tag + env.mockDriver.GetContentError = errors.New("Lookup failure") + + for i := 2; i < 15; i++ { + err = TagStore.Tag(ctx, strconv.Itoa(i), desc0) + if err != nil { + t.Fatal(err) + } + } + + tags, err = TagStore.Lookup(ctx, desc0) + if err == nil { + t.Fatal("Expected error but none retrieved") + } + if len(tags) > 0 { + t.Errorf("Expected 0 tags on an error but got %d tags", len(tags)) + } + + // Should not error for a path not found + env.mockDriver.GetContentError = storagedriver.PathNotFoundError{} + + tags, err = TagStore.Lookup(ctx, desc0) + if err != nil { + t.Fatal(err) + } + if len(tags) > 0 { + t.Errorf("Expected 0 tags on path not found but got %d tags", len(tags)) + } } func TestTagIndexes(t *testing.T) { env := testTagStore(t) - tagStore := env.ts + TagStore := env.ts ctx := env.ctx - md, ok := tagStore.(distribution.TagManifestsProvider) + md, ok := TagStore.(distribution.TagManifestsProvider) if !ok { - t.Fatal("tagStore does not implement TagManifestDigests interface") + t.Fatal("TagStore does not implement TagManifestDigests interface") } conf, err := env.bs.Put(ctx, "application/octet-stream", []byte{0}) @@ -274,14 +319,14 @@ func TestTagIndexes(t *testing.T) { } if i < 3 { // tag first 3 manifests as "t1" - err = tagStore.Tag(ctx, "t1", desc) + err = TagStore.Tag(ctx, "t1", desc) if err != nil { t.Fatal(err) } t1Dgsts[dgst] = struct{}{} } else { // the last two under "t2" - err = tagStore.Tag(ctx, "t2", desc) + err = TagStore.Tag(ctx, "t2", desc) if err != nil { t.Fatal(err) } diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 00000000000..cbee7a4e230 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,132 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +}