Archive index rework to make loading faster #8078

Merged: 5 commits, Jun 27, 2024. Changes shown from 4 commits.
28 changes: 20 additions & 8 deletions go/store/nbs/archive.go
@@ -60,15 +60,28 @@ Index:
+--------------+------------+-----------------+----------+
| ByteSpan Map | Prefix Map | ChunkReferences | Suffixes |
+--------------+------------+-----------------+----------+
-- The Index is a concatenation of 4 sections, the first three of which are compressed as one stream. The Suffixes are
-  are not compressed because they won't compress well. For this reason there are two methods on the footer to get the
+- The Index is a concatenation of 4 sections, the first three of which are compressed as one stream. The Suffixes
+  are not compressed because they won't compress well. For this reason there are two methods on the footer to get
   the two spans individually.

ByteSpan Map:
+------------------+------------------+-----+------------------+
-| ByteSpanLength 1 | ByteSpanLength 2 | ... | ByteSpanLength N |
+| ByteSpanOffset 1 | ByteSpanOffset 2 | ... | ByteSpanOffset N |
+------------------+------------------+-----+------------------+
-- The Length of each ByteSpan is recorded as a varuint, and as we read them we will calculate the offset of each.
+- The ByteSpanMap is effectively the offset and length of each data block to read from disk. In a series of Uint64s,
+  we record the _end_ of each ByteSpan. This allows for reading the data quickly at index load time, and then
+  quick calculation of the length based on the previous ByteSpan.

+  An example:
+  +-------------------+-------------------+-------------------+-------------------+
+  | ByteSpan 1, len 7 | ByteSpan 2, len 3 | ByteSpan 3, len 5 | ByteSpan 4, len 9 |
+  +-------------------+-------------------+-------------------+-------------------+
+
+  Written as the following Uint64s on disk: [7, 10, 15, 24]
+  - The first ByteSpan is 7 bytes long, and starts at offset 0.
+  - The second ByteSpan is 3 bytes long, and starts at offset 7.
+  - The third ByteSpan is 5 bytes long, and starts at offset 10.
+  - The fourth ByteSpan is 9 bytes long, and starts at offset 15.
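[Editor's aside: recovering a span's offset and length from these stored end-offsets is a single subtraction. A minimal, runnable sketch of that calculation; spanFor is a hypothetical helper, mirroring the getByteSpanByID method added in archive_reader.go further down.]

    package main

    import "fmt"

    // spanFor recovers (offset, length) for a 1-based ByteSpan ID from the
    // loaded end-offsets. ends[0] == 0 is a sentinel, so ID 0 stays reserved.
    func spanFor(ends []uint64, id int) (offset, length uint64) {
        offset = ends[id-1]
        length = ends[id] - offset
        return offset, length
    }

    func main() {
        ends := []uint64{0, 7, 10, 15, 24} // sentinel + the on-disk values above
        for id := 1; id <= 4; id++ {
            off, ln := spanFor(ends, id)
            fmt.Printf("ByteSpan %d: offset %d, length %d\n", id, off, ln)
        }
    }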

The ByteSpan Map contains N ByteSpan Records. The index in the map is considered the ByteSpan's ID, and
is used to reference the ByteSpan in the ChunkRefs. Note that the ByteSpan ID is 1-based, as 0 is reserved to indicate
@@ -88,12 +101,11 @@ Index:
| ChunkRef 0 | ChunkRef 1 | ... | ChunkRef M-1 |
+------------+------------+-----+--------------+
ChunkRef:
-+-------------------------------+--------------------------+
-| (uvarint) Dictionary ByteSpan | (uvarint) Chunk ByteSpan |
-+-------------------------------+--------------------------+
++------------------------------+-------------------------+
+| (Uint32) Dictionary ByteSpan | (Uint32) Chunk ByteSpan |
++------------------------------+-------------------------+
 - Dictionary: ID for a ByteSpan to be used as zstd dictionary. 0 refers to the empty ByteSpan, which indicates no dictionary.
 - Chunk: ID for the ByteSpan containing the Chunk data. Never 0.
-- Dictionary and Chunk ByteSpans are constrained to be uint32, which is plenty. Varints can exceed this value, but we constrain them.
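[Editor's aside: because each ChunkRef is now a fixed-width pair of Uint32s, chunk i's pair sits at slice positions 2i and 2i+1. A small sketch under that assumption; chunkRefAt is a hypothetical helper mirroring the reader's getChunkRef.]

    package main

    import "fmt"

    // chunkRefAt returns the (dictionary, data) ByteSpan IDs for chunk i when
    // refs is a flat slice of uint32 pairs, as in the new index layout.
    func chunkRefAt(refs []uint32, i int) (dictID, dataID uint32) {
        return refs[2*i], refs[2*i+1]
    }

    func main() {
        refs := []uint32{0, 1, 2, 3} // chunk 0: no dict, data span 1; chunk 1: dict 2, data 3
        d, c := chunkRefAt(refs, 1)
        fmt.Println(d, c) // 2 3
    }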

Suffixes:
+--------------------+--------------------+-----+----------------------+
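[Editor's aside: taken together, every index section length is plain arithmetic on the footer's counts, so no section needs to be decompressed to locate any other. A sketch of that arithmetic, assuming the sizes described above: 8-byte end-offsets and prefixes, two 4-byte chunk-ref halves, and 12-byte hash suffixes.]

    package main

    import "fmt"

    // indexSectionLengths computes the byte length of each index section for an
    // archive with n ByteSpans and m chunks, per the layout documented above.
    func indexSectionLengths(n, m uint64) (spans, prefixes, chunkRefs, suffixes uint64) {
        spans = 8 * n     // one uint64 end-offset per ByteSpan
        prefixes = 8 * m  // one uint64 prefix per chunk
        chunkRefs = 8 * m // two uint32 IDs per chunk
        suffixes = 12 * m // one 12-byte hash suffix per chunk
        return
    }

    func main() {
        s, p, c, x := indexSectionLengths(1, 1)
        fmt.Println(s+p+c+x, "bytes") // 36 bytes, matching the updated test expectation below
    }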
3 changes: 1 addition & 2 deletions go/store/nbs/archive_build.go
@@ -84,7 +84,6 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
}

func convertTableFileToArchive(ctx context.Context, cs chunkSource, idx tableIndex, dagGroups *ChunkRelations, archivePath string) (string, hash.Hash, error) {
-
allChunks, defaultSamples, err := gatherAllChunks(ctx, cs, idx)
if err != nil {
return "", hash.Hash{}, err
@@ -153,7 +152,7 @@ func convertTableFileToArchive(ctx context.Context, cs chunkSource, idx tableInd
}

// indexAndFinalizeArchive writes the index, metadata, and footer to the archive file. It also flushes the archive writer
-// to the directory provided. The name is calculate from the footer, and can be obtained by calling getName on the archive.
+// to the directory provided. The name is calculated from the footer, and can be obtained by calling getName on the archive.
func indexAndFinalizeArchive(arcW *archiveWriter, archivePath string) error {
err := arcW.finalizeByteSpans()
if err != nil {
202 changes: 93 additions & 109 deletions go/store/nbs/archive_reader.go
@@ -15,34 +15,28 @@
package nbs

import (
"bufio"
"crypto/sha512"
"encoding/binary"
"fmt"
"io"
"math"
"math/bits"

"github.com/dolthub/gozstd"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"

"github.com/dolthub/dolt/go/store/hash"
)

+// archiveReader is a reader for the archive format. We use primitive type slices where possible. These are read directly
+// from disk into memory for speed. The downside is complexity on the read path, but it's all constant time.
type archiveReader struct {
-    reader    io.ReaderAt
-    prefixes  []uint64
-    byteSpans []byteSpan
-    chunkRefs []chunkRef
-    suffixes  []suffix
-    footer    footer
-    dictCache *lru.TwoQueueCache[uint32, *gozstd.DDict]
-}
-
-type chunkRef struct {
-    dict uint32
-    data uint32
+    reader       io.ReaderAt
+    prefixes     []uint64
+    byteOffSpans []uint64
+    chunkRefs    []uint32 // Pairs of uint32s. First is the dict id, second is the data id.
+    suffixes     []byte
+    footer       footer
+    dictCache    *lru.TwoQueueCache[uint32, *gozstd.DDict]
 }

type suffix [hash.SuffixLen]byte
@@ -73,21 +67,35 @@ func (f footer) totalIndexSpan() byteSpan {
return byteSpan{offset: f.fileSize - archiveFooterSize - uint64(f.metadataSize) - uint64(f.indexSize), length: uint64(f.indexSize)}
}

-// indexCompressedSpan returns the span of the index section of the archive.
-func (f footer) indexCompressedSpan() byteSpan {
-    suffixLen := uint64(f.chunkCount * hash.SuffixLen)
+// indexByteOffsetSpan returns the span of the byte offsets section of the index. This is the first part of the index.
+func (f footer) indexByteOffsetSpan() byteSpan {
     totalIdx := f.totalIndexSpan()
-    return byteSpan{offset: totalIdx.offset, length: totalIdx.length - suffixLen}
+    return byteSpan{offset: totalIdx.offset, length: uint64(f.byteSpanCount * uint64Size)}
}

+// indexPrefixSpan returns the span of the prefix section of the index. This is the second part of the index.
+func (f footer) indexPrefixSpan() byteSpan {
+    // Prefix starts after the byte spans. Length is uint64 * chunk count.
+    offs := f.indexByteOffsetSpan()
+    return byteSpan{offs.offset + offs.length, uint64(f.chunkCount) * uint64Size}
+}
+
+// indexChunkRefSpan returns the span of the chunk reference section of the index. This is the third part of the index.
+func (f footer) indexChunkRefSpan() byteSpan {
+    // chunk refs starts after the prefix. Length is (uint32 + uint32) * chunk count.
+    prefixes := f.indexPrefixSpan()
+    chLen := uint64(f.chunkCount) * (uint32Size + uint32Size)
+    return byteSpan{prefixes.offset + prefixes.length, chLen}
+}
+
+// indexSuffixSpan returns the span of the suffix section of the index. This is the fourth part of the index.
func (f footer) indexSuffixSpan() byteSpan {
suffixLen := uint64(f.chunkCount * hash.SuffixLen)
-    totalIdx := f.totalIndexSpan()
-    compressedLen := totalIdx.length - suffixLen
-
-    return byteSpan{totalIdx.offset + compressedLen, suffixLen}
+    chunkRefs := f.indexChunkRefSpan()
+    return byteSpan{chunkRefs.offset + chunkRefs.length, suffixLen}
}

+// metadataSpan returns the span of the metadata section of the archive.
func (f footer) metadataSpan() byteSpan {
return byteSpan{offset: f.fileSize - archiveFooterSize - uint64(f.metadataSize), length: uint64(f.metadataSize)}
}
@@ -98,86 +106,37 @@ func newArchiveReader(reader io.ReaderAt, fileSize uint64) (archiveReader, error
return archiveReader{}, err
}

-    indexSpan := footer.indexCompressedSpan()
-    secRdr := io.NewSectionReader(reader, int64(indexSpan.offset), int64(indexSpan.length))
-    rawReader := bufio.NewReader(secRdr)
-
-    redr, wrtr := io.Pipe()
-    defer redr.Close()
-    go func() {
-        err := gozstd.StreamDecompress(wrtr, rawReader)
-        if err != nil {
-            wrtr.CloseWithError(err)
-        } else {
-            wrtr.Close()
-        }
-    }()
-    byteReader := bufio.NewReader(redr)
-
-    byteSpans := make([]byteSpan, footer.byteSpanCount+1)
-    byteSpans = append(byteSpans, byteSpan{offset: 0, length: 0}) // Null byteSpan to simplify logic.
-
-    offset := uint64(0)
-    for i := uint32(0); i < footer.byteSpanCount; i++ {
-        length, err := binary.ReadUvarint(byteReader)
-        if err != nil {
-            return archiveReader{}, err
-        }
-
-        if length > math.MaxUint32 {
-            return archiveReader{}, errors.New("invalid byte span length. Byte span lengths must be uint32s.")
-        }
-
-        byteSpans[i+1] = byteSpan{offset: offset, length: length}
-        offset += length
-    }
+    byteOffSpan := footer.indexByteOffsetSpan()
+    secRdr := io.NewSectionReader(reader, int64(byteOffSpan.offset), int64(byteOffSpan.length))
+    byteSpans := make([]uint64, footer.byteSpanCount+1)
+    byteSpans[0] = 0 // Null byteSpan to simplify logic.
+    err = binary.Read(secRdr, binary.BigEndian, byteSpans[1:])
if err != nil {
return archiveReader{}, err
}

-    lastPrefix := uint64(0)
+    prefixSpan := footer.indexPrefixSpan()
+    prefixRdr := io.NewSectionReader(reader, int64(prefixSpan.offset), int64(prefixSpan.length))
prefixes := make([]uint64, footer.chunkCount)
-    for i := uint32(0); i < footer.chunkCount; i++ {
-        delta := uint64(0)
-        err := binary.Read(byteReader, binary.BigEndian, &delta)
-        if err != nil {
-            return archiveReader{}, err
-        }
-
-        nextDelta := lastPrefix + delta
-        if nextDelta < lastPrefix || nextDelta < delta {
-            return archiveReader{}, errors.New("invalid prefix delta. Overflow occurred.")
-        }
-        prefixes[i] = nextDelta
-        lastPrefix = nextDelta
-    }
+    err = binary.Read(prefixRdr, binary.BigEndian, prefixes[:])
if err != nil {
return archiveReader{}, err
}

-    chunks := make([]chunkRef, footer.chunkCount)
-    for i := uint32(0); i < footer.chunkCount; i++ {
-        dict64, err := binary.ReadUvarint(byteReader)
-        if err != nil {
-            return archiveReader{}, err
-        }
-        data64, err := binary.ReadUvarint(byteReader)
-        if err != nil {
-            return archiveReader{}, err
-        }
-
-        if dict64 > math.MaxUint32 || data64 > math.MaxUint32 {
-            return archiveReader{}, errors.New("invalid chunk reference. Chunk references must be 32-bit unsigned integers.")
-        }
-
-        chunks[i] = chunkRef{dict: uint32(dict64), data: uint32(data64)}
-    }
+    chunkRefSpan := footer.indexChunkRefSpan()
+    chunkRdr := io.NewSectionReader(reader, int64(chunkRefSpan.offset), int64(chunkRefSpan.length))
+    chunks := make([]uint32, footer.chunkCount*2)
+    err = binary.Read(chunkRdr, binary.BigEndian, chunks[:])
if err != nil {
return archiveReader{}, err
}
-    // Reading the compressed portion should be complete at this point.

// Read the suffixes.
suffixSpan := footer.indexSuffixSpan()
sufRdr := io.NewSectionReader(reader, int64(suffixSpan.offset), int64(suffixSpan.length))
-    sufReader := bufio.NewReader(sufRdr)
-    suffixes := make([]suffix, footer.chunkCount)
-    for i := uint32(0); i < footer.chunkCount; i++ {
-        _, err := io.ReadFull(sufReader, suffixes[i][:])
-        if err != nil {
-            return archiveReader{}, err
-        }
-    }
+    suffixes := make([]byte, footer.chunkCount*hash.SuffixLen)
+    _, err = io.ReadFull(sufRdr, suffixes)
+    if err != nil {
+        return archiveReader{}, err
+    }

dictCache, err := lru.New2Q[uint32, *gozstd.DDict](256)
@@ -186,13 +145,13 @@ func newArchiveReader(reader io.ReaderAt, fileSize uint64) (archiveReader, error
}

return archiveReader{
-        reader:    reader,
-        prefixes:  prefixes,
-        byteSpans: byteSpans,
-        chunkRefs: chunks,
-        suffixes:  suffixes,
-        footer:    footer,
-        dictCache: dictCache,
+        reader:       reader,
+        prefixes:     prefixes,
+        byteOffSpans: byteSpans,
+        chunkRefs:    chunks,
+        suffixes:     suffixes,
+        footer:       footer,
+        dictCache:    dictCache,
}, nil
}

@@ -227,7 +186,7 @@ func loadFooter(reader io.ReaderAt, fileSize uint64) (f footer, err error) {
f.metaCheckSum = sha512Sum(buf[afrMetaChkSumOffset : afrMetaChkSumOffset+sha512.Size])
f.fileSize = fileSize

-    // calculate the has of the footer. We don't currently verify that this is what was used to load the content.
+    // calculate the hash of the footer. We don't currently verify that this is what was used to load the content.
sha := sha512.New()
sha.Write(buf)
f.hash = hash.New(sha.Sum(nil)[:hash.ByteLen])
@@ -246,7 +205,7 @@ func (ai archiveReader) search(hash hash.Hash) int {
}

for idx := possibleMatch; idx < len(ai.prefixes) && ai.prefixes[idx] == prefix; idx++ {
-        if ai.suffixes[idx] == suffix(targetSfx) {
+        if ai.getSuffixByID(uint32(idx)) == suffix(targetSfx) {
return idx
}
}
@@ -287,6 +246,7 @@ func (ai archiveReader) close() error {
return nil
}

+// readByteSpan reads the byte span from the archive. This allocates a new byte slice and returns it to the caller.
func (ai archiveReader) readByteSpan(bs byteSpan) ([]byte, error) {
buff := make([]byte, bs.length)
_, err := ai.reader.ReadAt(buff[:], int64(bs.offset))
@@ -306,12 +266,12 @@ func (ai archiveReader) getRaw(hash hash.Hash) (dict *gozstd.DDict, data []byte,
return nil, nil, nil
}

-    chunkRef := ai.chunkRefs[idx]
-    if chunkRef.dict != 0 {
-        if cached, cacheHit := ai.dictCache.Get(chunkRef.dict); cacheHit {
+    dictId, dataId := ai.getChunkRef(idx)
+    if dictId != 0 {
+        if cached, cacheHit := ai.dictCache.Get(dictId); cacheHit {
dict = cached
} else {
-            byteSpan := ai.byteSpans[chunkRef.dict]
+            byteSpan := ai.getByteSpanByID(dictId)
dictBytes, err := ai.readByteSpan(byteSpan)
if err != nil {
return nil, nil, err
@@ -327,18 +287,42 @@ func (ai archiveReader) getRaw(hash hash.Hash) (dict *gozstd.DDict, data []byte,
return nil, nil, e2
}

-            ai.dictCache.Add(chunkRef.dict, dict)
+            ai.dictCache.Add(dictId, dict)
}
}

-    byteSpan := ai.byteSpans[chunkRef.data]
+    byteSpan := ai.getByteSpanByID(dataId)
data, err = ai.readByteSpan(byteSpan)
if err != nil {
return nil, nil, err
}
return
}

+// getChunkRef returns the dictionary and data references for the chunk at the given index. Assumes good input!
+func (ai archiveReader) getChunkRef(idx int) (dict, data uint32) {
+    // Chunk refs are stored as pairs of uint32s, so we need to double the index.
+    idx *= 2
+    return ai.chunkRefs[idx], ai.chunkRefs[idx+1]
+}
+
+// getByteSpanByID returns the byte span for the chunk at the given index. Assumes good input!
+func (ai archiveReader) getByteSpanByID(id uint32) byteSpan {
+    if id == 0 {
+        return byteSpan{}
+    }
+    // This works because byteOffSpans[0] == 0. See initialization.
+    offset := ai.byteOffSpans[id-1]
+    length := ai.byteOffSpans[id] - offset
+    return byteSpan{offset: offset, length: length}
+}
+
+// getSuffixByID returns the suffix for the chunk at the given index. Assumes good input!
+func (ai archiveReader) getSuffixByID(id uint32) suffix {
+    start := id * hash.SuffixLen
+    return suffix(ai.suffixes[start : start+hash.SuffixLen])
+}

func (ai archiveReader) getMetadata() ([]byte, error) {
return ai.readByteSpan(ai.footer.metadataSpan())
}
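[Editor's aside: the heart of the speedup in newArchiveReader is that each index section is now a fixed-width, big-endian array that a single binary.Read can fill straight from a section reader, replacing the old zstd decompression pipe and per-element varint loop. A self-contained sketch of that pattern; the buffer below is fabricated for illustration and is not the real archive layout.]

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
        "io"
    )

    func main() {
        // Fake index section: four big-endian uint64 end-offsets, as in the
        // ByteSpan Map example from archive.go above.
        var buf bytes.Buffer
        for _, e := range []uint64{7, 10, 15, 24} {
            binary.Write(&buf, binary.BigEndian, e)
        }
        raw := buf.Bytes()

        // New-style load: a section reader over the right span, then one
        // binary.Read to fill the whole slice. No decompression stream and no
        // per-element varint decode on the hot path.
        secRdr := io.NewSectionReader(bytes.NewReader(raw), 0, int64(len(raw)))
        ends := make([]uint64, 4)
        if err := binary.Read(secRdr, binary.BigEndian, ends); err != nil {
            panic(err)
        }
        fmt.Println(ends) // [7 10 15 24]
    }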
8 changes: 4 additions & 4 deletions go/store/nbs/archive_test.go
@@ -47,17 +47,17 @@ func TestArchiveSingleChunk(t *testing.T) {

err = aw.writeIndex()
assert.NoError(t, err)
-    // The 'uncompressed' size of the index is 23 bytes. Compressing such small data is not worth it, but we do verify
-    // that the index is 35 bytes in this situation.
-    assert.Equal(t, uint32(35), aw.indexLen)
+    // Index size is not deterministic from the number of chunks, but when no dictionaries are in play, 36 bytes is correct
+    // because: 8 (uint64,prefix) + 8 (uint64,offset) + 4 (uint32,dict) + 4 (uint32,data) + 12 (hash.Suffix) = 36
+    assert.Equal(t, uint32(36), aw.indexLen)

err = aw.writeMetadata([]byte(""))
assert.NoError(t, err)

err = aw.writeFooter()
assert.NoError(t, err)

-    assert.Equal(t, 10+35+archiveFooterSize, aw.bytesWritten) // 10 data bytes, 35 index bytes + footer
+    assert.Equal(t, 10+36+archiveFooterSize, aw.bytesWritten) // 10 data bytes, 36 index bytes + footer

theBytes := writer.buff[:writer.pos]
fileSize := uint64(len(theBytes))