Skip to content

Commit

Permalink
pkg/chunked: add support for sparse files
Browse files Browse the repository at this point in the history
Automatically detect holes in sparse files (the threshold is hardcoded
at 1 KiB for now) and add this information to the manifest file.

The receiver will create a hole (using unix.Seek and unix.Ftruncate)
instead of writing the actual zeros.

Closes: #1091

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
  • Loading branch information
giuseppe committed Jan 13, 2022
1 parent fd89b93 commit 1988208
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 53 deletions.
4 changes: 3 additions & 1 deletion pkg/chunked/cache_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
for iter.ReadArray() {
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
switch field {
case "type", "name", "linkName", "digest", "chunkDigest":
case "type", "name", "linkName", "digest", "chunkDigest", "chunkType":
count += len(iter.ReadStringAsSlice())
case "xattrs":
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
Expand Down Expand Up @@ -609,6 +609,8 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
m.ChunkOffset = iter.ReadInt64()
case "chunkDigest":
m.ChunkDigest = getString(iter.ReadStringAsSlice())
case "chunkType":
m.ChunkType = getString(iter.ReadStringAsSlice())
case "xattrs":
m.Xattrs = make(map[string]string)
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
Expand Down
176 changes: 165 additions & 11 deletions pkg/chunked/compressor/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,152 @@ import (
)

const RollsumBits = 16
const holesThreshold = int64(1 << 10)

type holesFinder struct {
reader *bufio.Reader
fileOff int64
zeros int64
from int64
threshold int64

state int
}

const (
holesFinderStateRead = iota
holesFinderStateAccumulate
holesFinderStateFound
holesFinderStateEOF
)

// ReadByte reads a single byte from the underlying reader.
// If a single byte is read, the return value is (0, RAW-BYTE-VALUE, nil).
// If there are at least f.THRESHOLD consecutive zeros, then the
// return value is (N_CONSECUTIVE_ZEROS, '\x00').
func (f *holesFinder) ReadByte() (int64, byte, error) {
for {
switch f.state {
// reading the file stream
case holesFinderStateRead:
if f.zeros > 0 {
f.zeros--
return 0, 0, nil
}
b, err := f.reader.ReadByte()
if err != nil {
return 0, b, err
}

if b != 0 {
return 0, b, err
}

f.zeros = 1
if f.zeros == f.threshold {
f.state = holesFinderStateFound
} else {
f.state = holesFinderStateAccumulate
}
// accumulating zeros, but still didn't reach the threshold
case holesFinderStateAccumulate:
b, err := f.reader.ReadByte()
if err != nil {
if err == io.EOF {
f.state = holesFinderStateEOF
continue
}
return 0, b, err
}

if b == 0 {
f.zeros++
if f.zeros == f.threshold {
f.state = holesFinderStateFound
}
} else {
if f.reader.UnreadByte(); err != nil {
return 0, 0, err
}
f.state = holesFinderStateRead
}
// found a hole. Number of zeros >= threshold
case holesFinderStateFound:
b, err := f.reader.ReadByte()
if err != nil {
if err == io.EOF {
f.state = holesFinderStateEOF
}
holeLen := f.zeros
f.zeros = 0
return holeLen, 0, nil
}
if b != 0 {
if f.reader.UnreadByte(); err != nil {
return 0, 0, err
}
f.state = holesFinderStateRead

holeLen := f.zeros
f.zeros = 0
return holeLen, 0, nil
}
f.zeros++
// reached EOF. Flush pending zeros if any.
case holesFinderStateEOF:
if f.zeros > 0 {
f.zeros--
return 0, 0, nil
}
return 0, 0, io.EOF
}
}
}

type rollingChecksumReader struct {
reader *bufio.Reader
closed bool
rollsum *RollSum
reader *holesFinder
closed bool
rollsum *RollSum
pendingHole int64

// WrittenOut is the total number of bytes read from
// the stream.
WrittenOut int64

// IsLastChunkZeros tells whether the last generated
// chunk is a hole (made of consecutive zeros). If it
// is false, then the last chunk is a data chunk
// generated by the rolling checksum.
IsLastChunkZeros bool
}

func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
rc.IsLastChunkZeros = false

if rc.pendingHole > 0 {
toCopy := int64(len(b))
if rc.pendingHole < toCopy {
toCopy = rc.pendingHole
}
rc.pendingHole -= toCopy
for i := int64(0); i < toCopy; i++ {
b[i] = 0
}

rc.WrittenOut += toCopy

rc.IsLastChunkZeros = true

// if there are no other zeros left, terminate the chunk
return rc.pendingHole == 0, int(toCopy), nil
}

if rc.closed {
return false, 0, io.EOF
}

for i := 0; i < len(b); i++ {
n, err := rc.reader.ReadByte()
holeLen, n, err := rc.reader.ReadByte()
if err != nil {
if err == io.EOF {
rc.closed = true
Expand All @@ -43,6 +174,13 @@ func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
// Report any other error type
return false, -1, err
}
if holeLen > 0 {
for j := int64(0); j < holeLen; j++ {
rc.rollsum.Roll(0)
}
rc.pendingHole = holeLen
return true, i, nil
}
b[i] = n
rc.WrittenOut++
rc.rollsum.Roll(n)
Expand All @@ -58,6 +196,7 @@ type chunk struct {
Offset int64
Checksum string
ChunkSize int64
ChunkType string
}

func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
Expand Down Expand Up @@ -119,8 +258,13 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r

chunks := []chunk{}

hf := &holesFinder{
threshold: holesThreshold,
reader: bufio.NewReader(tr),
}

rcReader := &rollingChecksumReader{
reader: bufio.NewReader(tr),
reader: hf,
rollsum: NewRollSum(),
}

Expand Down Expand Up @@ -150,12 +294,21 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
return err
}

chunks = append(chunks, chunk{
ChunkOffset: lastChunkOffset,
Offset: lastOffset,
Checksum: chunkDigester.Digest().String(),
ChunkSize: rcReader.WrittenOut - lastChunkOffset,
})
chunkSize := rcReader.WrittenOut - lastChunkOffset
if chunkSize > 0 {
chunkType := internal.ChunkTypeData
if rcReader.IsLastChunkZeros {
chunkType = internal.ChunkTypeZeros
}

chunks = append(chunks, chunk{
ChunkOffset: lastChunkOffset,
Offset: lastOffset,
Checksum: chunkDigester.Digest().String(),
ChunkSize: chunkSize,
ChunkType: chunkType,
})
}

lastOffset = off
lastChunkOffset = rcReader.WrittenOut
Expand Down Expand Up @@ -210,6 +363,7 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
entries[i].ChunkSize = chunks[i].ChunkSize
entries[i].Offset = chunks[i].Offset
entries[i].ChunkDigest = chunks[i].Checksum
entries[i].ChunkType = chunks[i].ChunkType
}
}
metadata = append(metadata, entries...)
Expand Down
90 changes: 90 additions & 0 deletions pkg/chunked/compressor/compressor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package compressor

import (
"bufio"
"bytes"
"io"
"testing"
)

// TestHole verifies that holesFinder reports a run of zeros at or above
// the threshold as a single hole, and returns the zeros one byte at a
// time when the run stays below the threshold.
func TestHole(t *testing.T) {
	data := []byte("\x00\x00\x00\x00\x00")

	hf := &holesFinder{
		threshold: 1,
		reader:    bufio.NewReader(bytes.NewReader(data)),
	}

	hole, _, err := hf.ReadByte()
	if err != nil {
		// FIX: %w is only supported by fmt.Errorf; go vet rejects it in
		// t.Errorf. Use %v, and fail fatally since `hole` is meaningless
		// after an error.
		t.Fatalf("got error: %v", err)
	}
	if hole != 5 {
		t.Error("expected hole not found")
	}

	if _, _, err := hf.ReadByte(); err != io.EOF {
		t.Errorf("EOF not found")
	}

	hf = &holesFinder{
		threshold: 1000,
		reader:    bufio.NewReader(bytes.NewReader(data)),
	}
	for i := 0; i < 5; i++ {
		// FIX: the original named this variable `byte`, shadowing the
		// predeclared identifier.
		hole, b, err := hf.ReadByte()
		if err != nil {
			t.Fatalf("got error: %v", err)
		}
		if hole != 0 {
			t.Error("hole found")
		}
		if b != 0 {
			t.Error("wrong read")
		}
	}
	if _, _, err := hf.ReadByte(); err != io.EOF {
		t.Error("didn't receive EOF")
	}
}

// TestTwoHoles verifies that holesFinder detects two separate holes with
// literal data between them, returning the in-between bytes unchanged.
func TestTwoHoles(t *testing.T) {
	data := []byte("\x00\x00\x00\x00\x00FOO\x00\x00\x00\x00\x00")

	hf := &holesFinder{
		threshold: 2,
		reader:    bufio.NewReader(bytes.NewReader(data)),
	}

	hole, _, err := hf.ReadByte()
	if err != nil {
		// FIX: %w is only valid in fmt.Errorf; use %v with t.Fatalf since
		// the remaining checks are meaningless after an error.
		t.Fatalf("got error: %v", err)
	}
	if hole != 5 {
		t.Error("hole not found")
	}

	for _, e := range []byte("FOO") {
		hole, c, err := hf.ReadByte()
		if err != nil {
			t.Fatalf("got error: %v", err)
		}
		if hole != 0 {
			t.Error("hole found")
		}
		if c != e {
			t.Errorf("wrong byte read %v instead of %v", c, e)
		}
	}
	hole, _, err = hf.ReadByte()
	if err != nil {
		t.Fatalf("got error: %v", err)
	}
	if hole != 5 {
		t.Error("expected hole not found")
	}

	if _, _, err := hf.ReadByte(); err != io.EOF {
		t.Error("didn't receive EOF")
	}
}
6 changes: 6 additions & 0 deletions pkg/chunked/internal/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ type FileMetadata struct {
ChunkSize int64 `json:"chunkSize,omitempty"`
ChunkOffset int64 `json:"chunkOffset,omitempty"`
ChunkDigest string `json:"chunkDigest,omitempty"`
ChunkType string `json:"chunkType,omitempty"`

// internal: computed by mergeTOCEntries.
Chunks []*FileMetadata `json:"-"`
}

// Values for FileMetadata.ChunkType: a chunk either carries literal file
// data or stands for a run of zeros (a hole in a sparse file).
const (
	// ChunkTypeData marks a chunk containing regular file data.
	ChunkTypeData = ""
	// ChunkTypeZeros marks a chunk made only of zeros, which the receiver
	// can materialize as a hole instead of writing the zeros out.
	ChunkTypeZeros = "zeros"
)

const (
TypeReg = "reg"
TypeChunk = "chunk"
Expand Down
Loading

0 comments on commit 1988208

Please sign in to comment.