From 5b9542eae023ddef054955b526d1484b15470aca Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Fri, 8 Feb 2019 18:58:22 +0100 Subject: [PATCH] progress bars: use github.com/vbauerster/mpb Use the github.com/vbauerster/mpb library for displaying the progress bars. It is actively maintained, is designed for multi-bar usage and just gets the job done. This will fix issues with single bars being displayed multiple times. Note that updating the bar's prefix does not work anymore. We're now logging those entries, for instance, when we skip a blob as it's already present. The output now looks as follows (e.g., when pulling jboss/wildfly): ``` Getting image source signatures Copying blob aeb7866da422 [======================================] 71.2 MiB / 71.2 MiB Copying blob 157601a0b538 [======================================] 10.3 MiB / 10.3 MiB Copying blob 642f4164f381 [======================================] 1.9 KiB / 1.9 KiB Copying blob bda512e97517 [======================================] 74.6 MiB / 74.6 MiB Copying blob 4cccaafdae21 [======================================] 170.9 MiB / 170.9 MiB Copying config 2602b48525 [======================================] 6.5KiB / 6.5KiB Writing manifest to image destination Storing signatures ``` Note that this change requires the following dependencies: * github.com/vbauerster/mpb v3.3.4 * github.com/mattn/go-isatty v0.0.4 * github.com/VividCortex/ewma v1.1.1 Once we migrate to using Go modules, we can start using mpb v4.x. Signed-off-by: Valentin Rothberg --- copy/copy.go | 92 +++++++++++++++++++++++++--------------------------- vendor.conf | 4 ++- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index 2d3a2a1a8b..99017b4d84 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -25,9 +25,10 @@ import ( "github.com/opencontainers/go-digest" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/vbauerster/mpb" + "github.com/vbauerster/mpb/decor" "golang.org/x/crypto/ssh/terminal" "golang.org/x/sync/semaphore" - pb "gopkg.in/cheggaaa/pb.v1" ) type digestingReader struct { @@ -90,6 +91,8 @@ type copier struct { reportWriter io.Writer progressOutput io.Writer progressInterval time.Duration + progressPool *mpb.Progress + progressWG *sync.WaitGroup progress chan types.ProgressProperties blobInfoCache types.BlobInfoCache copyInParallel bool @@ -165,12 +168,15 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef, progressOutput = ioutil.Discard } copyInParallel := dest.HasThreadSafePutBlob() && rawSource.HasThreadSafeGetBlob() + wg := new(sync.WaitGroup) c := &copier{ dest: dest, rawSource: rawSource, reportWriter: reportWriter, progressOutput: progressOutput, progressInterval: options.ProgressInterval, + progressPool: mpb.New(mpb.WithWidth(40), mpb.WithOutput(progressOutput), mpb.WithWaitGroup(wg)), + progressWG: wg, progress: options.Progress, copyInParallel: copyInParallel, // FIXME? The cache is used for sources and destinations equally, but we only have a SourceCtx and DestinationCtx. @@ -427,22 +433,6 @@ func shortDigest(d digest.Digest) string { return d.Encoded()[:12] } -// createProgressBar creates a pb.ProgressBar. Note that if the copier's -// reportWriter is ioutil.Discard, the progress bar's output will be discarded -// and a single line will be printed instead. -func (c *copier) createProgressBar(srcInfo types.BlobInfo, kind string) *pb.ProgressBar { - bar := pb.New(int(srcInfo.Size)).SetUnits(pb.U_BYTES) - bar.SetMaxWidth(80) - bar.ShowTimeLeft = false - bar.ShowPercent = false - bar.Prefix(fmt.Sprintf("Copying %s %s:", kind, shortDigest(srcInfo.Digest))) - bar.Output = c.progressOutput - if bar.Output == ioutil.Discard { - c.Printf("Copying %s %s\n", kind, srcInfo.Digest) - } - return bar -} - // isTTY returns true if the io.Writer is a file and a tty. func isTTY(w io.Writer) bool { if f, ok := w.(*os.File); ok { @@ -477,6 +467,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { // copyGroup is used to determine if all layers are copied copyGroup := sync.WaitGroup{} copyGroup.Add(numLayers) + // copySemaphore is used to limit the number of parallel downloads to // avoid malicious images causing troubles and to be nice to servers. var copySemaphore *semaphore.Weighted @@ -487,8 +478,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { } data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, bar *pb.ProgressBar) { - defer bar.Finish() + copyLayerHelper := func(index int, srcLayer types.BlobInfo, bar *mpb.Bar) { defer copySemaphore.Release(1) defer copyGroup.Done() cld := copyLayerData{} @@ -498,14 +488,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { // does not support them. if ic.diffIDsAreNeeded { cld.err = errors.New("getting DiffID for foreign layers is unimplemented") - bar.Prefix(fmt.Sprintf("Skipping blob %s (DiffID foreign layer unimplemented):", shortDigest(srcLayer.Digest))) - bar.Finish() + bar.SetTotal(srcLayer.Size, true) } else { cld.destInfo = srcLayer logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) - bar.Prefix(fmt.Sprintf("Skipping blob %s (foreign layer):", shortDigest(srcLayer.Digest))) - bar.Add64(bar.Total) - bar.Finish() + bar.SetTotal(srcLayer.Size, true) } } else { cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, bar) @@ -513,17 +500,9 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { data[index] = cld } - progressBars := make([]*pb.ProgressBar, numLayers) + progressBars := make([]*mpb.Bar, numLayers) for i, srcInfo := range srcInfos { - bar := ic.c.createProgressBar(srcInfo, "blob") - progressBars[i] = bar - } - - progressPool := pb.NewPool(progressBars...) - progressPool.Output = ic.c.progressOutput - - if err := progressPool.Start(); err != nil { - return errors.Wrapf(err, "error creating progress-bar pool") + progressBars[i] = ic.c.createProgressBar(srcInfo, shortDigest(srcInfo.Digest), "blob") } for i, srcLayer := range srcInfos { @@ -534,8 +513,8 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { destInfos := make([]types.BlobInfo, numLayers) diffIDs := make([]digest.Digest, numLayers) + // Wait for all layers to be copied copyGroup.Wait() - progressPool.Stop() for i, cld := range data { if cld.err != nil { @@ -600,6 +579,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte return nil, err } + ic.c.progressPool.Wait() ic.c.Printf("Writing manifest to image destination\n") if err := ic.c.dest.PutManifest(ctx, manifest); err != nil { return nil, errors.Wrap(err, "Error writing manifest") @@ -607,6 +587,25 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte return manifest, nil } +// createProgressBar creates a mpb.Bar. Note that if the copier's reportWriter +// is ioutil.Discard, the progress bar's output will be discarded +func (c *copier) createProgressBar(info types.BlobInfo, name, kind string) *mpb.Bar { + bar := c.progressPool.AddBar(info.Size, + mpb.PrependDecorators( + decor.Name(fmt.Sprintf("Copying %s %s", kind, name)), + ), + mpb.AppendDecorators( + decor.CountersKibiByte("%.1f / %.1f"), + ), + ) + if c.progressOutput == ioutil.Discard { + c.Printf("Copying %s %s\n", kind, info.Digest) + } + // increment by 0 to start rendering the bars at creation + bar.IncrBy(0) + return bar +} + // copyConfig copies config.json, if any, from src to dest. func (c *copier) copyConfig(ctx context.Context, src types.Image) error { srcInfo := src.ConfigInfo() @@ -615,9 +614,9 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { if err != nil { return errors.Wrapf(err, "Error reading config blob %s", srcInfo.Digest) } - bar := c.createProgressBar(srcInfo, "config") - defer bar.Finish() - bar.Start() + + // make the short digest only 10 characters long to make it align with the blob output + bar := c.createProgressBar(srcInfo, shortDigest(srcInfo.Digest)[0:10], "config") destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) if err != nil { return err @@ -638,7 +637,7 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, bar *pb.ProgressBar) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, bar *mpb.Bar) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" @@ -649,9 +648,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, ba return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) } if reused { - bar.Prefix(fmt.Sprintf("Skipping blob %s (already present):", shortDigest(srcInfo.Digest))) - bar.Add64(bar.Total) - bar.Finish() + logrus.Debugf("Skipping blob %s (already present):", shortDigest(srcInfo.Digest)) return blobInfo, cachedDiffID, nil } } @@ -663,8 +660,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, ba } defer srcStream.Close() - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, - diffIDIsNeeded, bar) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err } @@ -692,7 +688,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, ba // perhaps compressing the stream if canCompress, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, bar *pb.ProgressBar) (types.BlobInfo, <-chan diffIDResult, error) { + diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -753,7 +749,9 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) // and returns a complete blobInfo of the copied blob. func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, - canModifyBlob bool, isConfig bool, bar *pb.ProgressBar) (types.BlobInfo, error) { + canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) { + c.progressWG.Add(1) + defer c.progressWG.Done() // The copying happens through a pipeline of connected io.Readers. // === Input: srcStream @@ -776,7 +774,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest) } isCompressed := decompressor != nil - destStream = bar.NewProxyReader(destStream) + destStream = bar.ProxyReader(destStream) // === Send a copy of the original, uncompressed, stream, to a separate path if necessary. var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so. diff --git a/vendor.conf b/vendor.conf index bbbb1a4584..1c5b6b3785 100644 --- a/vendor.conf +++ b/vendor.conf @@ -28,7 +28,6 @@ golang.org/x/crypto 453249f01cfeb54c3d549ddb75ff152ca243f9d8 golang.org/x/net 6b27048ae5e6ad1ef927e72e437531493de612fe golang.org/x/sync 42b317875d0fa942474b76e1b46a6060d720ae6e golang.org/x/sys 43e60d72a8e2bd92ee98319ba9a384a0e9837c08 -gopkg.in/cheggaaa/pb.v1 v1.0.27 gopkg.in/yaml.v2 a3f3340b5840cee44f372bddb5880fcbc419b46a k8s.io/client-go bcde30fb7eaed76fd98a36b4120321b94995ffb6 github.com/xeipuuv/gojsonschema master @@ -48,3 +47,6 @@ github.com/boltdb/bolt master github.com/klauspost/pgzip v1.2.1 github.com/klauspost/compress v1.4.1 github.com/klauspost/cpuid v1.2.0 +github.com/vbauerster/mpb v3.3.4 +github.com/mattn/go-isatty v0.0.4 +github.com/VividCortex/ewma v1.1.1