Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: avoid redundant blob fetching #3569

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 34 additions & 53 deletions registry/proxy/proxyblobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,35 +79,6 @@ func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
}

// storeLocal fetches the blob identified by dgst through pbs.copyContent,
// writes it into a new upload created on pbs.localStore, and commits that
// upload under the descriptor copyContent returned.
//
// The inflight entry for dgst is removed on every exit path, successful or
// not. The first error from Create, copyContent or Commit is returned
// unchanged.
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
	// Clear the in-flight marker no matter how this function returns; mu is
	// held around the map mutation.
	defer func() {
		mu.Lock()
		delete(inflight, dgst)
		mu.Unlock()
	}()

	bw, err := pbs.localStore.Create(ctx)
	if err != nil {
		return err
	}

	desc, err := pbs.copyContent(ctx, dgst, bw)
	if err != nil {
		// Abandon the partially written upload so it does not linger in the
		// local store. The copy error is what the caller needs to see, so a
		// secondary failure from Cancel is deliberately dropped.
		_ = bw.Cancel(ctx)
		return err
	}

	if _, err := bw.Commit(ctx, desc); err != nil {
		// Best-effort cleanup of the uncommitted upload; the Commit error
		// takes precedence over any error from Cancel.
		_ = bw.Cancel(ctx)
		return err
	}

	return nil
}

func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
served, err := pbs.serveLocal(ctx, w, r, dgst)
if err != nil {
Expand All @@ -126,40 +97,50 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
mu.Lock()
_, ok := inflight[dgst]
if ok {
// If the blob is already being served by another request,
// return the blob from the remote store directly.
// TODO: Maybe we could reuse the blobs that are being served remotely and cached locally.
mu.Unlock()
_, err := pbs.copyContent(ctx, dgst, w)
return err
}
inflight[dgst] = struct{}{}
mu.Unlock()

// storeLocalCtx is independent of ctx, because ctx is used to fetch the remote image.
// Otherwise, pulling the remote bytes could end before pbs.storeLocal ('Copy', 'Commit', ...) finishes,
// and the registry would fail to cache the layer even though the layer had been served to the client.
storeLocalCtx, cancel := context.WithCancel(context.Background())
go func(dgst digest.Digest) {
defer cancel()
if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil {
dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error())
}

blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err)
return
}

if pbs.scheduler != nil && pbs.ttl != nil {
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
}

}(dgst)

_, err = pbs.copyContent(ctx, dgst, w)
defer func() {
mu.Lock()
delete(inflight, dgst)
mu.Unlock()
}()

bw, err := pbs.localStore.Create(ctx)
if err != nil {
return err
}

// Serve the client and store the blob locally over the same fetch request.
// This prevents a redundant blob fetch.
multiWriter := io.MultiWriter(w, bw)
desc, err := pbs.copyContent(ctx, dgst, multiWriter)
if err != nil {
return err
}

_, err = bw.Commit(ctx, desc)
if err != nil {
cancel()
return err
}

blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
if err != nil {
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
return err
}

if pbs.scheduler != nil && pbs.ttl != nil {
pbs.scheduler.AddBlob(blobRef, *pbs.ttl)
}

return nil
}

Expand Down
24 changes: 22 additions & 2 deletions registry/proxy/proxyblobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,12 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
remoteStats := te.RemoteStats()

var wg sync.WaitGroup
var descHitMap = map[digest.Digest]bool{}
var hitLock sync.Mutex

for _, remoteBlob := range te.inRemote {
descHitMap[remoteBlob.Digest] = true
}

for i := 0; i < numClients; i++ {
// Serveblob - pulls through blobs
Expand All @@ -362,6 +368,15 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
t.Errorf("Mismatching blob fetch from proxy")
return
}

desc, err := te.store.localStore.Stat(te.ctx, remoteBlob.Digest)
if err != nil {
continue
}

hitLock.Lock()
delete(descHitMap, desc.Digest)
hitLock.Unlock()
}
}()
}
Expand All @@ -371,11 +386,16 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
t.FailNow()
}

if len(descHitMap) > 0 {
t.Errorf("Expected hit cache at least once, but it turns out that no caches was hit")
t.FailNow()
}

remoteBlobCount := len(te.inRemote)
sbsMu.Lock()
if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
if (*localStats)["stat"] != remoteBlobCount*numClients*2 && (*localStats)["create"] != te.numUnique {
sbsMu.Unlock()
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount, "Got: stat:", (*localStats)["stat"], "create:", (*localStats)["create"])
}
sbsMu.Unlock()

Expand Down