Skip to content

Commit

Permalink
chunked: support converting existing images
Browse files Browse the repository at this point in the history
if the "convert_images" option is set in the configuration file, then
convert traditional images to the chunked format on the fly.

This is very expensive at the moment since the entire zstd:chunked
file is created and then processed.

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
  • Loading branch information
giuseppe committed Jul 25, 2023
1 parent 1de59bc commit 2e1408a
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 23 deletions.
3 changes: 1 addition & 2 deletions pkg/chunked/compression_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package chunked
import (
archivetar "archive/tar"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -150,7 +149,7 @@ func readEstargzChunkedManifest(blobStream ImageSourceSeekable, blobSize int64,
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream. The blob total size must
// be specified.
// This function uses the io.github.containers.zstd-chunked. annotations when specified.
func readZstdChunkedManifest(ctx context.Context, blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, []byte, int64, error) {
func readZstdChunkedManifest(blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, []byte, int64, error) {
footerSize := int64(internal.FooterSizeSupported)
if blobSize <= footerSize {
return nil, nil, 0, errors.New("blob too small")
Expand Down
179 changes: 160 additions & 19 deletions pkg/chunked/storage_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
graphdriver "github.com/containers/storage/drivers"
driversCopy "github.com/containers/storage/drivers/copy"
"github.com/containers/storage/pkg/archive"
"github.com/containers/storage/pkg/chunked/compressor"
"github.com/containers/storage/pkg/chunked/internal"
"github.com/containers/storage/pkg/idtools"
"github.com/containers/storage/pkg/system"
Expand Down Expand Up @@ -70,6 +71,12 @@ type chunkedDiffer struct {
rawReader io.Reader

tocDigest digest.Digest

convertToZstdChunked bool

blobSize int64

storeOpts *types.StoreOptions
}

var xattrsToIgnore = map[string]interface{}{
Expand Down Expand Up @@ -166,19 +173,131 @@ func GetTOCDigest(annotations map[string]string) (*digest.Digest, error) {
return nil, nil
}

// seekableFile adapts a local *os.File to the ImageSourceSeekable interface,
// so a blob stored on disk can be consumed through the same chunked
// retrieval path as a remote one.
type seekableFile struct {
	file *os.File
}

// Close releases the underlying file descriptor.
func (f *seekableFile) Close() error {
	return f.file.Close()
}

// GetBlobAt serves every requested chunk as a section reader over the backing
// file.  Chunks are delivered in request order on the streams channel; the
// errors channel never carries a value here, since no error can occur before
// the caller starts reading.
func (f *seekableFile) GetBlobAt(chunks []ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
	readers := make(chan io.ReadCloser)
	errCh := make(chan error)

	go func() {
		for _, c := range chunks {
			section := io.NewSectionReader(f.file, int64(c.Offset), int64(c.Length))
			readers <- io.NopCloser(section)
		}
		close(readers)
		close(errCh)
	}()

	return readers, errCh, nil
}

// retrieveWholeFile fetches the entire blob from iss, decompresses it, and
// re-compresses it into the zstd:chunked format, writing the result to an
// anonymous temporary file created with O_TMPFILE under destDirectory.
//
// It returns a seekableFile wrapping the converted blob, the digest of the
// uncompressed stream (usable as the layer DiffID), and the zstd:chunked
// annotations produced by the compressor.  The caller owns the returned file
// and must Close it.
//
// NOTE: this is expensive — the whole blob is downloaded and rewritten before
// any of it can be consumed.
func retrieveWholeFile(destDirectory string, blobSize int64, iss ImageSourceSeekable) (*seekableFile, digest.Digest, map[string]string, error) {
	// Request the blob as a single chunk spanning its full size.
	chunksToRequest := []ImageSourceChunk{
		{
			Offset: 0,
			Length: uint64(blobSize),
		},
	}

	streams, errs, err := iss.GetBlobAt(chunksToRequest)
	if err != nil {
		return nil, "", nil, err
	}

	var payload io.ReadCloser
	select {
	case p := <-streams:
		payload = p
	case err := <-errs:
		return nil, "", nil, err
	}
	if payload == nil {
		return nil, "", nil, errors.New("invalid stream returned")
	}
	// Close the source stream when done; the original code leaked it.
	// Best-effort: the copy below already surfaces read errors.
	defer payload.Close()

	diff, err := archive.DecompressStream(payload)
	if err != nil {
		return nil, "", nil, err
	}
	// Release decompressor resources once the conversion finishes.
	defer diff.Close()

	// O_TMPFILE yields an unnamed file that vanishes automatically when the
	// last descriptor is closed, so no explicit cleanup path is required.
	fd, err := unix.Open(destDirectory, unix.O_TMPFILE|unix.O_RDWR|unix.O_CLOEXEC, 0o600)
	if err != nil {
		return nil, "", nil, err
	}

	f := os.NewFile(uintptr(fd), destDirectory)

	// The compressor records the zstd:chunked TOC metadata into newAnnotations.
	newAnnotations := make(map[string]string)
	chunked, err := compressor.ZstdCompressor(f, newAnnotations, nil)
	if err != nil {
		f.Close()
		return nil, "", nil, err
	}

	// Hash the uncompressed stream while converting, so the DiffID comes out
	// of the same single pass over the data.
	digester := digest.Canonical.Digester()
	hash := digester.Hash()

	if _, err := io.Copy(io.MultiWriter(chunked, hash), diff); err != nil {
		f.Close()
		return nil, "", nil, err
	}
	// Close flushes the compressor footer; a failure here means the converted
	// blob is unusable.
	if err := chunked.Close(); err != nil {
		f.Close()
		return nil, "", nil, err
	}

	is := seekableFile{
		file: f,
	}
	return &is, digester.Digest(), newAnnotations, nil
}

// GetDiffer returns a differ that can be used with ApplyDiffWithDiffer.
//
// The blob is handled natively when it carries zstd:chunked or eStargz TOC
// annotations; otherwise a converting differ is attempted, which is only
// allowed when the "convert_images" pull option is enabled.
func GetDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (graphdriver.Differ, error) {
	storeOpts, err := types.DefaultStoreOptionsAutoDetectUID()
	if err != nil {
		return nil, err
	}

	if _, ok := annotations[internal.ManifestChecksumKey]; ok {
		return makeZstdChunkedDiffer(ctx, store, blobSize, annotations, iss, &storeOpts)
	}
	if _, ok := annotations[estargz.TOCJSONDigestAnnotation]; ok {
		return makeEstargzChunkedDiffer(ctx, store, blobSize, annotations, iss, &storeOpts)
	}

	return makeConvertDiffer(ctx, store, blobSize, annotations, iss, &storeOpts)
}

func makeZstdChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (*chunkedDiffer, error) {
manifest, tarSplit, tocOffset, err := readZstdChunkedManifest(ctx, iss, blobSize, annotations)
// makeConvertDiffer builds a chunkedDiffer that converts a traditional
// (non-chunked) layer blob to the zstd:chunked format while applying it.
// It fails unless the "convert_images" pull option is enabled in storeOpts.
func makeConvertDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable, storeOpts *types.StoreOptions) (*chunkedDiffer, error) {
	if !parseBooleanPullOption(storeOpts, "convert_images", false) {
		return nil, errors.New("convert_images not configured")
	}

	cache, err := getLayersCache(store)
	if err != nil {
		return nil, err
	}

	differ := &chunkedDiffer{
		blobSize:             blobSize,
		convertToZstdChunked: true,
		copyBuffer:           makeCopyBuffer(),
		layersCache:          cache,
		storeOpts:            storeOpts,
		stream:               iss,
	}
	return differ, nil
}

func makeZstdChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable, storeOpts *types.StoreOptions) (*chunkedDiffer, error) {
manifest, tarSplit, tocOffset, err := readZstdChunkedManifest(iss, blobSize, annotations)
if err != nil {
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
}
Expand All @@ -193,18 +312,20 @@ func makeZstdChunkedDiffer(ctx context.Context, store storage.Store, blobSize in
}

return &chunkedDiffer{
blobSize: blobSize,
copyBuffer: makeCopyBuffer(),
fileType: fileTypeZstdChunked,
layersCache: layersCache,
manifest: manifest,
storeOpts: storeOpts,
stream: iss,
tarSplit: tarSplit,
tocOffset: tocOffset,
tocDigest: tocDigest,
tocOffset: tocOffset,
}, nil
}

func makeEstargzChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (*chunkedDiffer, error) {
func makeEstargzChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable, storeOpts *types.StoreOptions) (*chunkedDiffer, error) {
manifest, tocOffset, err := readEstargzChunkedManifest(iss, blobSize, annotations)
if err != nil {
return nil, fmt.Errorf("read zstd:chunked manifest: %w", err)
Expand All @@ -220,13 +341,15 @@ func makeEstargzChunkedDiffer(ctx context.Context, store storage.Store, blobSize
}

return &chunkedDiffer{
blobSize: blobSize,
copyBuffer: makeCopyBuffer(),
stream: iss,
manifest: manifest,
layersCache: layersCache,
tocOffset: tocOffset,
fileType: fileTypeEstargz,
layersCache: layersCache,
manifest: manifest,
storeOpts: storeOpts,
stream: iss,
tocDigest: tocDigest,
tocOffset: tocOffset,
}, nil
}

Expand Down Expand Up @@ -1372,6 +1495,29 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
}
}()

if c.convertToZstdChunked {
fileSource, diffID, annotations, err := retrieveWholeFile(dest, c.blobSize, c.stream)
if err != nil {
return graphdriver.DriverWithDifferOutput{}, err
}
defer fileSource.Close()

c.stream = fileSource

manifest, tarSplit, tocOffset, err := readZstdChunkedManifest(c.stream, c.blobSize, annotations)
if err != nil {
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("read zstd:chunked manifest: %w", err)
}

c.fileType = fileTypeZstdChunked
c.manifest = manifest
c.convertToZstdChunked = false
c.tarSplit = tarSplit
// since we retrieved the whole file, use the diffID instead of the TOC digest.
c.tocDigest = diffID
c.tocOffset = tocOffset
}

lcd := chunkedLayerData{
Format: differOpts.Format,
}
Expand All @@ -1391,21 +1537,16 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions, diff
TOCDigest: c.tocDigest,
}

storeOpts, err := types.DefaultStoreOptionsAutoDetectUID()
if err != nil {
return output, err
}

if !parseBooleanPullOption(&storeOpts, "enable_partial_images", false) {
if !parseBooleanPullOption(c.storeOpts, "enable_partial_images", false) {
return output, errors.New("enable_partial_images not configured")
}

// When the hard links deduplication is used, file attributes are ignored because setting them
// modifies the source file as well.
useHardLinks := parseBooleanPullOption(&storeOpts, "use_hard_links", false)
useHardLinks := parseBooleanPullOption(c.storeOpts, "use_hard_links", false)

// List of OSTree repositories to use for deduplication
ostreeRepos := strings.Split(storeOpts.PullOptions["ostree_repos"], ":")
ostreeRepos := strings.Split(c.storeOpts.PullOptions["ostree_repos"], ":")

// Generate the manifest
toc, err := unmarshalToc(c.manifest)
Expand Down
3 changes: 1 addition & 2 deletions pkg/chunked/zstdchunked_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package chunked
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -161,7 +160,7 @@ func TestGenerateAndParseManifest(t *testing.T) {
t: t,
}

manifest, _, _, err := readZstdChunkedManifest(context.TODO(), s, 8192, annotations)
manifest, _, _, err := readZstdChunkedManifest(s, 8192, annotations)
if err != nil {
t.Error(err)
}
Expand Down

0 comments on commit 2e1408a

Please sign in to comment.