Add support for zstd layer upload
It is now possible to run `crane append -f bla.tar.zstd` to upload a zstd layer, both from a file and from a pipe.

Before this commit, crane would erroneously upload such a layer with a gzip media type.

While this PR does not _fully_ solve google#1501, it comes very close.

Signed-off-by: Marat Radchenko <marat@slonopotamus.org>
slonopotamus authored and ethan-gallant committed Feb 7, 2024
1 parent 8dadbe7 commit 97ff863
Showing 13 changed files with 249 additions and 99 deletions.
6 changes: 5 additions & 1 deletion cmd/crane/cmd/flatten.go
@@ -228,7 +228,11 @@ func flattenImage(old v1.Image, repo name.Repository, use string, o crane.Option
}

// TODO: Make compression configurable?
layer := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
layer, err := stream.NewLayer(mutate.Extract(old), stream.WithCompressionLevel(gzip.BestCompression))
if err != nil {
return nil, fmt.Errorf("new layer: %w", err)
}

if err := remote.WriteLayer(repo, layer, o.Remote...); err != nil {
return nil, fmt.Errorf("uploading layer: %w", err)
}
8 changes: 8 additions & 0 deletions internal/zstd/zstd.go
@@ -102,6 +102,14 @@ func UnzipReadCloser(r io.ReadCloser) (io.ReadCloser, error) {
}, nil
}

func NewReader(r io.Reader) (io.Reader, error) {
return zstd.NewReader(r)
}

func NewWriterLevel(w io.Writer, level int) (*zstd.Encoder, error) {
return zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(level)))
}

// Is detects whether the input stream is compressed.
func Is(r io.Reader) (bool, error) {
magicHeader := make([]byte, 4)
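
Both new helpers are thin wrappers over the klauspost/compress zstd package, which the calls in this diff (`zstd.WithEncoderLevel`, `zstd.EncoderLevelFromZstd`) already imply as the dependency. Below is a minimal round-trip sketch against that package directly, since `internal/zstd` is not importable from outside the module:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

func main() {
	var buf bytes.Buffer

	// Compress: an integer zstd level is mapped onto the encoder's named
	// levels, mirroring the new NewWriterLevel wrapper.
	zw, err := zstd.NewWriter(&buf, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(3)))
	if err != nil {
		panic(err)
	}
	if _, err := zw.Write([]byte("hello, zstd")); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// Decompress, mirroring the new NewReader wrapper.
	zr, err := zstd.NewReader(&buf)
	if err != nil {
		panic(err)
	}
	defer zr.Close()

	out, err := io.ReadAll(zr)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // hello, zstd
}
```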
32 changes: 32 additions & 0 deletions pkg/compression/compression.go
@@ -15,6 +15,12 @@
// Package compression abstracts over gzip and zstd.
package compression

import (
"fmt"

"github.com/google/go-containerregistry/pkg/v1/types"
)

// Compression is an enumeration of the supported compression algorithms
type Compression string

@@ -24,3 +30,29 @@ const (
GZip Compression = "gzip"
ZStd Compression = "zstd"
)

func (compression Compression) ToMediaType(oci bool) (types.MediaType, error) {
if oci {
switch compression {
case ZStd:
return types.OCILayerZStd, nil
case GZip:
return types.OCILayer, nil
case None:
return types.OCIUncompressedLayer, nil
default:
return types.OCILayer, fmt.Errorf("unsupported compression: %s", compression)
}
} else {
switch compression {
case ZStd:
return types.DockerLayerZstd, nil
case GZip:
return types.DockerLayer, nil
case None:
return types.DockerUncompressedLayer, nil
default:
return types.DockerLayer, fmt.Errorf("unsupported compression: %s", compression)
}
}
}
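
`ToMediaType` is now the single place where a compression algorithm and a manifest flavor (OCI vs. Docker) combine into a layer media type; note that even the error paths return a usable gzip fallback. A minimal sketch of exercising the mapping, assuming the import paths shown in this diff:

```go
package main

import (
	"fmt"

	"github.com/google/go-containerregistry/pkg/compression"
)

func main() {
	// Walk every supported compression against both manifest flavors.
	for _, c := range []compression.Compression{compression.GZip, compression.ZStd, compression.None} {
		for _, oci := range []bool{true, false} {
			mt, err := c.ToMediaType(oci)
			if err != nil {
				panic(err)
			}
			fmt.Printf("%-5s oci=%-5v -> %s\n", c, oci, mt)
		}
	}
}
```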
14 changes: 5 additions & 9 deletions pkg/crane/append.go
@@ -55,15 +55,11 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
return nil, fmt.Errorf("getting base image media type: %w", err)
}

layerType := types.DockerLayer

if baseMediaType == types.OCIManifestSchema1 {
layerType = types.OCILayer
}
oci := baseMediaType == types.OCIManifestSchema1

layers := make([]v1.Layer, 0, len(paths))
for _, path := range paths {
layer, err := getLayer(path, layerType)
layer, err := getLayer(path, oci)
if err != nil {
return nil, fmt.Errorf("reading layer %q: %w", path, err)
}
@@ -81,16 +77,16 @@ func Append(base v1.Image, paths ...string) (v1.Image, error) {
return mutate.AppendLayers(base, layers...)
}

func getLayer(path string, layerType types.MediaType) (v1.Layer, error) {
func getLayer(path string, oci bool) (v1.Layer, error) {
f, err := streamFile(path)
if err != nil {
return nil, err
}
if f != nil {
return stream.NewLayer(f, stream.WithMediaType(layerType)), nil
return stream.NewLayer(f, stream.WithOCIMediaType(oci))
}

return tarball.LayerFromFile(path, tarball.WithMediaType(layerType))
return tarball.LayerFromFile(path, tarball.WithOCIMediaType(oci))
}

// If we're dealing with a named pipe, trying to open it multiple times will
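
At the API surface the change is invisible: `crane.Append` still takes file paths, and the layer media type now follows both the detected compression and the base image's manifest format. A minimal usage sketch (the layer path is hypothetical):

```go
package main

import (
	"log"

	"github.com/google/go-containerregistry/pkg/crane"
	"github.com/google/go-containerregistry/pkg/v1/empty"
)

func main() {
	// Append a zstd tarball; getLayer sniffs the compression, so the
	// resulting layer should carry a zstd media type rather than gzip.
	img, err := crane.Append(empty.Image, "layer.tar.zst") // hypothetical path
	if err != nil {
		log.Fatalf("append: %v", err)
	}

	layers, err := img.Layers()
	if err != nil {
		log.Fatalf("layers: %v", err)
	}
	mt, err := layers[len(layers)-1].MediaType()
	if err != nil {
		log.Fatalf("media type: %v", err)
	}
	log.Printf("appended layer media type: %s", mt)
}
```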
6 changes: 5 additions & 1 deletion pkg/v1/layout/write_test.go
@@ -473,8 +473,12 @@ func TestStreamingWriteLayer(t *testing.T) {
return tw.Close()
}())
}()
layer, err := stream.NewLayer(pr)
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
img, err := mutate.Append(empty.Image, mutate.Addendum{
Layer: stream.NewLayer(pr),
Layer: layer,
})
if err != nil {
t.Fatalf("creating random streaming image failed: %v", err)
19 changes: 13 additions & 6 deletions pkg/v1/mutate/mutate_test.go
@@ -451,12 +451,19 @@ func TestMutateMediaType(t *testing.T) {
}

func TestAppendStreamableLayer(t *testing.T) {
img, err := mutate.AppendLayers(
sourceImage(t),
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("a", 100)))),
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("b", 100)))),
stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("c", 100)))),
)
l1, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("a", 100))))
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
l2, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("b", 100))))
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
l3, err := stream.NewLayer(io.NopCloser(strings.NewReader(strings.Repeat("c", 100))))
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
img, err := mutate.AppendLayers(sourceImage(t), l1, l2, l3)
if err != nil {
t.Fatalf("AppendLayers: %v", err)
}
7 changes: 6 additions & 1 deletion pkg/v1/remote/multi_write_test.go
@@ -50,7 +50,12 @@ func streamable(t *testing.T) v1.Layer {
t.Fatalf("Uncompressed(): %v", err)
}

return stream.NewLayer(rc)
l, err := stream.NewLayer(rc)
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}

return l
}

type rawManifest struct {
15 changes: 12 additions & 3 deletions pkg/v1/remote/write_test.go
@@ -540,7 +540,10 @@ func TestDedupeLayers(t *testing.T) {
// Append three identical stream.Layers, whose uploads will *not* be
// deduped since Write can't tell they're identical ahead of time.
for i := 0; i < 3; i++ {
sl := stream.NewLayer(newBlob())
sl, err := stream.NewLayer(newBlob())
if err != nil {
t.Fatalf("stream.NewLayer(#%d): %v", i, err)
}
img, err = mutate.AppendLayers(img, sl)
if err != nil {
t.Fatalf("mutate.AppendLayer(#%d): %v", i, err)
@@ -697,7 +700,10 @@ func TestStreamLayer(t *testing.T) {
defer closer.Close()

streamLocation := w.url(expectedPath)
sl := stream.NewLayer(newBlob())
sl, err := stream.NewLayer(newBlob())
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}

commitLocation, err := w.streamBlob(context.Background(), sl, streamLocation.String())
if err != nil {
@@ -856,7 +862,10 @@ func TestUploadOneStreamedLayer(t *testing.T) {
newBlob := func() io.ReadCloser { return io.NopCloser(bytes.NewReader(bytes.Repeat([]byte{'a'}, int(n)))) }
wantDigest := "sha256:3d7c465be28d9e1ed810c42aeb0e747b44441424f566722ba635dc93c947f30e"
wantDiffID := "sha256:27dd1f61b867b6a0f6e9d8a41c43231de52107e53ae424de8f847b821db4b711"
l := stream.NewLayer(newBlob())
l, err := stream.NewLayer(newBlob())
if err != nil {
t.Fatalf("stream.NewLayer: %v", err)
}
if err := w.uploadOne(ctx, l); err != nil {
t.Fatalf("uploadOne: %v", err)
}
95 changes: 73 additions & 22 deletions pkg/v1/stream/layer.go
@@ -26,6 +26,9 @@ import (
"os"
"sync"

internalcomp "github.com/google/go-containerregistry/internal/compression"
"github.com/google/go-containerregistry/internal/zstd"
"github.com/google/go-containerregistry/pkg/compression"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
)
@@ -42,14 +45,19 @@

// Layer is a streaming implementation of v1.Layer.
type Layer struct {
blob io.ReadCloser
consumed bool
compression int
closer io.Closer
uncompressedReader io.Reader

consumed bool

compression compression.Compression
compressionLevel int

mu sync.Mutex
digest, diffID *v1.Hash
size int64
mediaType types.MediaType

oci bool
}

var _ v1.Layer = (*Layer)(nil)
@@ -60,32 +68,54 @@ type LayerOption func(*Layer)
// WithCompressionLevel sets the gzip compression. See `gzip.NewWriterLevel` for possible values.
func WithCompressionLevel(level int) LayerOption {
return func(l *Layer) {
l.compression = level
l.compressionLevel = level
}
}

// WithMediaType is a functional option for overriding the layer's media type.
func WithMediaType(mt types.MediaType) LayerOption {
// WithOCIMediaType is a functional option for overriding the layer's media type.
func WithOCIMediaType(oci bool) LayerOption {
return func(l *Layer) {
l.mediaType = mt
l.oci = oci
}
}

// NewLayer creates a Layer from an io.ReadCloser.
func NewLayer(rc io.ReadCloser, opts ...LayerOption) *Layer {
func NewLayer(rc io.ReadCloser, opts ...LayerOption) (*Layer, error) {
comp, peekReader, err := internalcomp.PeekCompression(rc)
if err != nil {
return nil, err
}

layer := &Layer{
blob: rc,
compression: gzip.BestSpeed,
// We use DockerLayer for now as uncompressed layers
// are unimplemented
mediaType: types.DockerLayer,
closer: rc,
compression: comp,
compressionLevel: gzip.BestSpeed,
}

switch comp {
case compression.ZStd:
layer.compression = comp
layer.uncompressedReader, err = zstd.NewReader(peekReader)
if err != nil {
return nil, err
}
case compression.GZip:
layer.compression = comp
layer.uncompressedReader, err = gzip.NewReader(peekReader)
if err != nil {
return nil, err
}
default:
// No support for uncompressed layers for now
layer.compression = compression.GZip
layer.uncompressedReader = peekReader
}

for _, opt := range opts {
opt(layer)
}

return layer
return layer, nil
}

// Digest implements v1.Layer.
@@ -120,7 +150,7 @@ func (l *Layer) Size() (int64, error) {

// MediaType implements v1.Layer
func (l *Layer) MediaType() (types.MediaType, error) {
return l.mediaType, nil
return l.compression.ToMediaType(l.oci)
}

// Uncompressed implements v1.Layer.
@@ -183,9 +213,27 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
// Buffer the output of the gzip writer so we don't have to wait on pr to keep writing.
// 64K ought to be small enough for anybody.
bw := bufio.NewWriterSize(mw, 2<<16)
zw, err := gzip.NewWriterLevel(bw, l.compression)
if err != nil {
return nil, err

var compressedWriter io.Writer
var compressedCloser io.Closer

switch l.compression {
case compression.ZStd:
w, err := zstd.NewWriterLevel(bw, l.compressionLevel)
if err != nil {
return nil, err
}
compressedWriter = w
compressedCloser = w
case compression.GZip:
w, err := gzip.NewWriterLevel(bw, l.compressionLevel)
if err != nil {
return nil, err
}
compressedWriter = w
compressedCloser = w
case compression.None:
compressedWriter = bw
}

doneDigesting := make(chan struct{})
@@ -211,7 +259,7 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
//
// NOTE: net/http will call close on success, so if we've already
// closed the inner rc, it's not an error.
if err := l.blob.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
if err := l.closer.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
return err
}

@@ -223,13 +271,16 @@ func newCompressedReader(l *Layer) (*compressedReader, error) {
go func() {
// Copy blob into the gzip writer, which also hashes and counts the
// size of the compressed output, and hasher of the raw contents.
_, copyErr := io.Copy(io.MultiWriter(h, zw), l.blob)
_, copyErr := io.Copy(io.MultiWriter(h, compressedWriter), l.uncompressedReader)

// Close the gzip writer once copying is done. If this is done in the
// Close method of compressedReader instead, then it can cause a panic
// when the compressedReader is closed before the blob is fully
// consumed and io.Copy in this goroutine is still blocking.
closeErr := zw.Close()
var closeErr error
if compressedCloser != nil {
closeErr = compressedCloser.Close()
}

// Check errors from writing and closing streams.
if copyErr != nil {
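
Because `stream.NewLayer` now peeks at the blob to detect its compression, construction can fail, which is why its signature gained an error and every call site above was updated. A minimal sketch of the new call shape (the file name is hypothetical; gzip and zstd inputs are both detected):

```go
package main

import (
	"log"
	"os"

	"github.com/google/go-containerregistry/pkg/v1/stream"
)

func main() {
	f, err := os.Open("layer.tar.zst") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}

	// NewLayer sniffs the stream's magic bytes up front, so it now
	// returns an error alongside the layer.
	layer, err := stream.NewLayer(f)
	if err != nil {
		log.Fatalf("stream.NewLayer: %v", err)
	}

	// The media type reflects the detected compression without
	// consuming the stream.
	mt, err := layer.MediaType()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("media type: %s", mt)
}
```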