From 1968b800dfb639d27df971d6a7750ad9102a7ef9 Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Mon, 3 Nov 2025 14:50:24 -0800 Subject: [PATCH 1/4] Fix metric name validation to use correct validation scheme method (#7087) Signed-off-by: Anna Tran --- CHANGELOG.md | 1 + pkg/util/validation/validate.go | 2 +- pkg/util/validation/validate_test.go | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cda9554056..b3fbd687d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -109,6 +109,7 @@ * [BUGFIX] Distributor: Fix `/distributor/all_user_stats` api to work during rolling updates on ingesters. #7026 * [BUGFIX] Runtime-config: Fix panic when the runtime config is `null`. #7062 * [BUGFIX] Scheduler: Avoid all queriers reserved for prioritized requests. #7057 +* [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086 ## 1.19.1 2025-09-20 diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 436cf3dfab..42f7e75c37 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -285,7 +285,7 @@ func ValidateLabels(validateMetrics *ValidateMetrics, limits *Limits, userID str return newNoMetricNameError() } - if !nameValidationScheme.IsValidLabelName(unsafeMetricName) { + if !nameValidationScheme.IsValidMetricName(unsafeMetricName) { validateMetrics.DiscardedSamples.WithLabelValues(invalidMetricName, userID).Inc() return newInvalidMetricNameError(unsafeMetricName) } diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index 861934eb66..65e0bdee8f 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -134,7 +134,7 @@ func TestValidateLabels(t *testing.T) { }, "foo "), }, { - map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid"}, + map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid:name"}, false, nil, }, @@ -201,7 +201,7 @@ func TestValidateLabels(t *testing.T) { # HELP cortex_label_size_bytes The combined size in bytes of all labels and label values for a time series. # TYPE cortex_label_size_bytes histogram cortex_label_size_bytes_bucket{user="testUser",le="+Inf"} 3 - cortex_label_size_bytes_sum{user="testUser"} 148 + cortex_label_size_bytes_sum{user="testUser"} 153 cortex_label_size_bytes_count{user="testUser"} 3 `), "cortex_label_size_bytes")) From 6338c4f2578399af5975b855b81be8e60e0ffcbb Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 3 Nov 2025 23:52:12 -0800 Subject: [PATCH 2/4] bump to 1.20.0-rc.1 Signed-off-by: yeya24 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 457059eed6..586f46b830 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.20.0-rc.0 +1.20.0-rc.1 From 8ec67622c515a04d6e7e69105ad90ab2258d7129 Mon Sep 17 00:00:00 2001 From: Daniel Blando Date: Fri, 31 Oct 2025 15:47:51 -0700 Subject: [PATCH 3/4] Fix visit marker race condition (#7082) --- CHANGELOG.md | 3 + pkg/compactor/compactor_paritioning_test.go | 173 +++++++++++++++++- pkg/compactor/partition_compaction_grouper.go | 13 ++ pkg/compactor/partition_compaction_planner.go | 28 +++ .../partition_compaction_planner_test.go | 24 ++- 5 files changed, 236 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3fbd687d7..94b959dd07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## master / unreleased +* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082 + + ## 1.20.0 in progress * [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603 diff --git a/pkg/compactor/compactor_paritioning_test.go b/pkg/compactor/compactor_paritioning_test.go index 593e94d2ae..6904faeb85 100644 --- a/pkg/compactor/compactor_paritioning_test.go +++ b/pkg/compactor/compactor_paritioning_test.go @@ -1299,7 +1299,24 @@ func TestPartitionCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingE // Get all of the unique group hashes so that they can be used to ensure all groups were compacted groupHashes[groupHash]++ - bucketClient.MockGet(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", "", nil) + + // Create mock partitioned group info for the new validation check + partitionedGroupInfo := PartitionedGroupInfo{ + PartitionedGroupID: groupHash, + PartitionCount: 1, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ulid.MustParse(blockID)}, + }, + }, + RangeStart: blockTimes["startTime"], + RangeEnd: blockTimes["endTime"], + CreationTime: time.Now().Unix(), + Version: PartitionedGroupInfoVersion1, + } + partitionedGroupInfoContent, _ := json.Marshal(partitionedGroupInfo) + bucketClient.MockGet(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", string(partitionedGroupInfoContent), nil) bucketClient.MockUpload(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", nil) } @@ -1826,3 +1843,157 @@ func TestPartitionCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFrom require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) } + +func TestPartitionCompactionRaceCondition(t *testing.T) { + t.Run("planner_detects_missing_partition_group", func(t *testing.T) { + setup := newRaceConditionTestSetup(12345) + + // Create a planner that will try to process blocks but find missing partition group + planner := setup.createPlanner() + cortexMetaExtensions := setup.createCortexMetaExtensions(time.Now().Unix()) + metasByMinTime := setup.createTestMetadata() + + result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil) + + require.Error(t, err, "Planner should fail when partition group is missing") + require.Nil(t, result, "Should not return any result when partition group is missing") + require.ErrorIs(t, err, plannerCompletedPartitionError, "Error should be completed partition error when partition group is missing") + }) + + t.Run("planner_detects_creation_time_mismatch", func(t *testing.T) { + setup := newRaceConditionTestSetup(54321) + originalCreationTime := time.Now().Unix() + + // Create initial partition group + partitionedGroupInfo := setup.createPartitionedGroupInfo(originalCreationTime) + _, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo) + require.NoError(t, err) + + // Simulate cleaner deleting partition group + partitionGroupFile := GetPartitionedGroupFile(setup.partitionedGroupID) + err = setup.bucket.Delete(setup.ctx, partitionGroupFile) + require.NoError(t, err) + + // Create new partition group with same ID but different creation time + newCreationTime := time.Now().Unix() + 200 + newPartitionedGroupInfo := setup.createPartitionedGroupInfo(newCreationTime) + _, err = UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *newPartitionedGroupInfo) + require.NoError(t, err) + + // Test planner creation time validation + planner := setup.createPlanner() + cortexMetaExtensions := setup.createCortexMetaExtensions(originalCreationTime) // OLD creation time + metasByMinTime := setup.createTestMetadata() + + result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil) + + require.Error(t, err, "Planner should detect creation time mismatch") + require.ErrorIs(t, err, plannerCompletedPartitionError, "Should abort with completed partition error") + require.Nil(t, result, "Should not return any result when aborting") + }) + + t.Run("normal_operation_with_matching_creation_time", func(t *testing.T) { + setup := newRaceConditionTestSetup(99999) + creationTime := time.Now().Unix() + + // Create partition group + partitionedGroupInfo := setup.createPartitionedGroupInfo(creationTime) + _, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo) + require.NoError(t, err) + + // Create planner and test with matching creation time + planner := setup.createPlanner() + cortexMetaExtensions := setup.createCortexMetaExtensions(creationTime) // MATCHING creation time + metasByMinTime := setup.createTestMetadata() + + result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil) + + require.NoError(t, err, "Should not fail when creation times match") + require.NotNil(t, result, "Should return result when creation times match") + }) +} + +// raceConditionTestSetup provides common setup for race condition tests +type raceConditionTestSetup struct { + ctx context.Context + logger log.Logger + bucket objstore.InstrumentedBucket + userID string + partitionedGroupID uint32 + partitionID int + partitionCount int + ranges []int64 + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark +} + +func newRaceConditionTestSetup(partitionedGroupID uint32) *raceConditionTestSetup { + return &raceConditionTestSetup{ + ctx: context.Background(), + logger: log.NewNopLogger(), + bucket: objstore.WithNoopInstr(objstore.NewInMemBucket()), + userID: "test-user", + partitionedGroupID: partitionedGroupID, + partitionID: 0, + partitionCount: 2, + ranges: []int64{2 * 60 * 60 * 1000}, // 2 hours in milliseconds + noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark { return nil }, + } +} + +func (s *raceConditionTestSetup) createPartitionedGroupInfo(creationTime int64) *PartitionedGroupInfo { + return &PartitionedGroupInfo{ + PartitionedGroupID: s.partitionedGroupID, + PartitionCount: s.partitionCount, + Partitions: []Partition{ + {PartitionID: 0, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}}, + {PartitionID: 1, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}}, + }, + RangeStart: 0, + RangeEnd: 2 * 60 * 60 * 1000, + CreationTime: creationTime, + Version: PartitionedGroupInfoVersion1, + } +} + +func (s *raceConditionTestSetup) createPlanner() *PartitionCompactionPlanner { + // Use the same metrics pattern as other tests + registerer := prometheus.NewPedanticRegistry() + metrics := newCompactorMetrics(registerer) + + return NewPartitionCompactionPlanner( + s.ctx, + s.bucket, + s.logger, + s.ranges, + s.noCompBlocksFunc, + "test-compactor", + s.userID, + time.Second, + 10*time.Minute, + time.Minute, + metrics, + ) +} + +func (s *raceConditionTestSetup) createCortexMetaExtensions(creationTime int64) *cortex_tsdb.CortexMetaExtensions { + return &cortex_tsdb.CortexMetaExtensions{ + PartitionInfo: &cortex_tsdb.PartitionInfo{ + PartitionedGroupID: s.partitionedGroupID, + PartitionCount: s.partitionCount, + PartitionID: s.partitionID, + PartitionedGroupCreationTime: creationTime, + }, + } +} + +func (s *raceConditionTestSetup) createTestMetadata() []*metadata.Meta { + return []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: ulid.MustNew(ulid.Now(), nil), + MinTime: 0, + MaxTime: 2 * 60 * 60 * 1000, + }, + }, + } +} diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 2d308fb636..4f1d955bfc 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -639,6 +639,19 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition is visited") continue } + + // Validate that the partition group still exists before creating a visit marker + // This prevents the race condition where the cleaner deletes the partition group + // between the visit marker check and the visit marker creation + if _, err := ReadPartitionedGroupInfo(g.ctx, g.bkt, g.logger, partitionedGroupID); err != nil { + if errors.Is(err, ErrorPartitionedGroupInfoNotFound) { + level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition group was deleted by cleaner", "partitioned_group_id", partitionedGroupID) + } else { + level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition group info", "err", err, "partitioned_group_id", partitionedGroupID) + } + continue + } + partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup) level.Info(partitionedGroupLogger).Log("msg", "found compactable group for user", "group", partitionedGroup.String()) diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go index d9750a86e4..566a62e87e 100644 --- a/pkg/compactor/partition_compaction_planner.go +++ b/pkg/compactor/partition_compaction_planner.go @@ -109,6 +109,34 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB } } + // Double-check that the partition group still exists and is the same one we started with + // to prevent race condition with cleaner. If the cleaner deleted the partition group + // after we created the visit marker in the grouper, we should abort the compaction + // to avoid orphaned visit markers. + currentPartitionedGroupInfo, err := ReadPartitionedGroupInfo(p.ctx, p.bkt, p.logger, partitionedGroupID) + if err != nil { + if errors.Is(err, ErrorPartitionedGroupInfoNotFound) { + level.Warn(p.logger).Log("msg", "partition group was deleted by cleaner, aborting compaction", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID) + return nil, plannerCompletedPartitionError + } else { + level.Warn(p.logger).Log("msg", "unable to read partition group info during planning", "err", err, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID) + return nil, fmt.Errorf("unable to read partition group info for partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error()) + } + } + + // Verify that this is the same partition group that the grouper created the visit marker for + // by comparing creation times. If they don't match, it means the cleaner deleted the old + // partition group and a new one was created with the same ID. + expectedCreationTime := partitionInfo.PartitionedGroupCreationTime + if currentPartitionedGroupInfo.CreationTime != expectedCreationTime { + level.Warn(p.logger).Log("msg", "partition group creation time mismatch, cleaner deleted old group and new one was created, aborting compaction", + "partitioned_group_id", partitionedGroupID, + "partition_id", partitionID, + "expected_creation_time", expectedCreationTime, + "current_creation_time", currentPartitionedGroupInfo.CreationTime) + return nil, plannerCompletedPartitionError + } + // Ensure all blocks fits within the largest range. This is a double check // to ensure there's no bug in the previous blocks grouping, given this Plan() // is just a pass-through. diff --git a/pkg/compactor/partition_compaction_planner_test.go b/pkg/compactor/partition_compaction_planner_test.go index 2f62937dbc..a4659ff2a0 100644 --- a/pkg/compactor/partition_compaction_planner_test.go +++ b/pkg/compactor/partition_compaction_planner_test.go @@ -289,9 +289,24 @@ func TestPartitionCompactionPlanner_Plan(t *testing.T) { Version: PartitionVisitMarkerVersion1, } visitMarkerFileContent, _ := json.Marshal(visitMarker) + // Mock partition group info for race condition fix + partitionedGroupInfo := PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: 1, + Partitions: []Partition{ + {PartitionID: partitionID, Blocks: []ulid.ULID{}}, + }, + RangeStart: 0, + RangeEnd: 2 * time.Hour.Milliseconds(), + CreationTime: time.Now().Unix(), + Version: PartitionedGroupInfoVersion1, + } + partitionedGroupContent, _ := json.Marshal(partitionedGroupInfo) + partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) + bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil) + bkt.MockGet(partitionedGroupFile, string(partitionedGroupContent), nil) bkt.MockUpload(mock.Anything, nil) - bkt.MockGet(mock.Anything, "", nil) registerer := prometheus.NewPedanticRegistry() @@ -316,9 +331,10 @@ func TestPartitionCompactionPlanner_Plan(t *testing.T) { ) actual, err := p.Plan(context.Background(), testData.blocks, nil, &cortextsdb.CortexMetaExtensions{ PartitionInfo: &cortextsdb.PartitionInfo{ - PartitionCount: 1, - PartitionID: partitionID, - PartitionedGroupID: partitionedGroupID, + PartitionCount: 1, + PartitionID: partitionID, + PartitionedGroupID: partitionedGroupID, + PartitionedGroupCreationTime: partitionedGroupInfo.CreationTime, }, }) From cfea42afe87e6c0066e3f6415bf5b753f5bf0112 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 4 Nov 2025 11:30:48 -0800 Subject: [PATCH 4/4] update changelog Signed-off-by: yeya24 --- CHANGELOG.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94b959dd07..41e115de02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,6 @@ ## master / unreleased -* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082 - - ## 1.20.0 in progress * [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603 @@ -113,6 +110,7 @@ * [BUGFIX] Runtime-config: Fix panic when the runtime config is `null`. #7062 * [BUGFIX] Scheduler: Avoid all queriers reserved for prioritized requests. #7057 * [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086 +* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082 ## 1.19.1 2025-09-20