Skip to content

Commit

Permalink
Merge pull request #220 from PerseidMeteor/fix-blob-lost
Browse files Browse the repository at this point in the history
fix: blob lost and lack mutex protection
  • Loading branch information
imeoer committed Oct 24, 2023
2 parents 7d59989 + 879220a commit 95fbdcd
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 21 deletions.
63 changes: 53 additions & 10 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type RemoteCache struct {
records map[digest.Digest]*Item
// size is the cache record capacity of target layers.
size int
// newAdded records the number of new caches added in one conversion.
newAdded uint
}

func New(ctx context.Context, ref string, size int, pvd Provider) (context.Context, *RemoteCache) {
Expand All @@ -81,6 +83,8 @@ func New(ctx context.Context, ref string, size int, pvd Provider) (context.Conte
}

func (rc *RemoteCache) getByTarget(target digest.Digest) *Item {
rc.mutex.Lock()
defer rc.mutex.Unlock()
for _, item := range rc.records {
if item.Target.Digest == target {
return item
Expand All @@ -90,12 +94,12 @@ func (rc *RemoteCache) getByTarget(target digest.Digest) *Item {
}

func (rc *RemoteCache) getBySource(source digest.Digest) *Item {
rc.mutex.Lock()
defer rc.mutex.Unlock()
return rc.records[source]
}

func (rc *RemoteCache) get(digest digest.Digest) *ocispec.Descriptor {
rc.mutex.Lock()
defer rc.mutex.Unlock()
if item := rc.getBySource(digest); item != nil {
return &item.Source
}
Expand All @@ -119,6 +123,18 @@ func (rc *RemoteCache) set(source, target ocispec.Descriptor) {
}
}

func (rc *RemoteCache) updateItem(dgst digest.Digest, labels map[string]string) (*RemoteCache, *ocispec.Descriptor) {
if item := rc.getBySource(dgst); item != nil {
item.Source.Annotations = mergeMap(item.Source.Annotations, labels)
return rc, &item.Source
}
if item := rc.getByTarget(dgst); item != nil {
item.Target.Annotations = mergeMap(item.Source.Annotations, labels)
return rc, &item.Target
}
return nil, nil
}

func mergeMap(left, right map[string]string) map[string]string {
if left == nil {
left = map[string]string{}
Expand All @@ -143,21 +159,17 @@ func Get(ctx context.Context, dgst digest.Digest) (*RemoteCache, *ocispec.Descri
func Set(ctx context.Context, source, target ocispec.Descriptor) {
rc, ok := ctx.Value(cacheKey{}).(*RemoteCache)
if ok {
if rc.get(source.Digest) == nil {
rc.newAdded++
}
rc.set(source, target)
}
}

func Update(ctx context.Context, dgst digest.Digest, labels map[string]string) (*RemoteCache, *ocispec.Descriptor) {
rc, ok := ctx.Value(cacheKey{}).(*RemoteCache)
if ok {
if item := rc.getBySource(dgst); item != nil {
item.Source.Annotations = mergeMap(item.Source.Annotations, labels)
return rc, &item.Source
}
if item := rc.getByTarget(dgst); item != nil {
item.Target.Annotations = mergeMap(item.Source.Annotations, labels)
return rc, &item.Target
}
return rc.updateItem(dgst, labels)
}
return nil, nil
}
Expand Down Expand Up @@ -449,3 +461,34 @@ func appendLayers(orgDescs, newDescs []ocispec.Descriptor, size int) []ocispec.D
}
return mergedLayers
}

// HitCount returns the hitted and total count of cache layers in a conversion.
func (rc *RemoteCache) HitCount(ctx context.Context, desc ocispec.Descriptor, platform platforms.MatchComparer) (uint, uint, error) {
maniDescs := []ocispec.Descriptor{}
switch desc.MediaType {
case ocispec.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
maniDescs = append(maniDescs, desc)
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
descs, err := utils.GetManifests(ctx, rc.provider.ContentStore(), desc, platform)
if err != nil {
return 0, 0, err
}
maniDescs = append(maniDescs, descs...)
}
var cached, total uint
for _, maniDesc := range maniDescs {
manifest := ocispec.Manifest{}
_, err := utils.ReadJSON(ctx, rc.provider.ContentStore(), &manifest, maniDesc)
if err != nil {
return 0, 0, errors.Wrap(err, "read manifest")
}
total += uint(len(manifest.Layers))
for _, sourceLayer := range manifest.Layers {
if item := rc.get(sourceLayer.Digest); item != nil {
cached++
}
}
}
cached -= rc.newAdded
return cached, total, nil
}
19 changes: 10 additions & 9 deletions pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,17 @@ func (content *Content) Update(ctx context.Context, info ctrcontent.Info, fieldp
}
}

info, err := content.store.Update(ctx, info, fieldpaths...)
if _, cached := cache.Update(ctx, info.Digest, info.Labels); cached != nil {
return ctrcontent.Info{
Digest: cached.Digest,
Size: cached.Size,
Labels: cached.Annotations,
}, nil
updatedInfo, err := content.store.Update(ctx, info, fieldpaths...)
if errors.Is(err, errdefs.ErrNotFound) {
if _, cached := cache.Update(ctx, info.Digest, info.Labels); cached != nil {
return ctrcontent.Info{
Digest: cached.Digest,
Size: cached.Size,
Labels: cached.Annotations,
}, nil
}
}

return info, err
return updatedInfo, err
}

func (content *Content) Walk(ctx context.Context, fn ctrcontent.WalkFunc, fs ...string) error {
Expand Down
28 changes: 26 additions & 2 deletions pkg/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/reference/docker"
"github.com/goharbor/acceleration-service/pkg/adapter/annotation"
"github.com/goharbor/acceleration-service/pkg/cache"
"github.com/goharbor/acceleration-service/pkg/content"
"github.com/goharbor/acceleration-service/pkg/driver"
"github.com/goharbor/acceleration-service/pkg/errdefs"
Expand Down Expand Up @@ -142,7 +143,11 @@ func (cvt *Converter) Convert(ctx context.Context, source, target, cacheRef stri
if err := metric.SetSourceImageSize(ctx, cvt, source); err != nil {
return nil, errors.Wrap(err, "get source image size")
}
logger.Infof("pulled image %s, elapse %s", source, metric.SourcePullElapsed)
hitInfo, err := cvt.cacheHitInfo(ctx, source, cache)
if err != nil {
logger.Warnf("get cache hit count: %s", err.Error())
}
logger.Infof("pulled image %s %s, elapse %s", source, hitInfo, metric.SourcePullElapsed)

logger.Infof("converting image %s", source)
start = time.Now()
Expand All @@ -158,7 +163,11 @@ func (cvt *Converter) Convert(ctx context.Context, source, target, cacheRef stri
if err := metric.SetTargetImageSize(ctx, cvt, desc); err != nil {
return nil, errors.Wrap(err, "get target image size")
}
logger.Infof("converted image %s, elapse %s", target, metric.ConversionElapsed)
hitInfo, err = cvt.cacheHitInfo(ctx, source, cache)
if err != nil {
logger.Warnf("get cache hit count: %s", err.Error())
}
logger.Infof("converted image %s %s, elapse %s", target, hitInfo, metric.ConversionElapsed)

if cache != nil {
sourceImage, err := cvt.provider.Image(ctx, source)
Expand Down Expand Up @@ -190,3 +199,18 @@ func (cvt *Converter) Convert(ctx context.Context, source, target, cacheRef stri

return &metric, nil
}

func (cvt *Converter) cacheHitInfo(ctx context.Context, source string, cache *cache.RemoteCache) (string, error) {
if cache != nil {
sourceImage, err := cvt.provider.Image(ctx, source)
if err != nil {
return "", err
}
cached, total, err := cache.HitCount(ctx, *sourceImage, cvt.platformMC)
if err != nil {
return "", err
}
return fmt.Sprintf("(cached %d/%d)", cached, total), nil
}
return "", nil
}

0 comments on commit 95fbdcd

Please sign in to comment.