chunked: use rolling checksum to split files
Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
giuseppe committed Dec 23, 2021
1 parent 2e132b4 commit 432e3b9
Showing 1 changed file with 116 additions and 36 deletions.
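
For context: this patch changes writeZstdChunkedStream so that a file's payload is no longer compressed as one monolithic zstd stream. The payload now runs through a rolling checksum, and the compressor is flushed and restarted at every content-defined boundary (RollSum.OnSplitWithBits with RollsumBits = 16, which yields chunks of roughly 64 KiB on average), so identical stretches of data tend to become identical, independently addressable compressed chunks. The toy program below sketches the same splitting idea with a simple Rabin-Karp-style rolling hash over a fixed window; it is only an illustration under assumed names and parameters, not the bup-style RollSum this commit actually calls into.

	// Toy content-defined chunker: split wherever the low splitBits bits of a
	// rolling hash are all ones, in the spirit of RollSum.OnSplitWithBits in
	// the patch. The hash itself is a stand-in, not the real RollSum.
	package main

	import (
		"fmt"
		"os"
	)

	const (
		windowSize = 64 // bytes the rolling hash looks at
		splitBits  = 16 // like RollsumBits: ~64 KiB average chunk size
		splitMask  = 1<<splitBits - 1
		prime      = 31
	)

	func chunkBoundaries(data []byte) []int {
		// pow = prime^(windowSize-1), used to roll the oldest byte out.
		pow := uint32(1)
		for i := 0; i < windowSize-1; i++ {
			pow *= prime
		}
		var hash uint32
		var cuts []int
		for i, b := range data {
			if i >= windowSize {
				hash -= uint32(data[i-windowSize]) * pow // drop the oldest byte
			}
			hash = hash*prime + uint32(b) // roll in the new byte
			if i >= windowSize && hash&splitMask == splitMask {
				cuts = append(cuts, i+1) // boundary right after byte i
			}
		}
		return cuts
	}

	func main() {
		if len(os.Args) != 2 {
			fmt.Fprintln(os.Stderr, "usage: chunker FILE")
			os.Exit(1)
		}
		data, err := os.ReadFile(os.Args[1])
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		prev := 0
		for _, cut := range append(chunkBoundaries(data), len(data)) {
			if cut > prev {
				fmt.Printf("chunk %8d..%8d (%d bytes)\n", prev, cut, cut-prev)
				prev = cut
			}
		}
	}

Because a boundary depends only on the bytes inside the current window, inserting or removing data early in a file shifts offsets but leaves most later boundaries, and therefore most chunks, intact; that is what makes the chunks reusable across image versions.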
pkg/chunked/compressor/compressor.go: 116 additions & 36 deletions
@@ -5,6 +5,7 @@ package compressor
// larger software like the graph drivers.

import (
"bufio"
"encoding/base64"
"io"
"io/ioutil"
@@ -15,6 +16,50 @@ import (
"github.com/vbatts/tar-split/archive/tar"
)

const RollsumBits = 16

type rollingChecksumReader struct {
reader *bufio.Reader
closed bool
rollsum *RollSum

WrittenOut int64
}

func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
if rc.closed {
return false, 0, io.EOF
}
for i := 0; i < len(b); i++ {
n, err := rc.reader.ReadByte()
if err != nil {
if err == io.EOF {
rc.closed = true
if i == 0 {
return false, 0, err
}
return false, i, nil
}
// Report any other error type
return false, -1, err
}
b[i] = n
rc.WrittenOut++
rc.rollsum.Roll(n)
if rc.rollsum.OnSplitWithBits(RollsumBits) {
return true, i + 1, nil
}
}
return false, len(b), nil
}

type chunk struct {
ChunkOffset int64
Offset int64
Checksum string
ChunkSize int64
}

func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
// total written so far. Used to retrieve partial offsets in the file
dest := ioutils.NewWriteCounter(destFile)
@@ -64,40 +109,64 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
if _, err := zstdWriter.Write(rawBytes); err != nil {
return err
}
payloadDigester := digest.Canonical.Digester()
payloadChecksum := payloadDigester.Hash()

payloadDest := io.MultiWriter(payloadChecksum, zstdWriter)
payloadDigester := digest.Canonical.Digester()
chunkDigester := digest.Canonical.Digester()

// Now handle the payload, if any
var startOffset, endOffset int64
startOffset := int64(0)
lastOffset := int64(0)
lastChunkOffset := int64(0)

checksum := ""

chunks := []chunk{}

rcReader := &rollingChecksumReader{
reader: bufio.NewReader(tr),
rollsum: NewRollSum(),
}

payloadDest := io.MultiWriter(payloadDigester.Hash(), chunkDigester.Hash(), zstdWriter)
for {
read, errRead := tr.Read(buf)
mustSplit, read, errRead := rcReader.Read(buf)
if errRead != nil && errRead != io.EOF {
return errRead
}

// restart the compression only if there is
// a payload.
// restart the compression only if there is a payload.
if read > 0 {
if startOffset == 0 {
startOffset, err = restartCompression()
if err != nil {
return err
}
lastOffset = startOffset
}

if _, err := payloadDest.Write(buf[:read]); err != nil {
return err
}
_, err := payloadDest.Write(buf[:read])
}
if (mustSplit || errRead == io.EOF) && startOffset > 0 {
off, err := restartCompression()
if err != nil {
return err
}

chunks = append(chunks, chunk{
ChunkOffset: lastChunkOffset,
Offset: lastOffset,
Checksum: chunkDigester.Digest().String(),
ChunkSize: rcReader.WrittenOut - lastChunkOffset,
})

lastOffset = off
lastChunkOffset = rcReader.WrittenOut
chunkDigester = digest.Canonical.Digester()
payloadDest = io.MultiWriter(payloadDigester.Hash(), chunkDigester.Hash(), zstdWriter)
}
if errRead == io.EOF {
if startOffset > 0 {
endOffset, err = restartCompression()
if err != nil {
return err
}
checksum = payloadDigester.Digest().String()
}
break
@@ -112,30 +181,41 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
for k, v := range hdr.Xattrs {
xattrs[k] = base64.StdEncoding.EncodeToString([]byte(v))
}
m := internal.FileMetadata{
Type: typ,
Name: hdr.Name,
Linkname: hdr.Linkname,
Mode: hdr.Mode,
Size: hdr.Size,
UID: hdr.Uid,
GID: hdr.Gid,
ModTime: hdr.ModTime,
AccessTime: hdr.AccessTime,
ChangeTime: hdr.ChangeTime,
Devmajor: hdr.Devmajor,
Devminor: hdr.Devminor,
Xattrs: xattrs,
Digest: checksum,
Offset: startOffset,
EndOffset: endOffset,

// ChunkSize is 0 for the last chunk
ChunkSize: 0,
ChunkOffset: 0,
ChunkDigest: checksum,
entries := []internal.FileMetadata{
{
Type: typ,
Name: hdr.Name,
Linkname: hdr.Linkname,
Mode: hdr.Mode,
Size: hdr.Size,
UID: hdr.Uid,
GID: hdr.Gid,
ModTime: &hdr.ModTime,
AccessTime: &hdr.AccessTime,
ChangeTime: &hdr.ChangeTime,
Devmajor: hdr.Devmajor,
Devminor: hdr.Devminor,
Xattrs: xattrs,
Digest: checksum,
Offset: startOffset,
EndOffset: lastOffset,
},
}
for i := 1; i < len(chunks); i++ {
entries = append(entries, internal.FileMetadata{
Type: internal.TypeChunk,
Name: hdr.Name,
ChunkOffset: chunks[i].ChunkOffset,
})
}
if len(chunks) > 1 {
for i := range chunks {
entries[i].ChunkSize = chunks[i].ChunkSize
entries[i].Offset = chunks[i].Offset
entries[i].ChunkDigest = chunks[i].Checksum
}
}
metadata = append(metadata, m)
metadata = append(metadata, entries...)
}

rawBytes := tr.RawBytes()

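A note on the new contract: rollingChecksumReader.Read returns (mustSplit, n, err), where n bytes were copied into the buffer and mustSplit reports that the rolling checksum hit a split point right after those n bytes. writeZstdChunkedStream reacts by flushing and restarting the zstd stream (restartCompression), recording a chunk{} entry, and resetting chunkDigester; at the end it emits one internal.FileMetadata entry for the file itself, followed by internal.TypeChunk entries for the second and later chunks, and fills in per-chunk Offset, ChunkSize, and ChunkDigest whenever a file was split more than once. A minimal consumer of the Read contract could look like the sketch below; dumpChunkBoundaries is a hypothetical helper, assumed to live in the same compressor package (with fmt added to the imports) so it can reach rollingChecksumReader and NewRollSum.

	// dumpChunkBoundaries drives rollingChecksumReader the same way
	// writeZstdChunkedStream does, but only reports where the chunk
	// boundaries fall. Hypothetical helper, not part of the patch.
	func dumpChunkBoundaries(r io.Reader) error {
		rc := &rollingChecksumReader{
			reader:  bufio.NewReader(r),
			rollsum: NewRollSum(),
		}
		buf := make([]byte, 4096)
		chunkStart := int64(0)
		for {
			mustSplit, _, err := rc.Read(buf)
			if err != nil && err != io.EOF {
				return err
			}
			// Close the current chunk on a split point or at end of stream.
			if (mustSplit || err == io.EOF) && rc.WrittenOut > chunkStart {
				fmt.Printf("chunk %8d..%8d (%d bytes)\n",
					chunkStart, rc.WrittenOut, rc.WrittenOut-chunkStart)
				chunkStart = rc.WrittenOut
			}
			if err == io.EOF {
				return nil
			}
		}
	}

Note how the EOF handling mirrors the patch: a read that ends exactly at EOF first returns (false, n, nil) with the reader marked closed, and only the following call reports io.EOF, so the final chunk is flushed by the same (mustSplit || err == io.EOF) condition the compressor uses.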