Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pin chunk and index format to schema version. #10213

Merged
merged 25 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
59f90b3
Hack: passing tests in /pkg/chunkenc
kavirajk Aug 7, 2023
c3037de
idk
kavirajk Aug 8, 2023
c904bf9
Fix loki build
kavirajk Aug 10, 2023
bc2238f
test passes except /index/tsdb package
kavirajk Aug 14, 2023
1779a6c
idk
kavirajk Aug 14, 2023
5f637b8
Merge branch 'main' into kavirajk/schema-v13
kavirajk Aug 19, 2023
559ec35
fix unexported chunk format usage
kavirajk Aug 19, 2023
d7382df
pass `TestBuildLegacyWALs` test
kavirajk Aug 20, 2023
5e4a46f
pass `TestCompactor_Compact`
kavirajk Aug 20, 2023
8297725
pass `TestChunkStats`
kavirajk Aug 20, 2023
3cc3ff4
Break cycle dependency with `pkg/storage/chunk` with `pkg/storage/con…
kavirajk Aug 20, 2023
40401c1
`make format`
kavirajk Aug 20, 2023
1714319
Rename `chunkVersion` -> `chunkFormat`
kavirajk Aug 20, 2023
35c0419
Rename `indexVersion()` -> `indexFormat()`
kavirajk Aug 20, 2023
644876b
expose both chunkformat and headformat in single API
kavirajk Aug 21, 2023
0a104cf
Fix `headfmt` initialize correctly on `MemChunk`
kavirajk Aug 21, 2023
5d43b9d
fix failing `Test_SeriesQuery`
kavirajk Aug 21, 2023
8b6787f
Fix TODOs and error handlings
kavirajk Aug 21, 2023
662e733
Introduce v13 entries on index package
kavirajk Aug 21, 2023
431117d
Merge branch 'main' into kavirajk/schema-v13
kavirajk Aug 22, 2023
61ba7df
PR remarks
kavirajk Aug 22, 2023
105f4df
remove unused `latestFormat`
kavirajk Aug 22, 2023
ba2d3ac
PR remarks
kavirajk Aug 22, 2023
dd4cbd9
Make chunk format selection `stream` based instead of `instance` based
kavirajk Aug 23, 2023
8a4ec25
Handle missing error
kavirajk Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 49 additions & 35 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ import (

const (
_ byte = iota
chunkFormatV1
chunkFormatV2
chunkFormatV3
chunkFormatV4

DefaultChunkFormat = chunkFormatV4 // the currently used chunk format
ChunkFormatV1
ChunkFormatV2
ChunkFormatV3
ChunkFormatV4

blocksPerChunk = 10
maxLineLength = 1024 * 1024 * 1024
Expand Down Expand Up @@ -84,10 +82,22 @@ const (
OrderedHeadBlockFmt
UnorderedHeadBlockFmt
UnorderedWithNonIndexedLabelsHeadBlockFmt

DefaultHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt
)

// ChunkHeadFormatFor returns corresponding head block format for the given `chunkfmt`.
func ChunkHeadFormatFor(chunkfmt byte) HeadBlockFmt {
	switch {
	case chunkfmt < ChunkFormatV3:
		// Chunk formats older than v3 only ever used the ordered head block.
		return OrderedHeadBlockFmt
	case chunkfmt == ChunkFormatV3:
		return UnorderedHeadBlockFmt
	default:
		// Every chunk format newer than v3 uses the latest head block format.
		return UnorderedWithNonIndexedLabelsHeadBlockFmt
	}
}

var magicNumber = uint32(0x12EE56A)

// The table gets initialized with sync.Once but may still cause a race
Expand Down Expand Up @@ -293,7 +303,7 @@ func (hb *headBlock) LoadBytes(b []byte) error {
return errors.Wrap(db.err(), "verifying headblock header")
}
switch version {
case chunkFormatV1, chunkFormatV2, chunkFormatV3, chunkFormatV4:
case ChunkFormatV1, ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
default:
return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version)
}
Expand Down Expand Up @@ -344,15 +354,16 @@ type entry struct {
}

// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(DefaultChunkFormat, enc, head, blockSize, targetSize)
func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
}

func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
if chunkFmt == chunkFormatV2 && head != OrderedHeadBlockFmt {
if chunkFmt == ChunkFormatV2 && head != OrderedHeadBlockFmt {
panic("only OrderedHeadBlockFmt is supported for V2 chunks")
}
if chunkFmt == chunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
if chunkFmt == ChunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
fmt.Println("received head fmt", head.String())
panic("only UnorderedWithNonIndexedLabelsHeadBlockFmt is supported for V4 chunks")
}
}
Expand Down Expand Up @@ -401,9 +412,9 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
}
bc.format = version
switch version {
case chunkFormatV1:
case ChunkFormatV1:
bc.encoding = EncGZIP
case chunkFormatV2, chunkFormatV3, chunkFormatV4:
case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
// format v2+ has a byte for block encoding.
enc := Encoding(db.byte())
if db.err() != nil {
Expand All @@ -414,6 +425,9 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
return nil, errors.Errorf("invalid version %d", version)
}

// Set the correct headblock format based on chunk format
bc.headFmt = ChunkHeadFormatFor(version)

// readSectionLenAndOffset reads len and offset for different sections within the chunk.
// Starting from chunk version 4, we have started writing offset and length of various sections within the chunk.
// These len and offset pairs would be stored together at the end of the chunk.
Expand All @@ -427,7 +441,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me

metasOffset := uint64(0)
metasLen := uint64(0)
if version >= chunkFormatV4 {
if version >= ChunkFormatV4 {
// version >= 4 starts writing length of sections after their offsets
metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx)
} else {
Expand Down Expand Up @@ -458,7 +472,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me

// Read offset and length.
blk.offset = db.uvarint()
if version >= chunkFormatV3 {
if version >= ChunkFormatV3 {
blk.uncompressedSize = db.uvarint()
}
l := db.uvarint()
Expand All @@ -481,7 +495,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
}
}

if version >= chunkFormatV4 {
if version >= ChunkFormatV4 {
nonIndexedLabelsLen, nonIndexedLabelsOffset := readSectionLenAndOffset(chunkNonIndexedLabelsSectionIdx)
lb := b[nonIndexedLabelsOffset : nonIndexedLabelsOffset+nonIndexedLabelsLen] // non-indexed labels Offset + checksum
db = decbuf{b: lb}
Expand Down Expand Up @@ -526,7 +540,7 @@ func (c *MemChunk) Bytes() ([]byte, error) {
func (c *MemChunk) BytesSize() int {
size := 4 // magic number
size++ // format
if c.format > chunkFormatV1 {
if c.format > ChunkFormatV1 {
size++ // chunk format v2+ has a byte for encoding.
}

Expand All @@ -538,7 +552,7 @@ func (c *MemChunk) BytesSize() int {
size += binary.MaxVarintLen64 // mint
size += binary.MaxVarintLen64 // maxt
size += binary.MaxVarintLen32 // offset
if c.format >= chunkFormatV3 {
if c.format >= ChunkFormatV3 {
size += binary.MaxVarintLen32 // uncompressed size
}
size += binary.MaxVarintLen32 // len(b)
Expand All @@ -550,7 +564,7 @@ func (c *MemChunk) BytesSize() int {
size += crc32.Size // metablock crc
size += 8 // metaoffset

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
size += 8 // metablock length

size += c.symbolizer.CheckpointSize() // non-indexed labels block
Expand Down Expand Up @@ -586,7 +600,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
// Write the header (magicNum + version).
eb.putBE32(magicNumber)
eb.putByte(c.format)
if c.format > chunkFormatV1 {
if c.format > ChunkFormatV1 {
// chunk format v2+ has a byte for encoding.
eb.putByte(byte(c.encoding))
}
Expand All @@ -599,7 +613,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
nonIndexedLabelsOffset := offset
nonIndexedLabelsLen := 0

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
var (
n int
crcHash []byte
Expand Down Expand Up @@ -655,7 +669,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
eb.putVarint64(b.mint)
eb.putVarint64(b.maxt)
eb.putUvarint(b.offset)
if c.format >= chunkFormatV3 {
if c.format >= ChunkFormatV3 {
eb.putUvarint(b.uncompressedSize)
}
eb.putUvarint(len(b.b))
Expand All @@ -669,7 +683,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
}
offset += int64(n)

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
// Write non-indexed labels offset and length
eb.reset()
eb.putBE64int(nonIndexedLabelsLen)
Expand All @@ -683,7 +697,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {

// Write the metasOffset.
eb.reset()
if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
eb.putBE64int(metasLen)
}
eb.putBE64int(int(metasOffset))
Expand Down Expand Up @@ -763,7 +777,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
// a great check, but it will guarantee we are always under the target size
newHBSize := c.head.UncompressedSize() + len(e.Line)
nonIndexedLabelsSize := 0
if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
newHBSize += metaLabelsLen(logproto.FromLabelAdaptersToLabels(e.NonIndexedLabels))
// non-indexed labels are compressed while serializing the chunk so we don't know what their size would be after compression.
// As adoption increases, their overall size can be non-trivial so we can't ignore them while calculating chunk size.
Expand All @@ -786,7 +800,7 @@ func (c *MemChunk) UncompressedSize() int {
size += b.uncompressedSize
}

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
size += c.symbolizer.UncompressedSize()
}

Expand All @@ -802,7 +816,7 @@ func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than ignore it even though this isn't accurate.
size += c.head.UncompressedSize()
if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
size += c.symbolizer.UncompressedSize() // length of each symbol
}

Expand All @@ -829,7 +843,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {
return ErrOutOfOrder
}

if c.format < chunkFormatV4 {
if c.format < ChunkFormatV4 {
entry.NonIndexedLabels = nil
}
if err := c.head.Append(entryTimestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.NonIndexedLabels)); err != nil {
Expand Down Expand Up @@ -940,7 +954,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1)

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
stats := stats.FromContext(ctx)
stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize()))
decompressedSize := int64(c.symbolizer.DecompressedSize())
Expand Down Expand Up @@ -1025,7 +1039,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)

if c.format >= chunkFormatV4 {
if c.format >= ChunkFormatV4 {
stats := stats.FromContext(ctx)
stats.AddCompressedBytes(int64(c.symbolizer.CompressedSize()))
decompressedSize := int64(c.symbolizer.DecompressedSize())
Expand Down Expand Up @@ -1095,12 +1109,12 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
// as close as possible, respect the block/target sizes specified. However,
// if the blockSize is not set, use reasonable defaults.
if c.blockSize > 0 {
newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, c.blockSize, c.targetSize)
newChunk = NewMemChunk(c.format, c.Encoding(), c.headFmt, c.blockSize, c.targetSize)
} else {
// Using defaultBlockSize for target block size.
// The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity.
// For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that.
newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, defaultBlockSize, c.CompressedSize())
newChunk = NewMemChunk(c.format, c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize())
}

for itr.Next() {
Expand Down Expand Up @@ -1423,7 +1437,7 @@ func (si *bufferedIterator) moveNext() (int64, []byte, labels.Labels, bool) {

decompressedBytes += int64(lineSize)

if si.format < chunkFormatV4 {
if si.format < ChunkFormatV4 {
si.stats.AddDecompressedBytes(decompressedBytes)
si.stats.AddDecompressedLines(1)
return ts, si.buf[:lineSize], nil, true
Expand Down