Skip to content

Commit

Permalink
Concurrent Lookup in TagStore
Browse files Browse the repository at this point in the history
Based on #3589

Signed-off-by: Andrey Voronkov <andrey.voronkov@sbermarket.ru>
  • Loading branch information
Andrey Voronkov committed Apr 17, 2024
1 parent bc6e81e commit 31bbac8
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 60 deletions.
5 changes: 1 addition & 4 deletions registry/storage/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,7 @@ func (repo *repository) Named() reference.Named {
}

func (repo *repository) Tags(ctx context.Context) distribution.TagService {
tags := &tagStore{
repository: repo,
blobStore: repo.registry.blobStore,
}
tags := NewStore(ctx, repo, repo.registry.blobStore)

return tags
}
Expand Down
104 changes: 73 additions & 31 deletions registry/storage/tagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,47 @@ package storage

import (
"context"
"os"
"path"
"sort"
"strconv"

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/dcontext"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/opencontainers/go-digest"
"golang.org/x/sync/errgroup"
)

var _ distribution.TagService = &tagStore{}
var _ distribution.TagService = &TagStore{}

// tagStore provides methods to manage manifest tags in a backend storage driver.
// TagStore provides methods to manage manifest tags in a backend storage driver.
// This implementation uses the same on-disk layout as the (now deleted) tag
// store. This provides backward compatibility with current registry deployments
// which only makes use of the Digest field of the returned distribution.Descriptor
// but does not enable full roundtripping of Descriptor objects
type tagStore struct {
repository *repository
blobStore *blobStore
type TagStore struct {
repository *repository
blobStore *blobStore
lookupConcurrencyFactor int
}

func NewStore(ctx context.Context, repository *repository, blobStore *blobStore) *TagStore {
logger := dcontext.GetLogger(ctx)
lookupConcurrencyFactor, err := strconv.Atoi(os.Getenv("STORAGE_TAGSTORE_LOOKUP_CONCURRENCY"))
if err != nil {
lookupConcurrencyFactor = 64
logger.Infof("TagStore: STORAGE_TAGSTORE_LOOKUP_CONCURRENCY is not set. Using default %d as lookup concurrency factor", lookupConcurrencyFactor)
}
return &TagStore{
repository: repository,
blobStore: blobStore,
lookupConcurrencyFactor: lookupConcurrencyFactor,
}
}

// All returns all tags
func (ts *tagStore) All(ctx context.Context) ([]string, error) {
func (ts *TagStore) All(ctx context.Context) ([]string, error) {
pathSpec, err := pathFor(manifestTagsPathSpec{
name: ts.repository.Named().Name(),
})
Expand Down Expand Up @@ -56,7 +75,7 @@ func (ts *tagStore) All(ctx context.Context) ([]string, error) {

// Tag tags the digest with the given tag, updating the store to point at
// the current tag. The digest must point to a manifest.
func (ts *tagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error {
func (ts *TagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error {
currentPath, err := pathFor(manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
Expand All @@ -77,7 +96,7 @@ func (ts *tagStore) Tag(ctx context.Context, tag string, desc distribution.Descr
}

// resolve the current revision for name and tag.
func (ts *tagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) {
func (ts *TagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) {
currentPath, err := pathFor(manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
Expand All @@ -100,7 +119,7 @@ func (ts *tagStore) Get(ctx context.Context, tag string) (distribution.Descripto
}

// Untag removes the tag association
func (ts *tagStore) Untag(ctx context.Context, tag string) error {
func (ts *TagStore) Untag(ctx context.Context, tag string) error {
tagPath, err := pathFor(manifestTagPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
Expand All @@ -116,7 +135,7 @@ func (ts *tagStore) Untag(ctx context.Context, tag string) error {
// to index manifest blobs by tag name. While the tag store doesn't map
// precisely to the linked blob store, using this ensures the links are
// managed via the same code path.
func (ts *tagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlobStore {
func (ts *TagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlobStore {
return &linkedBlobStore{
blobStore: ts.blobStore,
repository: ts.repository,
Expand All @@ -133,44 +152,67 @@ func (ts *tagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlob

// Lookup recovers a list of tags which refer to this digest. When a manifest is deleted by
// digest, tag entries which point to it need to be recovered to avoid dangling tags.
func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error) {
func (ts *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error) {
allTags, err := ts.All(ctx)
switch err.(type) {
case nil:
break
case distribution.ErrRepositoryUnknown:
// This tag store has been initialized but not yet populated
break
case nil:
break
default:
return nil, err
}

var tags []string
for _, tag := range allTags {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
outputChan := make(chan string)

group, ctx := errgroup.WithContext(ctx)
group.SetLimit(ts.lookupConcurrencyFactor)

go func() {
for _, tag := range allTags {
tag := tag // https://go.dev/doc/faq#closures_and_goroutines
group.Go(func() error {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
}

tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)

if err != nil {
switch err.(type) {
// PathNotFoundError shouldn't count as an error
case storagedriver.PathNotFoundError:
return nil
}
return err
}

if tagDigest == desc.Digest {
outputChan <- tag
}
return nil
})
}
group.Wait()
close(outputChan)
}()

tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
continue
}
return nil, err
}
var tags []string
for tag := range outputChan {
tags = append(tags, tag)
}

if tagDigest == desc.Digest {
tags = append(tags, tag)
}
if err := group.Wait(); err != nil {
return nil, err
}

return tags, nil
}

func (ts *tagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) {
func (ts *TagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) {
tagLinkPath := func(name string, dgst digest.Digest) (string, error) {
return pathFor(manifestTagIndexEntryLinkPathSpec{
name: name,
Expand Down
95 changes: 70 additions & 25 deletions registry/storage/tagstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,45 @@ package storage

import (
"context"
"errors"
"reflect"
"strconv"
"testing"

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/manifest"
"github.com/distribution/distribution/v3/manifest/schema2"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
"github.com/distribution/reference"
digest "github.com/opencontainers/go-digest"
)

type tagsTestEnv struct {
ts distribution.TagService
bs distribution.BlobStore
ms distribution.ManifestService
gbs distribution.BlobStatter
ctx context.Context
ts distribution.TagService
bs distribution.BlobStore
ms distribution.ManifestService
gbs distribution.BlobStatter
ctx context.Context
mockDriver *mockInMemory
}

type mockInMemory struct {
*inmemory.Driver
GetContentError error
}

func (m *mockInMemory) GetContent(ctx context.Context, path string) ([]byte, error) {
if m.GetContentError != nil {
return nil, m.GetContentError
}
return m.Driver.GetContent(ctx, path)
}

func testTagStore(t *testing.T) *tagsTestEnv {
ctx := context.Background()
d := inmemory.New()
reg, err := NewRegistry(ctx, d)
mockDriver := mockInMemory{inmemory.New(), nil}
reg, err := NewRegistry(ctx, &mockDriver)
if err != nil {
t.Fatal(err)
}
Expand All @@ -40,11 +56,12 @@ func testTagStore(t *testing.T) *tagsTestEnv {
}

return &tagsTestEnv{
ctx: ctx,
ts: repo.Tags(ctx),
bs: repo.Blobs(ctx),
gbs: reg.BlobStatter(),
ms: ms,
ctx: ctx,
ts: repo.Tags(ctx),
bs: repo.Blobs(ctx),
gbs: reg.BlobStatter(),
ms: ms,
mockDriver: &mockDriver,
}
}

Expand Down Expand Up @@ -167,41 +184,41 @@ func TestTagStoreAll(t *testing.T) {

func TestTagLookup(t *testing.T) {
env := testTagStore(t)
tagStore := env.ts
TagStore := env.ts
ctx := env.ctx

descA := distribution.Descriptor{Digest: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
desc0 := distribution.Descriptor{Digest: "sha256:0000000000000000000000000000000000000000000000000000000000000000"}

tags, err := tagStore.Lookup(ctx, descA)
tags, err := TagStore.Lookup(ctx, descA)
if err != nil {
t.Fatal(err)
}
if len(tags) != 0 {
t.Fatalf("Lookup returned > 0 tags from empty store")
}

err = tagStore.Tag(ctx, "a", descA)
err = TagStore.Tag(ctx, "a", descA)
if err != nil {
t.Fatal(err)
}

err = tagStore.Tag(ctx, "b", descA)
err = TagStore.Tag(ctx, "b", descA)
if err != nil {
t.Fatal(err)
}

err = tagStore.Tag(ctx, "0", desc0)
err = TagStore.Tag(ctx, "0", desc0)
if err != nil {
t.Fatal(err)
}

err = tagStore.Tag(ctx, "1", desc0)
err = TagStore.Tag(ctx, "1", desc0)
if err != nil {
t.Fatal(err)
}

tags, err = tagStore.Lookup(ctx, descA)
tags, err = TagStore.Lookup(ctx, descA)
if err != nil {
t.Fatal(err)
}
Expand All @@ -210,24 +227,52 @@ func TestTagLookup(t *testing.T) {
t.Errorf("Lookup of descA returned %d tags, expected 2", len(tags))
}

tags, err = tagStore.Lookup(ctx, desc0)
tags, err = TagStore.Lookup(ctx, desc0)
if err != nil {
t.Fatal(err)
}

if len(tags) != 2 {
t.Errorf("Lookup of descB returned %d tags, expected 2", len(tags))
}
/// Should handle error looking up tag
env.mockDriver.GetContentError = errors.New("Lookup failure")

for i := 2; i < 15; i++ {
err = TagStore.Tag(ctx, strconv.Itoa(i), desc0)
if err != nil {
t.Fatal(err)
}
}

tags, err = TagStore.Lookup(ctx, desc0)
if err == nil {
t.Fatal("Expected error but none retrieved")
}
if len(tags) > 0 {
t.Errorf("Expected 0 tags on an error but got %d tags", len(tags))
}

// Should not error for a path not found
env.mockDriver.GetContentError = storagedriver.PathNotFoundError{}

tags, err = TagStore.Lookup(ctx, desc0)
if err != nil {
t.Fatal(err)
}
if len(tags) > 0 {
t.Errorf("Expected 0 tags on path not found but got %d tags", len(tags))
}
}

func TestTagIndexes(t *testing.T) {
env := testTagStore(t)
tagStore := env.ts
TagStore := env.ts
ctx := env.ctx

md, ok := tagStore.(distribution.TagManifestsProvider)
md, ok := TagStore.(distribution.TagManifestsProvider)
if !ok {
t.Fatal("tagStore does not implement TagManifestDigests interface")
t.Fatal("TagStore does not implement TagManifestDigests interface")
}

conf, err := env.bs.Put(ctx, "application/octet-stream", []byte{0})
Expand Down Expand Up @@ -274,14 +319,14 @@ func TestTagIndexes(t *testing.T) {
}
if i < 3 {
// tag first 3 manifests as "t1"
err = tagStore.Tag(ctx, "t1", desc)
err = TagStore.Tag(ctx, "t1", desc)
if err != nil {
t.Fatal(err)
}
t1Dgsts[dgst] = struct{}{}
} else {
// the last two under "t2"
err = tagStore.Tag(ctx, "t2", desc)
err = TagStore.Tag(ctx, "t2", desc)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 31bbac8

Please sign in to comment.