Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pin chunk and index format to schema version. #10213

Merged
merged 25 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
59f90b3
Hack: passing tests in /pkg/chunkenc
kavirajk Aug 7, 2023
c3037de
idk
kavirajk Aug 8, 2023
c904bf9
Fix loki build
kavirajk Aug 10, 2023
bc2238f
test passes except /index/tsdb package
kavirajk Aug 14, 2023
1779a6c
idk
kavirajk Aug 14, 2023
5f637b8
Merge branch 'main' into kavirajk/schema-v13
kavirajk Aug 19, 2023
559ec35
fix unexported chunk format usage
kavirajk Aug 19, 2023
d7382df
pass `TestBuildLegacyWALs` test
kavirajk Aug 20, 2023
5e4a46f
pass `TestCompactor_Compact`
kavirajk Aug 20, 2023
8297725
pass `TestChunkStats`
kavirajk Aug 20, 2023
3cc3ff4
Break cycle dependency with `pkg/storage/chunk` with `pkg/storage/con…
kavirajk Aug 20, 2023
40401c1
`make format`
kavirajk Aug 20, 2023
1714319
Rename `chunkVersion` -> `chunkFormat`
kavirajk Aug 20, 2023
35c0419
Rename `indexVersion()` -> `indexFormat()`
kavirajk Aug 20, 2023
644876b
expose both chunkformat and headformat in single API
kavirajk Aug 21, 2023
0a104cf
Fix `headfmt` initialize correctly on `MemChunk`
kavirajk Aug 21, 2023
5d43b9d
fix failing `Test_SeriesQuery`
kavirajk Aug 21, 2023
8b6787f
Fix TODOs and error handlings
kavirajk Aug 21, 2023
662e733
Introduce v13 entries on index package
kavirajk Aug 21, 2023
431117d
Merge branch 'main' into kavirajk/schema-v13
kavirajk Aug 22, 2023
61ba7df
PR remarks
kavirajk Aug 22, 2023
105f4df
remove unused `latestFormat`
kavirajk Aug 22, 2023
ba2d3ac
PR remarks
kavirajk Aug 22, 2023
dd4cbd9
Make chunk format selection `stream` based instead of `instance` based
kavirajk Aug 23, 2023
8a4ec25
Handle missing error
kavirajk Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 49 additions & 38 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,22 @@ var (
const DefaultTestHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt

func TestBlocksInclusive(t *testing.T) {
chk := NewMemChunk(ChunkFormatV3, EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
err := chk.Append(logprotoEntry(1, "1"))
require.Nil(t, err)
err = chk.cut()
require.Nil(t, err)
for _, enc := range testEncoding {
enc := enc
for _, format := range allPossibleFormats {
chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt
chk := NewMemChunk(chunkfmt, enc, headfmt, testBlockSize, testTargetSize)
err := chk.Append(logprotoEntry(1, "1"))
require.Nil(t, err)
err = chk.cut()
require.Nil(t, err)

blocks := chk.Blocks(time.Unix(0, 1), time.Unix(0, 1))
require.Equal(t, 1, len(blocks))
require.Equal(t, 1, blocks[0].Entries())
}
}

blocks := chk.Blocks(time.Unix(0, 1), time.Unix(0, 1))
require.Equal(t, 1, len(blocks))
require.Equal(t, 1, blocks[0].Entries())
}

func TestBlock(t *testing.T) {
Expand Down Expand Up @@ -240,33 +247,37 @@ func TestBlock(t *testing.T) {
func TestCorruptChunk(t *testing.T) {
for _, enc := range testEncoding {
enc := enc
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
for _, format := range allPossibleFormats {
chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt

chk := NewMemChunk(ChunkFormatV3, enc, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
cases := []struct {
data []byte
}{
// Data that should not decode as lines from a chunk in any encoding.
{data: []byte{0}},
{data: []byte{1}},
{data: []byte("asdfasdfasdfqwyteqwtyeq")},
}
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()

chk := NewMemChunk(chunkfmt, enc, headfmt, testBlockSize, testTargetSize)
cases := []struct {
data []byte
}{
// Data that should not decode as lines from a chunk in any encoding.
{data: []byte{0}},
{data: []byte{1}},
{data: []byte("asdfasdfasdfqwyteqwtyeq")},
}

ctx, start, end := context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64)
for i, c := range cases {
chk.blocks = []block{{b: c.data}}
it, err := chk.Iterator(ctx, start, end, logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err, "case %d", i)
ctx, start, end := context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64)
for i, c := range cases {
chk.blocks = []block{{b: c.data}}
it, err := chk.Iterator(ctx, start, end, logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err, "case %d", i)

idx := 0
for it.Next() {
idx++
idx := 0
for it.Next() {
idx++
}
require.Error(t, it.Error(), "case %d", i)
require.NoError(t, it.Close())
}
require.Error(t, it.Error(), "case %d", i)
require.NoError(t, it.Close())
}
})
})
}
}
}

Expand All @@ -275,7 +286,7 @@ func TestReadFormatV1(t *testing.T) {

c := NewMemChunk(ChunkFormatV3, EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
// overrides default v2 format
// overrides to v1 for testing that specific version.
c.format = ChunkFormatV1

b, err := c.Bytes()
Expand Down Expand Up @@ -369,14 +380,14 @@ func testNameWithFormats(enc Encoding, chunkFormat byte, headBlockFmt HeadBlockF
}

func TestRoundtripV3(t *testing.T) {
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
enc := enc
t.Run(fmt.Sprintf("%v-%v", f, enc), func(t *testing.T) {
for _, enc := range testEncoding {
enc := enc
for _, format := range allPossibleFormats {
chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt
t.Run(fmt.Sprintf("%v-%v", format, enc), func(t *testing.T) {
t.Parallel()

c := NewMemChunk(ChunkFormatV3, enc, f, testBlockSize, testTargetSize)
c.format = ChunkFormatV3
c := NewMemChunk(chunkfmt, enc, headfmt, testBlockSize, testTargetSize)
_ = fillChunk(c)

b, err := c.Bytes()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Test_EncodingChunks(t *testing.T) {
}
}

_, headfmt := defaultChunkFormat()
_, headfmt := defaultChunkFormat(t)

backAgain, err := fromWireChunks(&conf, headfmt, chunks)
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (r *ingesterRecoverer) Close() {
s.unorderedWrites = isAllowed

if !isAllowed && old {
err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed))
err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(s.chunkFormat, isAllowed))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headBlockType actually doesn't make sense to me because isAllowed here is always going to be false. Anyways, not going to block the PR on this.

if err != nil {
level.Warn(util_log.Logger).Log(
"msg", "error converting headblock",
Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,11 @@ func (s *stream) resetCounter() {
s.entryCt = 0
}

func headBlockType(unorderedWrites bool) chunkenc.HeadBlockFmt {
func headBlockType(chunkfmt byte, unorderedWrites bool) chunkenc.HeadBlockFmt {
if unorderedWrites {
return chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt
if chunkfmt >= chunkenc.ChunkFormatV3 {
return chunkenc.ChunkHeadFormatFor(chunkfmt)
}
}
return chunkenc.OrderedHeadBlockFmt
}
32 changes: 20 additions & 12 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
cfg := defaultConfig()
cfg.MaxReturnedErrors = tc.limit

chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)
s := newStream(
chunkfmt,
headfmt,
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestPushDeduplication(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

s := newStream(
chunkfmt,
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestPushRejectOldCounter(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

s := newStream(
chunkfmt,
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestStreamIterator(t *testing.T) {
new func() *chunkenc.MemChunk
}{
{"gzipChunk", func() *chunkenc.MemChunk {
chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

return chunkenc.NewMemChunk(chunkfmt, chunkenc.EncGZIP, headfmt, 256*1024, 0)
}},
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

s := newStream(
chunkfmt,
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestUnorderedPush(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

s := newStream(
chunkfmt,
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestPushRateLimit(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

s := newStream(
chunkfmt,
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)

cfg := defaultConfig()
chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

s := newStream(
chunkfmt,
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {

cfg := defaultConfig()
cfg.MaxChunkAge = time.Minute
chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

s := newStream(
chunkfmt,
Expand Down Expand Up @@ -521,7 +521,7 @@ func Benchmark_PushStream(b *testing.B) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(b)

s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
Expand All @@ -544,6 +544,14 @@ func Benchmark_PushStream(b *testing.B) {
}
}

func defaultChunkFormat() (byte, chunkenc.HeadBlockFmt) {
return chunkenc.ChunkFormatV4, chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt
func defaultChunkFormat(t testing.TB) (byte, chunkenc.HeadBlockFmt) {
t.Helper()

cfg := defaultPeriodConfigs[0]

chunkfmt, headfmt, err := cfg.ChunkFormat()

require.NoError(t, err)

return chunkfmt, headfmt
}
2 changes: 1 addition & 1 deletion pkg/ingester/streams_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestStreamsMap(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
chunkfmt, headfmt := defaultChunkFormat()
chunkfmt, headfmt := defaultChunkFormat(t)

ss := []*stream{
newStream(
Expand Down