Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit 7289c72

Browse files
committed
Adds cross-repository blob pushing behavior
Tracks source repository information for each blob in the blobsum service. When pushing, this information is used to attempt to mount the blob from another repository in the same registry instead of re-pushing it.

Signed-off-by: Brian Bland <brian.bland@docker.com>
1 parent 9c9a1d1 commit 7289c72

File tree

14 files changed

+335
-42
lines changed

14 files changed

+335
-42
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -152,7 +152,7 @@ RUN set -x \
152152
# both. This allows integration-cli tests to cover push/pull with both schema1
153153
# and schema2 manifests.
154154
ENV REGISTRY_COMMIT_SCHEMA1 ec87e9b6971d831f0eff752ddb54fb64693e51cd
155-
ENV REGISTRY_COMMIT a7ae88da459b98b481a245e5b1750134724ac67d
155+
ENV REGISTRY_COMMIT 93d9070c8bb28414de9ec96fd38c89614acd8435
156156
RUN set -x \
157157
&& export GOPATH="$(mktemp -d)" \
158158
&& git clone https://github.com/docker/distribution.git "$GOPATH/src/github.com/docker/distribution" \

distribution/metadata/blobsum_service.go

Lines changed: 45 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,14 @@ type BlobSumService struct {
1313
store Store
1414
}
1515

16+
// BlobSum contains the digest and source repository information for a layer.
17+
type BlobSum struct {
18+
Digest digest.Digest
19+
SourceRepository string
20+
}
21+
1622
// maxBlobSums is the number of blobsums to keep per layer DiffID.
17-
const maxBlobSums = 5
23+
const maxBlobSums = 50
1824

1925
// NewBlobSumService creates a new blobsum mapping service.
2026
func NewBlobSumService(store Store) *BlobSumService {
@@ -35,18 +41,18 @@ func (blobserv *BlobSumService) diffIDKey(diffID layer.DiffID) string {
3541
return string(digest.Digest(diffID).Algorithm()) + "/" + digest.Digest(diffID).Hex()
3642
}
3743

38-
func (blobserv *BlobSumService) blobSumKey(blobsum digest.Digest) string {
39-
return string(blobsum.Algorithm()) + "/" + blobsum.Hex()
44+
func (blobserv *BlobSumService) blobSumKey(blobsum BlobSum) string {
45+
return string(blobsum.Digest.Algorithm()) + "/" + blobsum.Digest.Hex()
4046
}
4147

4248
// GetBlobSums finds the blobsums associated with a layer DiffID.
43-
func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]digest.Digest, error) {
49+
func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]BlobSum, error) {
4450
jsonBytes, err := blobserv.store.Get(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID))
4551
if err != nil {
4652
return nil, err
4753
}
4854

49-
var blobsums []digest.Digest
55+
var blobsums []BlobSum
5056
if err := json.Unmarshal(jsonBytes, &blobsums); err != nil {
5157
return nil, err
5258
}
@@ -55,7 +61,7 @@ func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]digest.Diges
5561
}
5662

5763
// GetDiffID finds a layer DiffID from a blobsum hash.
58-
func (blobserv *BlobSumService) GetDiffID(blobsum digest.Digest) (layer.DiffID, error) {
64+
func (blobserv *BlobSumService) GetDiffID(blobsum BlobSum) (layer.DiffID, error) {
5965
diffIDBytes, err := blobserv.store.Get(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum))
6066
if err != nil {
6167
return layer.DiffID(""), err
@@ -66,12 +72,12 @@ func (blobserv *BlobSumService) GetDiffID(blobsum digest.Digest) (layer.DiffID,
6672

6773
// Add associates a blobsum with a layer DiffID. If too many blobsums are
6874
// present, the oldest one is dropped.
69-
func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum digest.Digest) error {
75+
func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum BlobSum) error {
7076
oldBlobSums, err := blobserv.GetBlobSums(diffID)
7177
if err != nil {
7278
oldBlobSums = nil
7379
}
74-
newBlobSums := make([]digest.Digest, 0, len(oldBlobSums)+1)
80+
newBlobSums := make([]BlobSum, 0, len(oldBlobSums)+1)
7581

7682
// Copy all other blobsums to new slice
7783
for _, oldSum := range oldBlobSums {
@@ -98,3 +104,34 @@ func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum digest.Digest)
98104

99105
return blobserv.store.Set(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum), []byte(diffID))
100106
}
107+
108+
// Remove disassociates a blobsum from a layer DiffID.
109+
func (blobserv *BlobSumService) Remove(blobsum BlobSum) error {
110+
diffID, err := blobserv.GetDiffID(blobsum)
111+
if err != nil {
112+
return err
113+
}
114+
oldBlobSums, err := blobserv.GetBlobSums(diffID)
115+
if err != nil {
116+
oldBlobSums = nil
117+
}
118+
newBlobSums := make([]BlobSum, 0, len(oldBlobSums))
119+
120+
// Copy all other blobsums to new slice
121+
for _, oldSum := range oldBlobSums {
122+
if oldSum != blobsum {
123+
newBlobSums = append(newBlobSums, oldSum)
124+
}
125+
}
126+
127+
if len(newBlobSums) == 0 {
128+
return blobserv.store.Delete(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID))
129+
}
130+
131+
jsonBytes, err := json.Marshal(newBlobSums)
132+
if err != nil {
133+
return err
134+
}
135+
136+
return blobserv.store.Set(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID), jsonBytes)
137+
}

distribution/metadata/blobsum_service_test.go

Lines changed: 28 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,9 @@
11
package metadata
22

33
import (
4+
"encoding/hex"
45
"io/ioutil"
6+
"math/rand"
57
"os"
68
"reflect"
79
"testing"
@@ -23,33 +25,32 @@ func TestBlobSumService(t *testing.T) {
2325
}
2426
blobSumService := NewBlobSumService(metadataStore)
2527

28+
tooManyBlobSums := make([]BlobSum, 100)
29+
for i := range tooManyBlobSums {
30+
randDigest := randomDigest()
31+
tooManyBlobSums[i] = BlobSum{Digest: randDigest}
32+
}
33+
2634
testVectors := []struct {
2735
diffID layer.DiffID
28-
blobsums []digest.Digest
36+
blobsums []BlobSum
2937
}{
3038
{
3139
diffID: layer.DiffID("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"),
32-
blobsums: []digest.Digest{
33-
digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
40+
blobsums: []BlobSum{
41+
{Digest: digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937")},
3442
},
3543
},
3644
{
3745
diffID: layer.DiffID("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa"),
38-
blobsums: []digest.Digest{
39-
digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
40-
digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"),
46+
blobsums: []BlobSum{
47+
{Digest: digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937")},
48+
{Digest: digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e")},
4149
},
4250
},
4351
{
44-
diffID: layer.DiffID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"),
45-
blobsums: []digest.Digest{
46-
digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
47-
digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"),
48-
digest.Digest("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"),
49-
digest.Digest("sha256:8902a7ca89aabbb868835260912159026637634090dd8899eee969523252236e"),
50-
digest.Digest("sha256:c84364306344ccc48532c52ff5209236273525231dddaaab53262322352883aa"),
51-
digest.Digest("sha256:aa7583bbc87532a8352bbb72520a821b3623523523a8352523a52352aaa888fe"),
52-
},
52+
diffID: layer.DiffID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"),
53+
blobsums: tooManyBlobSums,
5354
},
5455
}
5556

@@ -70,8 +71,8 @@ func TestBlobSumService(t *testing.T) {
7071
t.Fatalf("error calling Get: %v", err)
7172
}
7273
expectedBlobsums := len(vec.blobsums)
73-
if expectedBlobsums > 5 {
74-
expectedBlobsums = 5
74+
if expectedBlobsums > 50 {
75+
expectedBlobsums = 50
7576
}
7677
if !reflect.DeepEqual(blobsums, vec.blobsums[len(vec.blobsums)-expectedBlobsums:len(vec.blobsums)]) {
7778
t.Fatal("Get returned incorrect layer ID")
@@ -85,7 +86,7 @@ func TestBlobSumService(t *testing.T) {
8586
}
8687

8788
// Test GetDiffID on a nonexistent entry
88-
_, err = blobSumService.GetDiffID(digest.Digest("sha256:82379823067823853223359023576437723560923756b03560378f4497753917"))
89+
_, err = blobSumService.GetDiffID(BlobSum{Digest: digest.Digest("sha256:82379823067823853223359023576437723560923756b03560378f4497753917")})
8990
if err == nil {
9091
t.Fatal("expected error looking up nonexistent entry")
9192
}
@@ -103,3 +104,12 @@ func TestBlobSumService(t *testing.T) {
103104
t.Fatal("GetDiffID returned incorrect diffID")
104105
}
105106
}
107+
108+
func randomDigest() digest.Digest {
109+
b := [32]byte{}
110+
for i := 0; i < len(b); i++ {
111+
b[i] = byte(rand.Intn(256))
112+
}
113+
d := hex.EncodeToString(b[:])
114+
return digest.Digest("sha256:" + d)
115+
}

distribution/metadata/metadata.go

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@ type Store interface {
1515
Get(namespace string, key string) ([]byte, error)
1616
// Set writes data indexed by namespace and key.
1717
Set(namespace, key string, value []byte) error
18+
// Delete removes data indexed by namespace and key.
19+
Delete(namespace, key string) error
1820
}
1921

2022
// FSMetadataStore uses the filesystem to associate metadata with layer and
@@ -63,3 +65,13 @@ func (store *FSMetadataStore) Set(namespace, key string, value []byte) error {
6365
}
6466
return os.Rename(tempFilePath, path)
6567
}
68+
69+
// Delete removes data indexed by namespace and key. The data file named after
70+
// the key, stored in the namespace's directory, is deleted.
71+
func (store *FSMetadataStore) Delete(namespace, key string) error {
72+
store.Lock()
73+
defer store.Unlock()
74+
75+
path := store.path(namespace, key)
76+
return os.Remove(path)
77+
}

distribution/pull_v2.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -111,6 +111,7 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e
111111

112112
type v2LayerDescriptor struct {
113113
digest digest.Digest
114+
repoInfo *registry.RepositoryInfo
114115
repo distribution.Repository
115116
blobSumService *metadata.BlobSumService
116117
}
@@ -124,7 +125,7 @@ func (ld *v2LayerDescriptor) ID() string {
124125
}
125126

126127
func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
127-
return ld.blobSumService.GetDiffID(ld.digest)
128+
return ld.blobSumService.GetDiffID(metadata.BlobSum{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()})
128129
}
129130

130131
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
@@ -196,7 +197,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
196197

197198
func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
198199
// Cache mapping from this layer's DiffID to the blobsum
199-
ld.blobSumService.Add(diffID, ld.digest)
200+
ld.blobSumService.Add(diffID, metadata.BlobSum{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()})
200201
}
201202

202203
func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdated bool, err error) {
@@ -334,6 +335,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Named, unverif
334335

335336
layerDescriptor := &v2LayerDescriptor{
336337
digest: blobSum,
338+
repoInfo: p.repoInfo,
337339
repo: p.repo,
338340
blobSumService: p.blobSumService,
339341
}
@@ -400,6 +402,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
400402
layerDescriptor := &v2LayerDescriptor{
401403
digest: d.Digest,
402404
repo: p.repo,
405+
repoInfo: p.repoInfo,
403406
blobSumService: p.blobSumService,
404407
}
405408

distribution/push_v2.go

Lines changed: 42 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima
131131

132132
descriptorTemplate := v2PushDescriptor{
133133
blobSumService: p.blobSumService,
134+
repoInfo: p.repoInfo,
134135
repo: p.repo,
135136
pushState: &p.pushState,
136137
}
@@ -211,6 +212,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
211212
type v2PushDescriptor struct {
212213
layer layer.Layer
213214
blobSumService *metadata.BlobSumService
215+
repoInfo reference.Named
214216
repo distribution.Repository
215217
pushState *pushState
216218
}
@@ -243,7 +245,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
243245
// Do we have any blobsums associated with this layer's DiffID?
244246
possibleBlobsums, err := pd.blobSumService.GetBlobSums(diffID)
245247
if err == nil {
246-
descriptor, exists, err := blobSumAlreadyExists(ctx, possibleBlobsums, pd.repo, pd.pushState)
248+
descriptor, exists, err := blobSumAlreadyExists(ctx, possibleBlobsums, pd.repoInfo, pd.repo, pd.pushState)
247249
if err != nil {
248250
progress.Update(progressOutput, pd.ID(), "Image push failed")
249251
return retryOnError(err)
@@ -263,6 +265,37 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
263265
// then push the blob.
264266
bs := pd.repo.Blobs(ctx)
265267

268+
// Attempt to mount the layer from another repository in the same registry to avoid an unnecessary upload
269+
for _, blobsum := range possibleBlobsums {
270+
sourceRepo, err := reference.ParseNamed(blobsum.SourceRepository)
271+
if err != nil {
272+
continue
273+
}
274+
if pd.repoInfo.Hostname() == sourceRepo.Hostname() {
275+
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, blobsum.Digest, sourceRepo.FullName())
276+
277+
desc, err := bs.Mount(ctx, sourceRepo.RemoteName(), blobsum.Digest)
278+
if err == nil {
279+
progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", sourceRepo.RemoteName())
280+
281+
pd.pushState.Lock()
282+
pd.pushState.confirmedV2 = true
283+
pd.pushState.remoteLayers[diffID] = desc
284+
pd.pushState.Unlock()
285+
286+
// Cache mapping from this layer's DiffID to the blobsum
287+
if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: blobsum.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
288+
return xfer.DoNotRetry{Err: err}
289+
}
290+
291+
return nil
292+
}
293+
// Unable to mount layer from this repository, so this source mapping is no longer valid
294+
logrus.Debugf("unassociating layer %s (%s) with %s", diffID, blobsum.Digest, sourceRepo.FullName())
295+
pd.blobSumService.Remove(blobsum)
296+
}
297+
}
298+
266299
// Send the layer
267300
layerUpload, err := bs.Create(ctx)
268301
if err != nil {
@@ -300,7 +333,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
300333
progress.Update(progressOutput, pd.ID(), "Pushed")
301334

302335
// Cache mapping from this layer's DiffID to the blobsum
303-
if err := pd.blobSumService.Add(diffID, pushDigest); err != nil {
336+
if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
304337
return xfer.DoNotRetry{Err: err}
305338
}
306339

@@ -332,9 +365,13 @@ func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
332365
// blobSumAlreadyExists checks if the registry already knows about any of the
333366
// blobsums passed in the "blobsums" slice. If it finds one that the registry
334367
// knows about, it returns the known digest and "true".
335-
func blobSumAlreadyExists(ctx context.Context, blobsums []digest.Digest, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
336-
for _, dgst := range blobsums {
337-
descriptor, err := repo.Blobs(ctx).Stat(ctx, dgst)
368+
func blobSumAlreadyExists(ctx context.Context, blobsums []metadata.BlobSum, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
369+
for _, blobSum := range blobsums {
370+
// Only check blobsums that are known to this repository or have an unknown source
371+
if blobSum.SourceRepository != "" && blobSum.SourceRepository != repoInfo.FullName() {
372+
continue
373+
}
374+
descriptor, err := repo.Blobs(ctx).Stat(ctx, blobSum.Digest)
338375
switch err {
339376
case nil:
340377
descriptor.MediaType = schema2.MediaTypeLayer

hack/vendor.sh

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ clone git github.com/boltdb/bolt v1.1.0
4444
clone git github.com/miekg/dns d27455715200c7d3e321a1e5cadb27c9ee0b0f02
4545

4646
# get graph and distribution packages
47-
clone git github.com/docker/distribution a7ae88da459b98b481a245e5b1750134724ac67d
47+
clone git github.com/docker/distribution 93d9070c8bb28414de9ec96fd38c89614acd8435
4848
clone git github.com/vbatts/tar-split v0.9.11
4949

5050
# get desired notary commit, might also need to be updated in Dockerfile

0 commit comments

Comments
 (0)