Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CLE changes #13033

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableN

builder.chunksFinalized = true

return newCompactedIndex(ctx, tableName, userID, workingDir, periodConfig, builder), nil
return NewCompactedIndex(ctx, tableName, userID, workingDir, periodConfig, builder), nil
}

type tableCompactor struct {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (t *tableCompactor) CompactTable() error {
return err
}

compactedIndex := newCompactedIndex(t.ctx, existingUserIndexSet.GetTableName(), userID, existingUserIndexSet.GetWorkingDir(), t.periodConfig, builder)
compactedIndex := NewCompactedIndex(t.ctx, existingUserIndexSet.GetTableName(), userID, existingUserIndexSet.GetWorkingDir(), t.periodConfig, builder)
t.compactedIndexes[userID] = compactedIndex

if err := existingUserIndexSet.SetCompactedIndex(compactedIndex, true); err != nil {
Expand All @@ -190,7 +190,7 @@ func (t *tableCompactor) CompactTable() error {
return err
}

compactedIndex := newCompactedIndex(t.ctx, srcIdxSet.GetTableName(), userID, srcIdxSet.GetWorkingDir(), t.periodConfig, builder)
compactedIndex := NewCompactedIndex(t.ctx, srcIdxSet.GetTableName(), userID, srcIdxSet.GetWorkingDir(), t.periodConfig, builder)
t.compactedIndexes[userID] = compactedIndex
if err := srcIdxSet.SetCompactedIndex(compactedIndex, true); err != nil {
return err
Expand Down Expand Up @@ -261,7 +261,7 @@ func setupBuilder(ctx context.Context, indexType int, userID string, sourceIndex
return builder, nil
}

type compactedIndex struct {
type CompactedIndex struct {
ctx context.Context
userID string
builder *Builder
Expand All @@ -274,8 +274,8 @@ type compactedIndex struct {
seriesToCleanup map[string]struct{}
}

func newCompactedIndex(ctx context.Context, tableName, userID, workingDir string, periodConfig config.PeriodConfig, builder *Builder) *compactedIndex {
return &compactedIndex{
func NewCompactedIndex(ctx context.Context, tableName, userID, workingDir string, periodConfig config.PeriodConfig, builder *Builder) *CompactedIndex {
return &CompactedIndex{
ctx: ctx,
userID: userID,
builder: builder,
Expand All @@ -288,7 +288,7 @@ func newCompactedIndex(ctx context.Context, tableName, userID, workingDir string
}

// ForEachChunk iterates over all the chunks in the builder and calls the callback function.
func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error {
func (c *CompactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{c.periodConfig},
}
Expand Down Expand Up @@ -333,7 +333,7 @@ func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.Ch

// IndexChunk adds the chunk to the list of chunks to index.
// Before accepting the chunk it checks if it falls within the tableInterval and rejects it if not.
func (c *compactedIndex) IndexChunk(chk chunk.Chunk) (bool, error) {
func (c *CompactedIndex) IndexChunk(chk chunk.Chunk) (bool, error) {
if chk.From > c.tableInterval.End || c.tableInterval.Start > chk.Through {
return false, nil
}
Expand All @@ -344,7 +344,7 @@ func (c *compactedIndex) IndexChunk(chk chunk.Chunk) (bool, error) {
}

// CleanupSeries removes the series from the builder(including its chunks) and deletes the list of chunks lined up for deletion.
func (c *compactedIndex) CleanupSeries(_ []byte, lbls labels.Labels) error {
func (c *CompactedIndex) CleanupSeries(_ []byte, lbls labels.Labels) error {
seriesID := lbls.String()
if _, ok := c.builder.streams[seriesID]; !ok {
return fmt.Errorf("series cleanup not allowed on non-existing series %s", seriesID)
Expand All @@ -354,11 +354,11 @@ func (c *compactedIndex) CleanupSeries(_ []byte, lbls labels.Labels) error {
return nil
}

func (c *compactedIndex) Cleanup() {}
func (c *CompactedIndex) Cleanup() {}

// ToIndexFile creates an indexFile from the chunksmetas stored in the builder.
// Before building the index, it takes care of the lined up updates i.e deletes and adding of new chunks.
func (c *compactedIndex) ToIndexFile() (shipperindex.Index, error) {
func (c *CompactedIndex) ToIndexFile() (shipperindex.Index, error) {
for seriesID, chks := range c.deleteChunks {
for _, chk := range chks {
chunkFound, err := c.builder.DropChunk(seriesID, chk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ func TestCompactor_Compact(t *testing.T) {

// verify the chunkmetas in the builder
actualChunks := map[string]index.ChunkMetas{}
for seriesID, stream := range initializedIndexSets[userID].(*mockIndexSet).compactedIndex.(*compactedIndex).builder.streams {
for seriesID, stream := range initializedIndexSets[userID].(*mockIndexSet).compactedIndex.(*CompactedIndex).builder.streams {
actualChunks[seriesID] = stream.chunks
}

Expand Down Expand Up @@ -859,7 +859,7 @@ type testContext struct {
userID string
tableInterval model.Interval
shiftTableStart func(ms int64) int64
buildCompactedIndex func() *compactedIndex
buildCompactedIndex func() *CompactedIndex
expectedChunkEntries map[string][]retention.ChunkEntry
}

Expand Down Expand Up @@ -888,7 +888,7 @@ func setupCompactedIndex(t *testing.T) *testContext {
lbls2 := mustParseLabels(`{fizz="buzz", a="b"}`)
userID := buildUserID(0)

buildCompactedIndex := func() *compactedIndex {
buildCompactedIndex := func() *CompactedIndex {
indexFormat, err := periodConfig.TSDBFormat()
require.NoError(t, err)
builder := NewBuilder(indexFormat)
Expand All @@ -900,7 +900,7 @@ func setupCompactedIndex(t *testing.T) *testContext {

builder.FinalizeChunks()

return newCompactedIndex(context.Background(), tableName.prefix, buildUserID(0), t.TempDir(), periodConfig, builder)
return NewCompactedIndex(context.Background(), tableName.prefix, buildUserID(0), t.TempDir(), periodConfig, builder)
}

expectedChunkEntries := map[string][]retention.ChunkEntry{
Expand Down
Loading