Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/compression: new zstd variant zstd:chunked #1084

Merged
merged 12 commits into from
Jul 7, 2021
147 changes: 121 additions & 26 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type copier struct {
ociEncryptConfig *encconfig.EncryptConfig
maxParallelDownloads uint
downloadForeignLayers bool
fetchPartialBlobs bool
}

// imageCopier tracks state specific to a single image (possibly an item of a manifest list)
Expand Down Expand Up @@ -194,15 +195,21 @@ type Options struct {
// OciDecryptConfig contains the config that can be used to decrypt an image if it is
// encrypted if non-nil. If nil, it does not attempt to decrypt an image.
OciDecryptConfig *encconfig.DecryptConfig

// MaxParallelDownloads indicates the maximum layers to pull at the same time. A reasonable default is used if this is left as 0.
MaxParallelDownloads uint

// When OptimizeDestinationImageAlreadyExists is set, optimize the copy assuming that the destination image already
// exists (and is equivalent). Making the eventual (no-op) copy more performant for this case. Enabling the option
// is slightly pessimistic if the destination image doesn't exist, or is not equivalent.
OptimizeDestinationImageAlreadyExists bool

// Download layer contents with "nondistributable" media types ("foreign" layers) and translate the layer media type
// to not indicate "nondistributable".
DownloadForeignLayers bool

// FetchPartialBlobs indicates whether to attempt to fetch the blob partially. Experimental.
FetchPartialBlobs bool
}

// validateImageListSelection returns an error if the passed-in value is not one that we recognize as a valid ImageListSelection value
Expand Down Expand Up @@ -283,6 +290,7 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
ociEncryptConfig: options.OciEncryptConfig,
maxParallelDownloads: options.MaxParallelDownloads,
downloadForeignLayers: options.DownloadForeignLayers,
fetchPartialBlobs: options.FetchPartialBlobs,
}
// Default to using gzip compression unless specified otherwise.
if options.DestinationCtx == nil || options.DestinationCtx.CompressionFormat == nil {
Expand Down Expand Up @@ -1072,9 +1080,25 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
}
}

// customPartialBlobCounter provides a decorator function for the partial blobs retrieval progress bar
func customPartialBlobCounter(filler interface{}, wcc ...decor.WC) decor.Decorator {
producer := func(filler interface{}) decor.DecorFunc {
return func(s decor.Statistics) string {
if s.Total == 0 {
pairFmt := "%.1f / %.1f (skipped: %.1f)"
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill))
}
pairFmt := "%.1f / %.1f (skipped: %.1f = %.2f%%)"
percentage := 100.0 * float64(s.Refill) / float64(s.Total)
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill), percentage)
}
}
return decor.Any(producer(filler), wcc...)
}

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

Expand All @@ -1091,18 +1115,30 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind
// Use a normal progress bar when we know the size (i.e., size > 0).
// Otherwise, use a spinner to indicate that something's happening.
var bar *mpb.Bar
sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft()
if info.Size > 0 {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
if partial {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
customPartialBlobCounter(sstyle.Build()),
),
)
} else {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
}
} else {
sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft()
bar = pool.Add(0,
sstyle.Build(),
mpb.BarFillerClearOnComplete(),
Expand All @@ -1129,7 +1165,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
bar := c.createProgressBar(progressPool, false, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1, false)
if err != nil {
return types.BlobInfo{}, err
Expand Down Expand Up @@ -1217,7 +1253,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)

// Throw an event that the layer has been skipped
Expand All @@ -1244,14 +1280,57 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
}

// A partial pull is managed by the destination storage, that decides what portions
// of the source file are not known yet and must be fetched.
// Attempt a partial only when the source allows to retrieve a blob partially and
// the destination has support for it.
imgSource, okSource := ic.c.rawSource.(internalTypes.ImageSourceSeekable)
giuseppe marked this conversation as resolved.
Show resolved Hide resolved
imgDest, okDest := ic.c.dest.(internalTypes.ImageDestinationPartial)
if ic.c.fetchPartialBlobs && okSource && okDest && !diffIDIsNeeded {
giuseppe marked this conversation as resolved.
Show resolved Hide resolved
bar := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done")

progress := make(chan int64)
terminate := make(chan interface{})

defer close(terminate)
defer close(progress)

proxy := imageSourceSeekableProxy{
source: imgSource,
progress: progress,
}
go func() {
for {
select {
case written := <-progress:
bar.IncrInt64(written)
case <-terminate:
return
}
}
}()

bar.SetTotal(srcInfo.Size, false)
info, err := imgDest.PutBlobPartial(ctx, proxy, srcInfo, ic.c.blobInfoCache)
if err == nil {
bar.SetRefill(srcInfo.Size - bar.Current())
bar.SetCurrent(srcInfo.Size)
bar.SetTotal(srcInfo.Size, true)
logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest)
return info, cachedDiffID, nil
}
bar.Abort(true)
logrus.Errorf("Failed to retrieve partial blob: %v", err)
}

// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "reading blob %s", srcInfo.Digest)
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "done")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer)
if err != nil {
Expand Down Expand Up @@ -1425,6 +1504,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
originalLayerReader = destStream
}

compressionMetadata := map[string]string{}
// === Deal with layer compression/decompression if necessary
// WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists
// short-circuit conditions
Expand Down Expand Up @@ -1453,7 +1533,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
// we don’t care.
go c.compressGoroutine(pipeWriter, destStream, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, destStream, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter
destStream = pipeReader
inputInfo.Digest = ""
inputInfo.Size = -1
Expand All @@ -1473,7 +1553,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

go c.compressGoroutine(pipeWriter, s, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, s, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter

destStream = pipeReader
inputInfo.Digest = ""
Expand Down Expand Up @@ -1640,23 +1720,38 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, srcCompressorName)
}
}

// Copy all the metadata generated by the compressor into the annotations.
if uploadedInfo.Annotations == nil {
giuseppe marked this conversation as resolved.
Show resolved Hide resolved
uploadedInfo.Annotations = map[string]string{}
}
for k, v := range compressionMetadata {
uploadedInfo.Annotations[k] = v
}

return uploadedInfo, nil
}

// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, compressionFormat compression.Algorithm) {
err := errors.New("Internal error: unexpected panic in compressGoroutine")
defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
_ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
}()

compressor, err := compression.CompressStream(dest, compressionFormat, c.compressionLevel)
// doCompression reads all input from src and writes its compressed equivalent to dest.
func doCompression(dest io.Writer, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm, compressionLevel *int) error {
compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, compressionLevel)
if err != nil {
return
return err
}
defer compressor.Close()

buf := make([]byte, compressionBufferSize)

_, err = io.CopyBuffer(compressor, src, buf) // Sets err to nil, i.e. causes dest.Close()
if err != nil {
compressor.Close()
return err
}

return compressor.Close()
}

// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm) {
err := doCompression(dest, src, metadata, compressionFormat, c.compressionLevel)
_ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
}
25 changes: 25 additions & 0 deletions copy/progress_reader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package copy

import (
"context"
"io"
"time"

internalTypes "github.com/containers/image/v5/internal/types"
"github.com/containers/image/v5/types"
)

Expand Down Expand Up @@ -77,3 +79,26 @@ func (r *progressReader) Read(p []byte) (int, error) {
}
return n, err
}

// imageSourceSeekableProxy wraps ImageSourceSeekable and keeps track of how many bytes
// are received.
type imageSourceSeekableProxy struct {
// source is the seekable input to read from.
source internalTypes.ImageSourceSeekable
// progress is the chan where the total number of bytes read so far are reported.
progress chan int64
}

// GetBlobAt reads from the ImageSourceSeekable and report how many bytes were received
// to the progress chan.
func (s imageSourceSeekableProxy) GetBlobAt(ctx context.Context, bInfo types.BlobInfo, chunks []internalTypes.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
rc, errs, err := s.source.GetBlobAt(ctx, bInfo, chunks)
if err == nil {
total := int64(0)
for _, c := range chunks {
total += int64(c.Length)
}
s.progress <- total
}
return rc, errs, err
}
Loading