diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fb31738690..e75ed629180 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 * [ENHANCEMENT] Ruler: Add DecodingConcurrency config flag for Thanos Engine. #7118 +* [ENHANCEMENT] Compactor: Avoid double compaction by cleaning partitioned group info files in two cleaning cycles: blocks are marked for deletion in the first cycle, and the group info file is removed in the next. #7129 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 8e1e9a60551..4a265e35b72 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -609,6 +609,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us c.tenantCleanDuration.WithLabelValues(userID).Set(time.Since(startTime).Seconds()) }() + if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { + begin := time.Now() + c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID) + level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + } + // Migrate block deletion marks to the global markers location. This operation is a best-effort. 
if firstRun && c.cfg.BlockDeletionMarksMigrationEnabled { if err := bucketindex.MigrateBlockDeletionMarksToGlobalLocation(ctx, c.bucketClient, userID, c.cfgProvider); err != nil { @@ -753,12 +759,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) } c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction)) - - if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning { - begin = time.Now() - c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID) - level.Info(userLogger).Log("msg", "finish cleaning partitioned group info files", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) - } return nil } @@ -787,28 +787,35 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke } for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo { + isPartitionGroupInfoDeleted := false partitionedGroupInfoFile := extraInfo.path if extraInfo.status.CanDelete { if extraInfo.status.IsCompleted { // Try to remove all blocks included in partitioned group info - if err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID); err != nil { + deletedBlocksCount, err := partitionedGroupInfo.markAllBlocksForDeletion(ctx, userBucket, userLogger, c.blocksMarkedForDeletion, userID) + if err != nil { level.Warn(userLogger).Log("msg", "unable to mark all blocks in partitioned group info for deletion", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) // if one block can not be marked for deletion, we should // skip delete this partitioned group. next iteration // would try it again. 
continue } - } - - if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { - level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) - } else { - level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) + if deletedBlocksCount > 0 { + level.Info(userLogger).Log("msg", "parent blocks deleted, will delete partition group file in next cleaning cycle", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + } else { + level.Info(userLogger).Log("msg", "deleting partition group now that all associated blocks have been deleted", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { + level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + } else { + level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) + isPartitionGroupInfoDeleted = true + } + } } } - if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker { + if isPartitionGroupInfoDeleted && (extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker) { // Remove partition visit markers if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil { level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err) diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 9b13d7c1b91..8eb09c99a76 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -925,7 +925,24 @@ func 
TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) - + idx := &bucketindex.Index{ + Blocks: bucketindex.Blocks{ + { + ID: block1, + MinTime: startTime, + MaxTime: endTime, + Parquet: &parquet.ConverterMarkMeta{}, + }, + { + ID: block2, + MinTime: startTime, + MaxTime: endTime, + Parquet: &parquet.ConverterMarkMeta{}, + }, + }, + } + err = bucketindex.WriteIndex(ctx, bucketClient, userID, nil, idx) + require.NoError(t, err) userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider) partitionedGroupInfo := PartitionedGroupInfo{ @@ -955,11 +972,17 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { err = visitMarkerManager.updateVisitMarker(ctx) require.NoError(t, err) - cleaner.cleanPartitionedGroupInfo(ctx, userBucket, logger, userID) + // first cleaning cycle deletes only the blocks + err = cleaner.cleanUser(ctx, logger, userBucket, userID, false) + require.NoError(t, err) + + idx, err = bucketindex.ReadIndex(ctx, bucketClient, userID, cfgProvider, logger) + require.NoError(t, err) + require.Equal(t, []ulid.ULID{block1}, idx.BlockDeletionMarks.GetULIDs()) partitionedGroupFileExists, err := userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID)) require.NoError(t, err) - require.False(t, partitionedGroupFileExists) + require.True(t, partitionedGroupFileExists) block1DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block1.String(), metadata.DeletionMarkFilename)) require.NoError(t, err) @@ -968,6 +991,14 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { block2DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block2.String(), metadata.DeletionMarkFilename)) require.NoError(t, err) require.False(t, 
block2DeletionMarkerExists) + + // second cleaning cycle deletes the partition group info after all blocks are deleted + err = cleaner.cleanUser(ctx, logger, userBucket, userID, false) + require.NoError(t, err) + + partitionedGroupFileExists, err = userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID)) + require.NoError(t, err) + require.False(t, partitionedGroupFileExists) } func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) { diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index f783fcd2052..9d9d1fd7859 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -234,7 +234,7 @@ func (p *PartitionedGroupInfo) isBlockNoCompact(ctx context.Context, userBucket return noCompactMarkerExists } -func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) error { +func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blocksMarkedForDeletion *prometheus.CounterVec, userID string) (int, error) { blocks := p.getAllBlocks() deleteBlocksCount := 0 defer func() { @@ -244,13 +244,13 @@ func (p *PartitionedGroupInfo) markAllBlocksForDeletion(ctx context.Context, use if p.doesBlockExist(ctx, userBucket, userLogger, blockID) && !p.isBlockDeleted(ctx, userBucket, userLogger, blockID) && !p.isBlockNoCompact(ctx, userBucket, userLogger, blockID) { if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) - return err + return deleteBlocksCount, err } 
deleteBlocksCount++ level.Debug(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", p.PartitionedGroupID, "block", blockID.String()) } } - return nil + return deleteBlocksCount, nil } func (p *PartitionedGroupInfo) String() string {