Skip to content

Commit

Permalink
progress bars: use github.com/vbauerster/mpb
Browse files Browse the repository at this point in the history
Use the github.com/vbauerster/mpb library for displaying the progress
bars.  It is actively maintained, is designed for multi-bar usage and
just gets the job done.

This will fix issues with single bars being displayed multiple times.
Note that updating the bar's prefix does not work anymore.  We're now
logging those entries, for instance, when we skip a blob as it's
already present.

The output now looks as follows (e.g., when pulling jboss/wildfly):

```
Getting image source signatures
Copying blob aeb7866da422 [======================================] 71.2 MiB / 71.2 MiB
Copying blob 157601a0b538 [======================================] 10.3 MiB / 10.3 MiB
Copying blob 642f4164f381 [======================================] 1.9 KiB / 1.9 KiB
Copying blob bda512e97517 [======================================] 74.6 MiB / 74.6 MiB
Copying blob 4cccaafdae21 [======================================] 170.9 MiB / 170.9 MiB
Copying config 2602b48525 [======================================] 6.5KiB / 6.5KiB
Writing manifest to image destination
Storing signatures
```

Note that this change requires the following dependencies:
 * github.com/vbauerster/mpb v3.3.4
 * github.com/mattn/go-isatty v0.0.4
 * github.com/VividCortex/ewma v1.1.1

Once we migrate to using Go modules, we can start using mpb v4.x.

Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
  • Loading branch information
vrothberg committed Feb 14, 2019
1 parent 93bced0 commit e8eedb8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 48 deletions.
92 changes: 45 additions & 47 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"
"golang.org/x/crypto/ssh/terminal"
"golang.org/x/sync/semaphore"
pb "gopkg.in/cheggaaa/pb.v1"
)

type digestingReader struct {
Expand Down Expand Up @@ -90,6 +91,8 @@ type copier struct {
reportWriter io.Writer
progressOutput io.Writer
progressInterval time.Duration
progressPool *mpb.Progress
progressWG *sync.WaitGroup
progress chan types.ProgressProperties
blobInfoCache types.BlobInfoCache
copyInParallel bool
Expand Down Expand Up @@ -165,12 +168,15 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
progressOutput = ioutil.Discard
}
copyInParallel := dest.HasThreadSafePutBlob() && rawSource.HasThreadSafeGetBlob()
wg := new(sync.WaitGroup)
c := &copier{
dest: dest,
rawSource: rawSource,
reportWriter: reportWriter,
progressOutput: progressOutput,
progressInterval: options.ProgressInterval,
progressPool: mpb.New(mpb.WithWidth(40), mpb.WithOutput(progressOutput), mpb.WithWaitGroup(wg)),
progressWG: wg,
progress: options.Progress,
copyInParallel: copyInParallel,
// FIXME? The cache is used for sources and destinations equally, but we only have a SourceCtx and DestinationCtx.
Expand Down Expand Up @@ -427,22 +433,6 @@ func shortDigest(d digest.Digest) string {
return d.Encoded()[:12]
}

// createProgressBar creates a pb.ProgressBar. Note that if the copier's
// reportWriter is ioutil.Discard, the progress bar's output will be discarded
// and a single line will be printed instead.
func (c *copier) createProgressBar(srcInfo types.BlobInfo, kind string) *pb.ProgressBar {
bar := pb.New(int(srcInfo.Size)).SetUnits(pb.U_BYTES)
bar.SetMaxWidth(80)
bar.ShowTimeLeft = false
bar.ShowPercent = false
bar.Prefix(fmt.Sprintf("Copying %s %s:", kind, shortDigest(srcInfo.Digest)))
bar.Output = c.progressOutput
if bar.Output == ioutil.Discard {
c.Printf("Copying %s %s\n", kind, srcInfo.Digest)
}
return bar
}

// isTTY returns true if the io.Writer is a file and a tty.
func isTTY(w io.Writer) bool {
if f, ok := w.(*os.File); ok {
Expand Down Expand Up @@ -477,6 +467,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
// copyGroup is used to determine if all layers are copied
copyGroup := sync.WaitGroup{}
copyGroup.Add(numLayers)

// copySemaphore is used to limit the number of parallel downloads to
// avoid malicious images causing troubles and to be nice to servers.
var copySemaphore *semaphore.Weighted
Expand All @@ -487,8 +478,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, bar *pb.ProgressBar) {
defer bar.Finish()
copyLayerHelper := func(index int, srcLayer types.BlobInfo, bar *mpb.Bar) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
Expand All @@ -498,32 +488,22 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
// does not support them.
if ic.diffIDsAreNeeded {
cld.err = errors.New("getting DiffID for foreign layers is unimplemented")
bar.Prefix(fmt.Sprintf("Skipping blob %s (DiffID foreign layer unimplemented):", shortDigest(srcLayer.Digest)))
bar.Finish()
bar.SetTotal(srcLayer.Size, true)
} else {
cld.destInfo = srcLayer
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
bar.Prefix(fmt.Sprintf("Skipping blob %s (foreign layer):", shortDigest(srcLayer.Digest)))
bar.Add64(bar.Total)
bar.Finish()
bar.SetTotal(srcLayer.Size, true)
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, bar)
bar.SetTotal(srcLayer.Size, true)
}
data[index] = cld
}

progressBars := make([]*pb.ProgressBar, numLayers)
progressBars := make([]*mpb.Bar, numLayers)
for i, srcInfo := range srcInfos {
bar := ic.c.createProgressBar(srcInfo, "blob")
progressBars[i] = bar
}

progressPool := pb.NewPool(progressBars...)
progressPool.Output = ic.c.progressOutput

if err := progressPool.Start(); err != nil {
return errors.Wrapf(err, "error creating progress-bar pool")
progressBars[i] = ic.c.createProgressBar(srcInfo, shortDigest(srcInfo.Digest), "blob")
}

for i, srcLayer := range srcInfos {
Expand All @@ -534,8 +514,8 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)

// Wait for all layers to be copied
copyGroup.Wait()
progressPool.Stop()

for i, cld := range data {
if cld.err != nil {
Expand Down Expand Up @@ -600,13 +580,31 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte
return nil, err
}

ic.c.progressPool.Wait()
ic.c.Printf("Writing manifest to image destination\n")
if err := ic.c.dest.PutManifest(ctx, manifest); err != nil {
return nil, errors.Wrap(err, "Error writing manifest")
}
return manifest, nil
}

// createProgressBar creates a mpb.Bar. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(info types.BlobInfo, name, kind string) *mpb.Bar {
bar := c.progressPool.AddBar(info.Size,
mpb.PrependDecorators(
decor.Name(fmt.Sprintf("Copying %s %s", kind, name)),
),
mpb.AppendDecorators(
decor.CountersKibiByte("%.1f / %.1f"),
),
)
if c.progressOutput == ioutil.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
return bar
}

// copyConfig copies config.json, if any, from src to dest.
func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
srcInfo := src.ConfigInfo()
Expand All @@ -615,10 +613,11 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
if err != nil {
return errors.Wrapf(err, "Error reading config blob %s", srcInfo.Digest)
}
bar := c.createProgressBar(srcInfo, "config")
defer bar.Finish()
bar.Start()

// make the short digest only 10 characters long to make it align with the blob output
bar := c.createProgressBar(srcInfo, shortDigest(srcInfo.Digest)[0:10], "config")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
defer bar.SetTotal(srcInfo.Size, true)
if err != nil {
return err
}
Expand All @@ -638,7 +637,7 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, bar *pb.ProgressBar) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, bar *mpb.Bar) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

Expand All @@ -649,9 +648,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, ba
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
if reused {
bar.Prefix(fmt.Sprintf("Skipping blob %s (already present):", shortDigest(srcInfo.Digest)))
bar.Add64(bar.Total)
bar.Finish()
logrus.Debugf("Skipping blob %s (already present):", shortDigest(srcInfo.Digest))
return blobInfo, cachedDiffID, nil
}
}
Expand All @@ -663,8 +660,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, ba
}
defer srcStream.Close()

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize},
diffIDIsNeeded, bar)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
if err != nil {
return types.BlobInfo{}, "", err
}
Expand Down Expand Up @@ -692,7 +688,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, ba
// perhaps compressing the stream if canCompress,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, bar *pb.ProgressBar) (types.BlobInfo, <-chan diffIDResult, error) {
diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

Expand Down Expand Up @@ -753,7 +749,9 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, bar *pb.ProgressBar) (types.BlobInfo, error) {
canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) {
c.progressWG.Add(1)
defer c.progressWG.Done()
// The copying happens through a pipeline of connected io.Readers.
// === Input: srcStream

Expand All @@ -776,7 +774,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
isCompressed := decompressor != nil
destStream = bar.NewProxyReader(destStream)
destStream = bar.ProxyReader(destStream)

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
Expand Down
4 changes: 3 additions & 1 deletion vendor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ golang.org/x/crypto 453249f01cfeb54c3d549ddb75ff152ca243f9d8
golang.org/x/net 6b27048ae5e6ad1ef927e72e437531493de612fe
golang.org/x/sync 42b317875d0fa942474b76e1b46a6060d720ae6e
golang.org/x/sys 43e60d72a8e2bd92ee98319ba9a384a0e9837c08
gopkg.in/cheggaaa/pb.v1 v1.0.27
gopkg.in/yaml.v2 a3f3340b5840cee44f372bddb5880fcbc419b46a
k8s.io/client-go bcde30fb7eaed76fd98a36b4120321b94995ffb6
github.com/xeipuuv/gojsonschema master
Expand All @@ -48,3 +47,6 @@ github.com/boltdb/bolt master
github.com/klauspost/pgzip v1.2.1
github.com/klauspost/compress v1.4.1
github.com/klauspost/cpuid v1.2.0
github.com/vbauerster/mpb v3.3.4
github.com/mattn/go-isatty v0.0.4
github.com/VividCortex/ewma v1.1.1

0 comments on commit e8eedb8

Please sign in to comment.