Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to support caching compression. #867

Merged
merged 6 commits into from
Dec 13, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
119 changes: 75 additions & 44 deletions pkg/v1/tarball/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ import (
"io"
"io/ioutil"
"os"
"sync"

v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/google/go-containerregistry/pkg/v1/v1util"
)

type layer struct {
digest v1.Hash
diffID v1.Hash
size int64
opener Opener
compressed bool
compression int
digest v1.Hash
diffID v1.Hash
size int64
compressedopener Opener
uncompressedopener Opener
compression int
}

func (l *layer) Digest() (v1.Hash, error) {
Expand All @@ -44,21 +45,11 @@ func (l *layer) DiffID() (v1.Hash, error) {
}

func (l *layer) Compressed() (io.ReadCloser, error) {
rc, err := l.opener()
if err == nil && !l.compressed {
return v1util.GzipReadCloserLevel(rc, l.compression), nil
}

return rc, err
return l.compressedopener()
}

func (l *layer) Uncompressed() (io.ReadCloser, error) {
rc, err := l.opener()
if err == nil && l.compressed {
return v1util.GunzipReadCloser(rc)
}

return rc, err
return l.uncompressedopener()
}

func (l *layer) Size() (int64, error) {
Expand All @@ -72,13 +63,41 @@ func (l *layer) MediaType() (types.MediaType, error) {
// LayerOption applies options to layer
type LayerOption func(*layer)

// WithCompressionLevel sets the gzip compression. See `gzip.NewWriterLevel` for possible values.
// WithCompressionLevel is a functional option for overriding the default
// compression level used for compressing uncompressed tarballs.
func WithCompressionLevel(level int) LayerOption {
	// Build the option as a named closure: when applied, it records the
	// requested gzip level on the layer being configured.
	applyLevel := func(target *layer) {
		target.compression = level
	}
	return applyLevel
}

// WithCompressedCaching is a functional option that overrides the
// logic for accessing the compressed bytes to memoize the result
// and avoid expensive repeated gzips.
func WithCompressedCaching(l *layer) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an exported function that takes a parameter of an unexported type, so I don't think any external package can actually call it.

Should it just be unexported?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implements LayerOption; it is meant to be passed to LayerFromOpener.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To pile on: tarball.LayerFromOpener(opener, tarball.WithCompressedCaching) -- it doesn't get called, just passed in.

We could thunk it or mess with signatures to make it clearer that this is an option? Or just document it as "WithCompressedCaching is a functional option that...".

var once sync.Once
var err error

buf := bytes.NewBuffer(nil)
og := l.compressedopener

l.compressedopener = func() (io.ReadCloser, error) {
once.Do(func() {
var rc io.ReadCloser
rc, err = og()
if err == nil {
defer rc.Close()
_, err = io.Copy(buf, rc)
}
})
if err != nil {
return nil, err
}

return ioutil.NopCloser(bytes.NewBuffer(buf.Bytes())), nil
}
}

// LayerFromFile returns a v1.Layer given a tarball
func LayerFromFile(path string, opts ...LayerOption) (v1.Layer, error) {
opener := func() (io.ReadCloser, error) {
Expand All @@ -87,7 +106,16 @@ func LayerFromFile(path string, opts ...LayerOption) (v1.Layer, error) {
return LayerFromOpener(opener, opts...)
}

// LayerFromOpener returns a v1.Layer given an Opener function
// LayerFromOpener returns a v1.Layer given an Opener function.
// The Opener may return either an uncompressed tarball (common),
// or a compressed tarball (uncommon).
//
// When using this in conjunction with something like remote.Write
// the uncompressed path may end up gzipping things multiple times:
// 1. Compute the layer SHA256
// 2. Upload the compressed layer.
// Since gzip can be expensive, we support an option to memoize the
// compression that can be passed here: tarball.WithCompressedCaching
func LayerFromOpener(opener Opener, opts ...LayerOption) (v1.Layer, error) {
rc, err := opener()
if err != nil {
Expand All @@ -101,20 +129,38 @@ func LayerFromOpener(opener Opener, opts ...LayerOption) (v1.Layer, error) {
}

layer := &layer{
compressed: compressed,
compression: gzip.BestSpeed,
opener: opener,
}

if compressed {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd really like to drop the above (L123) use of IsGzipped because this came up recently and is really annoying: #112 (comment)

We should be able to call opener just once, which would fix this... what about something like this:

Use a bufio.Reader to Peek at the first 2 bytes. We technically only need a 2 byte buffer for this, but we should probably do 4K to read (at least) a whole block. This will let us know if it's compressed without consuming the reader.

crane append uses tarball.LayerFromFile, which we should usually be able to call multiple times, but that doesn't seem to work with process substitution without fiddling.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so I went ahead and fixed the thing that I actually care about: #868

I still think it's a good idea to use a bufio.Reader here to peek at stuff, but I care about it roughly 15% less than I did before.

layer.compressedopener = opener
layer.uncompressedopener = func() (io.ReadCloser, error) {
rc, err := opener()
if err != nil {
return nil, err
}
return v1util.GunzipReadCloser(rc)
}
} else {
layer.uncompressedopener = opener
layer.compressedopener = func() (io.ReadCloser, error) {
rc, err := opener()
if err != nil {
return nil, err
}
return v1util.GzipReadCloserLevel(rc, layer.compression), nil
}
}

for _, opt := range opts {
opt(layer)
}

if layer.digest, layer.size, err = computeDigest(opener, compressed, layer.compression); err != nil {
if layer.digest, layer.size, err = computeDigest(layer.compressedopener); err != nil {
return nil, err
}

if layer.diffID, err = computeDiffID(opener, compressed); err != nil {
if layer.diffID, err = computeDiffID(layer.uncompressedopener); err != nil {
return nil, err
}

Expand All @@ -133,38 +179,23 @@ func LayerFromReader(reader io.Reader, opts ...LayerOption) (v1.Layer, error) {
}, opts...)
}

func computeDigest(opener Opener, compressed bool, compression int) (v1.Hash, int64, error) {
func computeDigest(opener Opener) (v1.Hash, int64, error) {
rc, err := opener()
if err != nil {
return v1.Hash{}, 0, err
}
defer rc.Close()

if compressed {
return v1.SHA256(rc)
}

return v1.SHA256(v1util.GzipReadCloserLevel(ioutil.NopCloser(rc), compression))
return v1.SHA256(rc)
}

func computeDiffID(opener Opener, compressed bool) (v1.Hash, error) {
func computeDiffID(opener Opener) (v1.Hash, error) {
rc, err := opener()
if err != nil {
return v1.Hash{}, err
}
defer rc.Close()

if !compressed {
digest, _, err := v1.SHA256(rc)
return digest, err
}

reader, err := gzip.NewReader(rc)
if err != nil {
return v1.Hash{}, err
}
defer reader.Close()

diffID, _, err := v1.SHA256(reader)
return diffID, err
digest, _, err := v1.SHA256(rc)
return digest, err
}
29 changes: 27 additions & 2 deletions pkg/v1/tarball/layer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,37 @@ func TestLayerFromOpenerReader(t *testing.T) {
if err != nil {
t.Fatalf("Unable to read tar file: %v", err)
}
count := 0
ucOpener := func() (io.ReadCloser, error) {
count++
return ioutil.NopCloser(bytes.NewReader(ucBytes)), nil
}
tarLayer, err := LayerFromOpener(ucOpener)
tarLayer, err := LayerFromOpener(ucOpener, WithCompressedCaching)
if err != nil {
t.Fatalf("Unable to create layer from tar file: %v", err)
t.Fatal("Unable to create layer from tar file:", err)
}
for i := 0; i < 10; i++ {
tarLayer.Compressed()
}

// Store the count and reset the counter.
cachedCount := count
count = 0

tarLayer, err = LayerFromOpener(ucOpener)
if err != nil {
t.Fatal("Unable to create layer from tar file:", err)
}
for i := 0; i < 10; i++ {
tarLayer.Compressed()
}

// We expect three calls: gzip sniff, diffid computation, cached compression
if cachedCount != 3 {
t.Errorf("cached count = %d, wanted %d", cachedCount, 3)
}
if cachedCount+10 != count {
t.Errorf("count = %d, wanted %d", count, cachedCount+10)
}

gzBytes, err := ioutil.ReadFile("gzip_content.tgz")
Expand Down