Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
* [ENHANCEMENT] Ruler: Add DecodingConcurrency config flag for Thanos Engine. #7118
* [ENHANCEMENT] Compactor: Avoid double compaction by cleaning partition files in 2 cycles. #7129
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111

Expand Down
35 changes: 21 additions & 14 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
c.tenantCleanDuration.WithLabelValues(userID).Set(time.Since(startTime).Seconds())
}()

if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
begin := time.Now()
c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID)
level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}

// Migrate block deletion marks to the global markers location. This operation is a best-effort.
if firstRun && c.cfg.BlockDeletionMarksMigrationEnabled {
if err := bucketindex.MigrateBlockDeletionMarksToGlobalLocation(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
Expand Down Expand Up @@ -753,12 +759,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction))

if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
begin = time.Now()
c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID)
level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}
return nil
}

Expand Down Expand Up @@ -787,28 +787,35 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
}

for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {
isPartitionGroupInfoDeleted := false
partitionedGroupInfoFile := extraInfo.path

if extraInfo.status.CanDelete {
if extraInfo.status.IsCompleted {
// Try to remove all blocks included in partitioned group info
if err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID); err != nil {
deletedBlocksCount, err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID)
if err != nil {
level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
// if one block can not be marked for deletion, we should
// skip delete this partitioned group. next iteration
// would try it again.
continue
}
}

if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
if deletedBlocksCount > 0 {
level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
} else {
level.Info(userLogger).Log("msg", "deleting partition group now that all associated blocks have been deleted", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID)
if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
} else {
level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile)
isPartitionGroupInfoDeleted = true
}
}
}
}

if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker {
if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) {
// Remove partition visit markers
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
Expand Down
37 changes: 34 additions & 3 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,24 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

idx := &bucketindex.Index{
Blocks: bucketindex.Blocks{
{
ID: block1,
MinTime: startTime,
MaxTime: endTime,
Parquet: &parquet.ConverterMarkMeta{},
},
{
ID: block2,
MinTime: startTime,
MaxTime: endTime,
Parquet: &parquet.ConverterMarkMeta{},
},
},
}
err = bucketindex.WriteIndex(ctx, bucketClient, userID, nil, idx)
require.NoError(t, err)
userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)

partitionedGroupInfo := PartitionedGroupInfo{
Expand Down Expand Up @@ -955,11 +972,17 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
err = visitMarkerManager.updateVisitMarker(ctx)
require.NoError(t, err)

cleaner.cleanPartitionedGroupInfo(ctx, userBucket, logger, userID)
// first cleaning cycle deletes only the blocks
err = cleaner.cleanUser(ctx, logger, userBucket, userID, false)
require.NoError(t, err)

idx, err = bucketindex.ReadIndex(ctx, bucketClient, userID, cfgProvider, logger)
require.NoError(t, err)
require.Equal(t, []ulid.ULID{block1}, idx.BlockDeletionMarks.GetULIDs())

partitionedGroupFileExists, err := userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
require.NoError(t, err)
require.False(t, partitionedGroupFileExists)
require.True(t, partitionedGroupFileExists)

block1DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block1.String(), metadata.DeletionMarkFilename))
require.NoError(t, err)
Expand All @@ -968,6 +991,14 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename))
require.NoError(t, err)
require.False(t, block2DeletionMarkerExists)

// second cleaning cycle deletes the partition group info after all blocks are deleted
err = cleaner.cleanUser(ctx, logger, userBucket, userID, false)
require.NoError(t, err)

partitionedGroupFileExists, err = userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
require.NoError(t, err)
require.False(t, partitionedGroupFileExists)
}

func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/compactor/partitioned_group_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket
return noCompactMarkerExists
}

func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) error {
func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) {
blocks := p.getAllBlocks()
deleteBlocksCount := 0
defer func() {
Expand All @@ -244,13 +244,13 @@ func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, use
if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) {
if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil {
level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
return err
return deleteBlocksCount, err
}
deleteBlocksCount++
level.Debug(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String())
}
}
return nil
return deleteBlocksCount, nil
}

func (p *PartitionedGroupInfo) String() string {
Expand Down
Loading