-
Notifications
You must be signed in to change notification settings - Fork 147
/
copier.go
114 lines (95 loc) · 3.21 KB
/
copier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2021-Present The Zarf Authors
// Package oci contains functions for interacting with artifacts stored in OCI registries.
package oci
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/defenseunicorns/zarf/src/pkg/utils/helpers"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
// Copy copies an artifact from one OCI registry to another
func Copy(ctx context.Context, src *OrasRemote, dst *OrasRemote,
include func(d ocispec.Descriptor) bool, concurrency int, progressBar helpers.ProgressWriter) (err error) {
if progressBar == nil {
progressBar = helpers.DiscardProgressWriter{}
}
// create a new semaphore to limit concurrency
sem := semaphore.NewWeighted(int64(concurrency))
// fetch the source root manifest
srcRoot, err := src.FetchRoot(ctx)
if err != nil {
return err
}
layers := helpers.Filter(srcRoot.Layers, include)
layers = append(layers, srcRoot.Config)
start := time.Now()
for idx, layer := range layers {
b, err := json.MarshalIndent(layer, "", " ")
if err != nil {
src.log.Debug(fmt.Sprintf("ERROR marshalling json: %s", err.Error()))
}
src.log.Debug(fmt.Sprintf("Copying layer: %s", string(b)))
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
// check if the layer already exists in the destination
exists, err := dst.repo.Exists(ctx, layer)
if err != nil {
return err
}
if exists {
src.log.Debug("Layer already exists in destination, skipping")
b := make([]byte, layer.Size)
progressBar.Write(b)
progressBar.UpdateTitle(fmt.Sprintf("[%d/%d] layers copied", idx+1, len(layers)))
sem.Release(1)
continue
}
eg, ectx := errgroup.WithContext(ctx)
eg.SetLimit(2)
// fetch the layer from the source
rc, err := src.repo.Fetch(ectx, layer)
if err != nil {
return err
}
// create a new pipe so we can write to both the progressbar and the destination at the same time
pr, pw := io.Pipe()
// TeeReader gets the data from the fetching layer and writes it to the PipeWriter
tr := io.TeeReader(rc, pw)
// this goroutine is responsible for pushing the layer to the destination
eg.Go(func() error {
defer pw.Close()
// get data from the TeeReader and push it to the destination
// push the layer to the destination
err = dst.repo.Push(ectx, layer, tr)
if err != nil {
return fmt.Errorf("failed to push layer %s to %s: %w", layer.Digest, dst.repo.Reference, err)
}
return nil
})
// this goroutine is responsible for updating the progressbar
eg.Go(func() error {
// read from the PipeReader to the progressbar
if _, err := io.Copy(progressBar, pr); err != nil {
return fmt.Errorf("failed to update progress on layer %s: %w", layer.Digest, err)
}
return nil
})
// wait for the goroutines to finish
if err := eg.Wait(); err != nil {
return err
}
sem.Release(1)
progressBar.UpdateTitle(fmt.Sprintf("[%d/%d] layers copied", idx+1, len(layers)))
}
duration := time.Since(start)
src.log.Debug(fmt.Sprintf("Copied %s to %s with a concurrency of %d and took %s",
src.repo.Reference, dst.repo.Reference, concurrency, duration))
return nil
}