diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index fd473e457c2..f83ef329a91 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -79,35 +79,6 @@ func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter return true, pbs.localStore.ServeBlob(ctx, w, r, dgst) } -func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error { - defer func() { - mu.Lock() - delete(inflight, dgst) - mu.Unlock() - }() - - var desc distribution.Descriptor - var err error - var bw distribution.BlobWriter - - bw, err = pbs.localStore.Create(ctx) - if err != nil { - return err - } - - desc, err = pbs.copyContent(ctx, dgst, bw) - if err != nil { - return err - } - - _, err = bw.Commit(ctx, desc) - if err != nil { - return err - } - - return nil -} - func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { served, err := pbs.serveLocal(ctx, w, r, dgst) if err != nil { @@ -126,6 +97,9 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, mu.Lock() _, ok := inflight[dgst] if ok { + // If the blob has been serving in other requests. + // Will return the blob from the remote store directly. + // TODO Maybe we could reuse the these blobs are serving remotely and caching locally. mu.Unlock() _, err := pbs.copyContent(ctx, dgst, w) return err @@ -133,33 +107,40 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, inflight[dgst] = struct{}{} mu.Unlock() - // storeLocalCtx will be independent with ctx, because ctx it used to fetch remote image. - // There would be a situation, that is pulling remote bytes ends before pbs.storeLocal( 'Copy', 'Commit' ...) - // Then the registry fails to cache the layer, even though the layer had been served to client. - storeLocalCtx, cancel := context.WithCancel(context.Background()) - go func(dgst digest.Digest) { - defer cancel() - if err := pbs.storeLocal(storeLocalCtx, dgst); err != nil { - dcontext.GetLogger(storeLocalCtx).Errorf("Error committing to storage: %s", err.Error()) - } - - blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) - if err != nil { - dcontext.GetLogger(storeLocalCtx).Errorf("Error creating reference: %s", err) - return - } - - if pbs.scheduler != nil && pbs.ttl != nil { - pbs.scheduler.AddBlob(blobRef, *pbs.ttl) - } - - }(dgst) - - _, err = pbs.copyContent(ctx, dgst, w) + defer func() { + mu.Lock() + delete(inflight, dgst) + mu.Unlock() + }() + + bw, err := pbs.localStore.Create(ctx) + if err != nil { + return err + } + + // Serving client and storing locally over same fetching request. + // This can prevent a redundant blob fetching. + multiWriter := io.MultiWriter(w, bw) + desc, err := pbs.copyContent(ctx, dgst, multiWriter) + if err != nil { + return err + } + + _, err = bw.Commit(ctx, desc) if err != nil { - cancel() return err } + + blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) + if err != nil { + dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) + return err + } + + if pbs.scheduler != nil && pbs.ttl != nil { + pbs.scheduler.AddBlob(blobRef, *pbs.ttl) + } + return nil } diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index 60297b360f2..6a4324e7bcf 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -336,6 +336,12 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { remoteStats := te.RemoteStats() var wg sync.WaitGroup + var descHitMap = map[digest.Digest]bool{} + var hitLock sync.Mutex + + for _, remoteBlob := range te.inRemote { + descHitMap[remoteBlob.Digest] = true + } for i := 0; i < numClients; i++ { // Serveblob - pulls through blobs @@ -362,6 +368,15 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { t.Errorf("Mismatching blob fetch from proxy") return } + + desc, err := te.store.localStore.Stat(te.ctx, remoteBlob.Digest) + if err != nil { + continue + } + + hitLock.Lock() + delete(descHitMap, desc.Digest) + hitLock.Unlock() } }() } @@ -371,11 +386,16 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) { t.FailNow() } + if len(descHitMap) > 0 { + t.Errorf("Expected hit cache at least once, but it turns out that no caches was hit") + t.FailNow() + } + remoteBlobCount := len(te.inRemote) sbsMu.Lock() - if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique { + if (*localStats)["stat"] != remoteBlobCount*numClients*2 && (*localStats)["create"] != te.numUnique { sbsMu.Unlock() - t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount) + t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount, "Got: stat:", (*localStats)["stat"], "create:", (*localStats)["create"]) } sbsMu.Unlock()