-
Notifications
You must be signed in to change notification settings - Fork 0
/
push_v2.go
727 lines (629 loc) · 24.5 KB
/
push_v2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
package distribution // import "github.com/docker/docker/distribution"
import (
"context"
"fmt"
"io"
"os"
"runtime"
"sort"
"strings"
"sync"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/client"
apitypes "github.com/docker/docker/api/types"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
smallLayerMaximumSize = 100 * (1 << 10) // 100KB
middleLayerMaximumSize = 10 * (1 << 20) // 10MB
)
type v2Pusher struct {
v2MetadataService metadata.V2MetadataService
ref reference.Named
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
repo distribution.Repository
// pushState is state built by the Upload functions.
pushState pushState
}
type pushState struct {
sync.Mutex
// remoteLayers is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers. It is also used to fill in digest and size
// information when building the manifest.
remoteLayers map[layer.DiffID]distribution.Descriptor
// confirmedV2 is set to true if we confirm we're talking to a v2
// registry. This is used to limit fallbacks to the v1 protocol.
confirmedV2 bool
hasAuthInfo bool
}
func (p *v2Pusher) Push(ctx context.Context) (err error) {
p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
p.pushState.hasAuthInfo = p.config.AuthConfig.RegistryToken != "" || (p.config.AuthConfig.Username != "" && p.config.AuthConfig.Password != "")
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return err
}
if err = p.pushV2Repository(ctx); err != nil {
if continueOnError(err, p.endpoint.Mirror) {
return fallbackError{
err: err,
confirmedV2: p.pushState.confirmedV2,
transportOK: true,
}
}
}
return err
}
func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
imageID, err := p.config.ReferenceStore.Get(p.ref)
if err != nil {
return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref))
}
return p.pushV2Tag(ctx, namedTagged, imageID)
}
if !reference.IsNameOnly(p.ref) {
return errors.New("cannot push a digest reference")
}
// Push all tags
pushed := 0
for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
pushed++
if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil {
return err
}
}
}
if pushed == 0 {
return fmt.Errorf("no tags to push for %s", reference.FamiliarName(p.repoInfo.Name))
}
return nil
}
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
imgConfig, err := p.config.ImageStore.Get(ctx, id)
if err != nil {
return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
}
rootfs, err := p.config.ImageStore.RootFSFromConfig(imgConfig)
if err != nil {
return fmt.Errorf("unable to get rootfs for image %s: %s", reference.FamiliarString(ref), err)
}
platform, err := p.config.ImageStore.PlatformFromConfig(imgConfig)
if err != nil {
return fmt.Errorf("unable to get platform for image %s: %s", reference.FamiliarString(ref), err)
}
l, err := p.config.LayerStores[platform.OS].Get(rootfs.ChainID())
if err != nil {
return fmt.Errorf("failed to get top layer from image: %v", err)
}
defer l.Release()
hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
if err != nil {
return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
}
var descriptors []xfer.UploadDescriptor
descriptorTemplate := v2PushDescriptor{
v2MetadataService: p.v2MetadataService,
hmacKey: hmacKey,
repoInfo: p.repoInfo.Name,
ref: p.ref,
endpoint: p.endpoint,
repo: p.repo,
pushState: &p.pushState,
}
// Loop bounds condition is to avoid pushing the base layer on Windows.
for range rootfs.DiffIDs {
descriptor := descriptorTemplate
descriptor.layer = l
descriptor.checkedDigests = make(map[digest.Digest]struct{})
descriptors = append(descriptors, &descriptor)
l = l.Parent()
}
if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
return err
}
// Try schema2 first
builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig)
manifest, err := manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
}
manSvc, err := p.repo.Manifests(ctx)
if err != nil {
return err
}
putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
if runtime.GOOS == "windows" || p.config.TrustKey == nil || p.config.RequireSchema2 {
logrus.Warnf("failed to upload schema2 manifest: %v", err)
return err
}
// This is a temporary environment variables used in CI to allow pushing
// manifest v2 schema 1 images to test-registries used for testing *pulling*
// these images.
if os.Getenv("DOCKER_ALLOW_SCHEMA1_PUSH_DONOTUSE") == "" {
if err.Error() == "tag invalid" {
msg := "[DEPRECATED] support for pushing manifest v2 schema1 images has been removed. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/"
logrus.WithError(err).Error(msg)
return errors.Wrap(err, msg)
}
return err
}
logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
// Note: this fallback is deprecated, see log messages below
manifestRef, err := reference.WithTag(p.repo.Named(), ref.Tag())
if err != nil {
return err
}
builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, imgConfig)
manifest, err = manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
}
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
return err
}
// schema2 failed but schema1 succeeded
msg := fmt.Sprintf("[DEPRECATION NOTICE] support for pushing manifest v2 schema1 images will be removed in an upcoming release. Please contact admins of the %s registry NOW to avoid future disruption. More information at https://docs.docker.com/registry/spec/deprecated-schema-v1/", reference.Domain(ref))
logrus.Warn(msg)
progress.Message(p.config.ProgressOutput, "", msg)
}
var canonicalManifest []byte
switch v := manifest.(type) {
case *schema1.SignedManifest:
canonicalManifest = v.Canonical
case *schema2.DeserializedManifest:
_, canonicalManifest, err = v.Payload()
if err != nil {
return err
}
}
manifestDigest := digest.FromBytes(canonicalManifest)
progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
return err
}
// Signal digest to the trust client so it can sign the
// push, if appropriate.
progress.Aux(p.config.ProgressOutput, apitypes.PushResult{Tag: ref.Tag(), Digest: manifestDigest.String(), Size: len(canonicalManifest)})
return nil
}
func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
// descriptors is in reverse order; iterate backwards to get references
// appended in the right order.
for i := len(descriptors) - 1; i >= 0; i-- {
if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
return nil, err
}
}
return builder.Build(ctx)
}
type v2PushDescriptor struct {
layer PushLayer
v2MetadataService metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
ref reference.Named
endpoint registry.APIEndpoint
repo distribution.Repository
pushState *pushState
remoteDescriptor distribution.Descriptor
// a set of digests whose presence has been checked in a target repository
checkedDigests map[digest.Digest]struct{}
}
func (pd *v2PushDescriptor) Key() string {
return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String()
}
func (pd *v2PushDescriptor) ID() string {
return stringid.TruncateID(pd.layer.DiffID().String())
}
func (pd *v2PushDescriptor) DiffID() layer.DiffID {
return pd.layer.DiffID()
}
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
// Skip foreign layers unless this registry allows nondistributable artifacts.
if !pd.endpoint.AllowNondistributableArtifacts {
if fs, ok := pd.layer.(distribution.Describable); ok {
if d := fs.Descriptor(); len(d.URLs) > 0 {
progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
return d, nil
}
}
}
diffID := pd.DiffID()
pd.pushState.Lock()
if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
// it is already known that the push is not needed and
// therefore doing a stat is unnecessary
pd.pushState.Unlock()
progress.Update(progressOutput, pd.ID(), "Layer already exists")
return descriptor, nil
}
pd.pushState.Unlock()
maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
// Do we have any metadata associated with this layer's DiffID?
v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
if err == nil {
// check for blob existence in the target repository
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata)
if exists || err != nil {
return descriptor, err
}
}
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then push the blob.
bs := pd.repo.Blobs(ctx)
var layerUpload distribution.BlobWriter
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
isUnauthorizedError := false
for _, mountCandidate := range candidates {
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
createOpts := []distribution.BlobCreateOption{}
if len(mountCandidate.SourceRepository) > 0 {
namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository)
if err != nil {
logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err)
pd.v2MetadataService.Remove(mountCandidate)
continue
}
// Candidates are always under same domain, create remote reference
// with only path to set mount from with
remoteRef, err := reference.WithName(reference.Path(namedRef))
if err != nil {
logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err)
continue
}
canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest)
if err != nil {
logrus.Errorf("failed to make canonical reference: %v", err)
continue
}
createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
}
// send the layer
lu, err := bs.Create(ctx, createOpts...)
switch err := err.(type) {
case nil:
// noop
case distribution.ErrBlobMounted:
progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
err.Descriptor.MediaType = schema2.MediaTypeLayer
pd.pushState.Lock()
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = err.Descriptor
pd.pushState.Unlock()
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: err.Descriptor.Digest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
return err.Descriptor, nil
case errcode.Errors:
for _, e := range err {
switch e := e.(type) {
case errcode.Error:
if e.Code == errcode.ErrorCodeUnauthorized {
// when unauthorized error that indicate user don't has right to push layer to register
logrus.Debugln("failed to push layer to registry because unauthorized error")
isUnauthorizedError = true
}
default:
}
}
default:
logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
}
// when error is unauthorizedError and user don't hasAuthInfo that's the case user don't has right to push layer to register
// and he hasn't login either, in this case candidate cache should be removed
if len(mountCandidate.SourceRepository) > 0 &&
!(isUnauthorizedError && !pd.pushState.hasAuthInfo) &&
(metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
len(mountCandidate.HMAC) == 0) {
cause := "blob mount failure"
if err != nil {
cause = fmt.Sprintf("an error: %v", err.Error())
}
logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
pd.v2MetadataService.Remove(mountCandidate)
}
if lu != nil {
// cancel previous upload
cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
layerUpload = lu
}
}
if maxExistenceChecks-len(pd.checkedDigests) > 0 {
// do additional layer existence checks with other known digests if any
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
if exists || err != nil {
return descriptor, err
}
}
logrus.Debugf("Pushing layer: %s", diffID)
if layerUpload == nil {
layerUpload, err = bs.Create(ctx)
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
}
defer layerUpload.Close()
// upload the blob
return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
}
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor
}
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
return pd.remoteDescriptor
}
func (pd *v2PushDescriptor) uploadUsingSession(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) {
var reader io.ReadCloser
contentReader, err := pd.layer.Open()
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
size, _ := pd.layer.Size()
reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, size, pd.ID(), "Pushing")
switch m := pd.layer.MediaType(); m {
case schema2.MediaTypeUncompressedLayer:
compressedReader, compressionDone := compress(reader)
defer func(closer io.Closer) {
closer.Close()
<-compressionDone
}(reader)
reader = compressedReader
case schema2.MediaTypeLayer:
default:
reader.Close()
return distribution.Descriptor{}, fmt.Errorf("unsupported layer media type %s", m)
}
digester := digest.Canonical.Digester()
tee := io.TeeReader(reader, digester.Hash())
nn, err := layerUpload.ReadFrom(tee)
reader.Close()
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
pushDigest := digester.Digest()
if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
progress.Update(progressOutput, pd.ID(), "Pushed")
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: pushDigest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
desc := distribution.Descriptor{
Digest: pushDigest,
MediaType: schema2.MediaTypeLayer,
Size: nn,
}
pd.pushState.Lock()
// If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = desc
pd.pushState.Unlock()
return desc, nil
}
// layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
// slice. If it finds one that the registry knows about, it returns the known digest and "true". If
// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
// (not just the target one).
func (pd *v2PushDescriptor) layerAlreadyExists(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
checkOtherRepositories bool,
maxExistenceCheckAttempts int,
v2Metadata []metadata.V2Metadata,
) (desc distribution.Descriptor, exists bool, err error) {
// filter the metadata
candidates := []metadata.V2Metadata{}
for _, meta := range v2Metadata {
if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.Name() {
continue
}
candidates = append(candidates, meta)
}
// sort the candidates by similarity
sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)
digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
// an array of unique blob digests ordered from the best mount candidates to worst
layerDigests := []digest.Digest{}
for i := 0; i < len(candidates); i++ {
if len(layerDigests) >= maxExistenceCheckAttempts {
break
}
meta := &candidates[i]
if _, exists := digestToMetadata[meta.Digest]; exists {
// keep reference just to the first mapping (the best mount candidate)
continue
}
if _, exists := pd.checkedDigests[meta.Digest]; exists {
// existence of this digest has already been tested
continue
}
digestToMetadata[meta.Digest] = meta
layerDigests = append(layerDigests, meta.Digest)
}
attempts:
for _, dgst := range layerDigests {
meta := digestToMetadata[dgst]
logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
pd.checkedDigests[meta.Digest] = struct{}{}
switch err {
case nil:
if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
// cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: desc.Digest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
}
}
desc.MediaType = schema2.MediaTypeLayer
exists = true
break attempts
case distribution.ErrBlobUnknown:
if meta.SourceRepository == pd.repoInfo.Name() {
// remove the mapping to the target repository
pd.v2MetadataService.Remove(*meta)
}
default:
logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
}
}
if exists {
progress.Update(progressOutput, pd.ID(), "Layer already exists")
pd.pushState.Lock()
pd.pushState.remoteLayers[diffID] = desc
pd.pushState.Unlock()
}
return desc, exists, nil
}
// getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
// source repositories of target registry, maximum number of layer existence checks performed on the target
// repository and whether the check shall be done also with digests mapped to different repositories. The
// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
// of upload does not outweigh a latency.
func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
size, err := layer.Size()
switch {
// big blob
case size > middleLayerMaximumSize:
// 1st attempt to mount the blob few times
// 2nd few existence checks with digests associated to any repository
// then fallback to upload
return 4, 3, true
// middle sized blobs; if we could not get the size, assume we deal with middle sized blob
case size > smallLayerMaximumSize, err != nil:
// 1st attempt to mount blobs of average size few times
// 2nd try at most 1 existence check if there's an existing mapping to the target repository
// then fallback to upload
return 3, 1, false
// small blobs, do a minimum number of checks
default:
return 1, 1, false
}
}
// getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
// array is sorted from youngest to oldest. If requireRegistryMatch is true, the resulting array will contain
// only metadata entries having registry part of SourceRepository matching the part of repoInfo.
func getRepositoryMountCandidates(
repoInfo reference.Named,
hmacKey []byte,
max int,
v2Metadata []metadata.V2Metadata,
) []metadata.V2Metadata {
candidates := []metadata.V2Metadata{}
for _, meta := range v2Metadata {
sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
if err != nil || reference.Domain(repoInfo) != reference.Domain(sourceRepo) {
continue
}
// target repository is not a viable candidate
if meta.SourceRepository == repoInfo.Name() {
continue
}
candidates = append(candidates, meta)
}
sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
if max >= 0 && len(candidates) > max {
// select the youngest metadata
candidates = candidates[:max]
}
return candidates
}
// byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
// candidate "a" is preferred over "b":
//
// 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
// "b" was not
// 2. if a number of its repository path components exactly matching path components of target repository is higher
type byLikeness struct {
arr []metadata.V2Metadata
hmacKey []byte
pathComponents []string
}
func (bla byLikeness) Less(i, j int) bool {
aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
if aMacMatch != bMacMatch {
return aMacMatch
}
aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
return aMatch > bMatch
}
func (bla byLikeness) Swap(i, j int) {
bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
}
func (bla byLikeness) Len() int { return len(bla.arr) }
func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
// reverse the metadata array to shift the newest entries to the beginning
for i := 0; i < len(marr)/2; i++ {
marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
}
// keep equal entries ordered from the youngest to the oldest
sort.Stable(byLikeness{
arr: marr,
hmacKey: hmacKey,
pathComponents: getPathComponents(repoInfo.Name()),
})
}
// numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
func numOfMatchingPathComponents(pth string, matchComponents []string) int {
pthComponents := getPathComponents(pth)
i := 0
for ; i < len(pthComponents) && i < len(matchComponents); i++ {
if matchComponents[i] != pthComponents[i] {
return i
}
}
return i
}
func getPathComponents(path string) []string {
return strings.Split(path, "/")
}
func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
if layerUpload != nil {
logrus.Debugf("cancelling upload of blob %s", dgst)
err := layerUpload.Cancel(ctx)
if err != nil {
logrus.Warnf("failed to cancel upload: %v", err)
}
}
}