Skip to content

Commit

Permalink
Concurrent Lookup in TagStore
Browse files Browse the repository at this point in the history
Based on distribution#3589

Signed-off-by: Andrey Voronkov <voronkovaa@gmail.com>
  • Loading branch information
Antiarchitect committed Jan 25, 2023
1 parent 3629105 commit a53638d
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 34 deletions.
5 changes: 1 addition & 4 deletions registry/storage/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,7 @@ func (repo *repository) Named() reference.Named {
}

func (repo *repository) Tags(ctx context.Context) distribution.TagService {
tags := &tagStore{
repository: repo,
blobStore: repo.registry.blobStore,
}
tags := NewStore(repo, repo.registry.blobStore)

return tags
}
Expand Down
118 changes: 99 additions & 19 deletions registry/storage/tagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package storage

import (
"context"
"os"
"path"
"sort"
"strconv"
"sync"

"github.com/distribution/distribution/v3"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
Expand All @@ -18,8 +21,21 @@ var _ distribution.TagService = &tagStore{}
// which only makes use of the Digest field of the returned distribution.Descriptor
// but does not enable full roundtripping of Descriptor objects
type tagStore struct {
repository *repository
blobStore *blobStore
repository *repository
blobStore *blobStore
lookupConcurrencyFactor int
}

func NewStore(repository *repository, blobStore *blobStore) *tagStore {
lookupConcurrencyFactor, err := strconv.Atoi(os.Getenv("STORAGE_TAGSTORE_LOOKUP_CONCURRENCY"))
if err != nil {
lookupConcurrencyFactor = 64
}
return &tagStore{
repository: repository,
blobStore: blobStore,
lookupConcurrencyFactor: lookupConcurrencyFactor,
}
}

// All returns all tags
Expand Down Expand Up @@ -132,45 +148,109 @@ func (ts *tagStore) linkedBlobStore(ctx context.Context, tag string) *linkedBlob
}
}

type atomicError struct {
mu sync.Mutex
err error
}

func (e *atomicError) Store(err error) {
e.mu.Lock()
defer e.mu.Unlock()
e.err = err
}

func (e *atomicError) Load() error {
e.mu.Lock()
defer e.mu.Unlock()
return e.err
}

// Lookup recovers a list of tags which refer to this digest. When a manifest is deleted by
// digest, tag entries which point to it need to be recovered to avoid dangling tags.
func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error) {
allTags, err := ts.All(ctx)
switch err.(type) {
case nil:
break
case distribution.ErrRepositoryUnknown:
// This tag store has been initialized but not yet populated
break
case nil:
break
default:
return nil, err
}

var tags []string
lookupErr := &atomicError{}

inputChan := make(chan string)
outputChan := make(chan string, len(allTags))

workersWaitGroup := sync.WaitGroup{}
workersWaitGroup.Add(ts.lookupConcurrencyFactor)

for i := 0; i < ts.lookupConcurrencyFactor; i++ {
go func() {
defer workersWaitGroup.Done()
for tag := range inputChan {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
}

tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)

if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
continue
}
lookupErr.Store(err)
}

if tagDigest == desc.Digest {
outputChan <- tag
}
}
}()
}

for _, tag := range allTags {
tagLinkPathSpec := manifestTagCurrentPathSpec{
name: ts.repository.Named().Name(),
tag: tag,
if lookupErr.Load() != nil {
break
}

tagLinkPath, _ := pathFor(tagLinkPathSpec)
tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath)
if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
continue
}
return nil, err
// Fastcheck for ctx.Done()
select {
case <-ctx.Done():
lookupErr.Store(ctx.Err())
break
default:
}

if tagDigest == desc.Digest {
tags = append(tags, tag)
select {
case <-ctx.Done():
lookupErr.Store(ctx.Err())
break
case inputChan <- tag:
}
}
close(inputChan)

workersWaitGroup.Wait()

close(outputChan)

if err := lookupErr.Load(); err != nil {
return nil, err
}

tags := make([]string, 0, len(outputChan))
for tag := range outputChan {
tags = append(tags, tag)
}

return tags, nil
}

func (ts *tagStore) ManifestDigests(ctx context.Context, tag string) ([]digest.Digest, error) {
tagLinkPath := func(name string, dgst digest.Digest) (string, error) {
return pathFor(manifestTagIndexEntryLinkPathSpec{
Expand Down
100 changes: 89 additions & 11 deletions registry/storage/tagstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,78 @@ package storage

import (
"context"
"errors"
"io"
"reflect"
"strconv"
"testing"

"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/manifest"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/distribution/distribution/v3/reference"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
digest "github.com/opencontainers/go-digest"
)

type tagsTestEnv struct {
ts distribution.TagService
bs distribution.BlobStore
ms distribution.ManifestService
gbs distribution.BlobStatter
ctx context.Context
ts distribution.TagService
bs distribution.BlobStore
ms distribution.ManifestService
gbs distribution.BlobStatter
ctx context.Context
mockDriver *mockInMemory
}

type mockInMemory struct {
base.Base
driver *inmemory.Driver
GetContentError error
}

var _ storagedriver.StorageDriver = &mockInMemory{}

func (m *mockInMemory) Name() string {
return m.driver.Name()
}
func (m *mockInMemory) GetContent(ctx context.Context, path string) ([]byte, error) {
if m.GetContentError != nil {
return nil, m.GetContentError
}
return m.driver.GetContent(ctx, path)
}
func (m *mockInMemory) PutContent(ctx context.Context, path string, content []byte) error {
return m.driver.PutContent(ctx, path, content)
}
func (m *mockInMemory) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
return m.driver.Reader(ctx, path, offset)
}
func (m *mockInMemory) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
return m.driver.Writer(ctx, path, append)
}
func (m *mockInMemory) List(ctx context.Context, path string) ([]string, error) {
return m.driver.List(ctx, path)
}
func (m *mockInMemory) Move(ctx context.Context, sourcePath string, destPath string) error {
return m.driver.Move(ctx, sourcePath, destPath)
}
func (m *mockInMemory) Delete(ctx context.Context, path string) error {
return m.driver.Delete(ctx, path)
}
func (m *mockInMemory) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
return m.driver.URLFor(ctx, path, options)
}
func (m *mockInMemory) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
return m.driver.Walk(ctx, path, f)
}

func testTagStore(t *testing.T) *tagsTestEnv {
ctx := context.Background()
d := inmemory.New()
reg, err := NewRegistry(ctx, d)
mockDriver := mockInMemory{driver: d, Base: d.Base}
reg, err := NewRegistry(ctx, &mockDriver)
if err != nil {
t.Fatal(err)
}
Expand All @@ -40,11 +89,12 @@ func testTagStore(t *testing.T) *tagsTestEnv {
}

return &tagsTestEnv{
ctx: ctx,
ts: repo.Tags(ctx),
bs: repo.Blobs(ctx),
gbs: reg.BlobStatter(),
ms: ms,
ctx: ctx,
ts: repo.Tags(ctx),
bs: repo.Blobs(ctx),
gbs: reg.BlobStatter(),
ms: ms,
mockDriver: &mockDriver,
}
}

Expand Down Expand Up @@ -218,6 +268,34 @@ func TestTagLookup(t *testing.T) {
if len(tags) != 2 {
t.Errorf("Lookup of descB returned %d tags, expected 2", len(tags))
}
/// Should handle error looking up tag
env.mockDriver.GetContentError = errors.New("Lookup failure")

for i := 2; i < 15; i++ {
err = tagStore.Tag(ctx, strconv.Itoa(i), desc0)
if err != nil {
t.Fatal(err)
}
}

tags, err = tagStore.Lookup(ctx, desc0)
if err == nil {
t.Fatal("Expected error but none retrieved")
}
if len(tags) > 0 {
t.Errorf("Expected 0 tags on an error but got %d tags", len(tags))
}

// Should not error for a path not found
env.mockDriver.GetContentError = storagedriver.PathNotFoundError{}

tags, err = tagStore.Lookup(ctx, desc0)
if err != nil {
t.Fatal(err)
}
if len(tags) > 0 {
t.Errorf("Expected 0 tags on path not found but got %d tags", len(tags))
}
}

func TestTagIndexes(t *testing.T) {
Expand Down

0 comments on commit a53638d

Please sign in to comment.