2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -109,6 +109,8 @@
* [BUGFIX] Distributor: Fix `/distributor/all_user_stats` api to work during rolling updates on ingesters. #7026
* [BUGFIX] Runtime-config: Fix panic when the runtime config is `null`. #7062
* [BUGFIX] Scheduler: Avoid all queriers reserved for prioritized requests. #7057
* [BUGFIX] Validation: Fix metric name validation using the label name validator instead of the metric name validator. #7086
* [BUGFIX] Compactor: Avoid a race condition that could allow the grouper to leave some partitions uncompacted. #7082

## 1.19.1 2025-09-20

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.20.0-rc.0
1.20.0-rc.1
173 changes: 172 additions & 1 deletion pkg/compactor/compactor_paritioning_test.go
@@ -1299,7 +1299,24 @@ func TestPartitionCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingE

// Get all of the unique group hashes so that they can be used to ensure all groups were compacted
groupHashes[groupHash]++
bucketClient.MockGet(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", "", nil)

// Create mock partitioned group info for the new validation check
partitionedGroupInfo := PartitionedGroupInfo{
PartitionedGroupID: groupHash,
PartitionCount: 1,
Partitions: []Partition{
{
PartitionID: 0,
Blocks: []ulid.ULID{ulid.MustParse(blockID)},
},
},
RangeStart: blockTimes["startTime"],
RangeEnd: blockTimes["endTime"],
CreationTime: time.Now().Unix(),
Version: PartitionedGroupInfoVersion1,
}
partitionedGroupInfoContent, _ := json.Marshal(partitionedGroupInfo)
bucketClient.MockGet(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", string(partitionedGroupInfoContent), nil)
bucketClient.MockUpload(userID+"/partitioned-groups/"+fmt.Sprint(groupHash)+".json", nil)
}

@@ -1826,3 +1843,157 @@ func TestPartitionCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFrom

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
}

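// TestPartitionCompactionRaceCondition covers the race between the cleaner and
// the compactor: the cleaner can delete a partitioned group after the grouper
// has checked the visit marker, and a new group with the same ID but a different
// creation time may be created afterwards. The planner must therefore re-read
// the group info and abort when it is missing or its creation time no longer matches.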
func TestPartitionCompactionRaceCondition(t *testing.T) {
t.Run("planner_detects_missing_partition_group", func(t *testing.T) {
setup := newRaceConditionTestSetup(12345)

// Create a planner that will try to process blocks but find missing partition group
planner := setup.createPlanner()
cortexMetaExtensions := setup.createCortexMetaExtensions(time.Now().Unix())
metasByMinTime := setup.createTestMetadata()

result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)

require.Error(t, err, "Planner should fail when partition group is missing")
require.Nil(t, result, "Should not return any result when partition group is missing")
require.ErrorIs(t, err, plannerCompletedPartitionError, "Error should be completed partition error when partition group is missing")
})

t.Run("planner_detects_creation_time_mismatch", func(t *testing.T) {
setup := newRaceConditionTestSetup(54321)
originalCreationTime := time.Now().Unix()

// Create initial partition group
partitionedGroupInfo := setup.createPartitionedGroupInfo(originalCreationTime)
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
require.NoError(t, err)

// Simulate cleaner deleting partition group
partitionGroupFile := GetPartitionedGroupFile(setup.partitionedGroupID)
err = setup.bucket.Delete(setup.ctx, partitionGroupFile)
require.NoError(t, err)

// Create new partition group with same ID but different creation time
newCreationTime := time.Now().Unix() + 200
newPartitionedGroupInfo := setup.createPartitionedGroupInfo(newCreationTime)
_, err = UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *newPartitionedGroupInfo)
require.NoError(t, err)

// Test planner creation time validation
planner := setup.createPlanner()
cortexMetaExtensions := setup.createCortexMetaExtensions(originalCreationTime) // OLD creation time
metasByMinTime := setup.createTestMetadata()

result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)

require.Error(t, err, "Planner should detect creation time mismatch")
require.ErrorIs(t, err, plannerCompletedPartitionError, "Should abort with completed partition error")
require.Nil(t, result, "Should not return any result when aborting")
})

t.Run("normal_operation_with_matching_creation_time", func(t *testing.T) {
setup := newRaceConditionTestSetup(99999)
creationTime := time.Now().Unix()

// Create partition group
partitionedGroupInfo := setup.createPartitionedGroupInfo(creationTime)
_, err := UpdatePartitionedGroupInfo(setup.ctx, setup.bucket, setup.logger, *partitionedGroupInfo)
require.NoError(t, err)

// Create planner and test with matching creation time
planner := setup.createPlanner()
cortexMetaExtensions := setup.createCortexMetaExtensions(creationTime) // MATCHING creation time
metasByMinTime := setup.createTestMetadata()

result, err := planner.PlanWithPartition(setup.ctx, metasByMinTime, cortexMetaExtensions, nil)

require.NoError(t, err, "Should not fail when creation times match")
require.NotNil(t, result, "Should return result when creation times match")
})
}

// raceConditionTestSetup provides common setup for race condition tests
type raceConditionTestSetup struct {
ctx context.Context
logger log.Logger
bucket objstore.InstrumentedBucket
userID string
partitionedGroupID uint32
partitionID int
partitionCount int
ranges []int64
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
}

func newRaceConditionTestSetup(partitionedGroupID uint32) *raceConditionTestSetup {
return &raceConditionTestSetup{
ctx: context.Background(),
logger: log.NewNopLogger(),
bucket: objstore.WithNoopInstr(objstore.NewInMemBucket()),
userID: "test-user",
partitionedGroupID: partitionedGroupID,
partitionID: 0,
partitionCount: 2,
ranges: []int64{2 * 60 * 60 * 1000}, // 2 hours in milliseconds
noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark { return nil },
}
}

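// createPartitionedGroupInfo builds a two-partition group with a caller-controlled
// creation time, letting tests simulate the cleaner deleting a group and a new one
// being created with the same ID but a different timestamp.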
func (s *raceConditionTestSetup) createPartitionedGroupInfo(creationTime int64) *PartitionedGroupInfo {
return &PartitionedGroupInfo{
PartitionedGroupID: s.partitionedGroupID,
PartitionCount: s.partitionCount,
Partitions: []Partition{
{PartitionID: 0, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
{PartitionID: 1, Blocks: []ulid.ULID{ulid.MustNew(ulid.Now(), nil)}},
},
RangeStart: 0,
RangeEnd: 2 * 60 * 60 * 1000,
CreationTime: creationTime,
Version: PartitionedGroupInfoVersion1,
}
}

func (s *raceConditionTestSetup) createPlanner() *PartitionCompactionPlanner {
// Use the same metrics pattern as other tests
registerer := prometheus.NewPedanticRegistry()
metrics := newCompactorMetrics(registerer)

return NewPartitionCompactionPlanner(
s.ctx,
s.bucket,
s.logger,
s.ranges,
s.noCompBlocksFunc,
"test-compactor",
s.userID,
time.Second,
10*time.Minute,
time.Minute,
metrics,
)
}

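// createCortexMetaExtensions builds the partition info a compaction job would carry,
// including the group creation time that the planner validates against the current
// partitioned group file.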
func (s *raceConditionTestSetup) createCortexMetaExtensions(creationTime int64) *cortex_tsdb.CortexMetaExtensions {
return &cortex_tsdb.CortexMetaExtensions{
PartitionInfo: &cortex_tsdb.PartitionInfo{
PartitionedGroupID: s.partitionedGroupID,
PartitionCount: s.partitionCount,
PartitionID: s.partitionID,
PartitionedGroupCreationTime: creationTime,
},
}
}

func (s *raceConditionTestSetup) createTestMetadata() []*metadata.Meta {
return []*metadata.Meta{
{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(ulid.Now(), nil),
MinTime: 0,
MaxTime: 2 * 60 * 60 * 1000,
},
},
}
}
13 changes: 13 additions & 0 deletions pkg/compactor/partition_compaction_grouper.go
@@ -639,6 +639,19 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition is visited")
continue
}

// Validate that the partition group still exists before creating a visit marker
// This prevents the race condition where the cleaner deletes the partition group
// between the visit marker check and the visit marker creation
if _, err := ReadPartitionedGroupInfo(g.ctx, g.bkt, g.logger, partitionedGroupID); err != nil {
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
level.Info(partitionedGroupLogger).Log("msg", "skipping group because partition group was deleted by cleaner", "partitioned_group_id", partitionedGroupID)
} else {
level.Warn(partitionedGroupLogger).Log("msg", "unable to read partition group info", "err", err, "partitioned_group_id", partitionedGroupID)
}
continue
}

partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup)

level.Info(partitionedGroupLogger).Log("msg", "found compactable group for user", "group", partitionedGroup.String())
28 changes: 28 additions & 0 deletions pkg/compactor/partition_compaction_planner.go
@@ -109,6 +109,34 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
}
}

// Double-check that the partition group still exists and is the same one we started with
// to prevent race condition with cleaner. If the cleaner deleted the partition group
// after we created the visit marker in the grouper, we should abort the compaction
// to avoid orphaned visit markers.
currentPartitionedGroupInfo, err := ReadPartitionedGroupInfo(p.ctx, p.bkt, p.logger, partitionedGroupID)
if err != nil {
if errors.Is(err, ErrorPartitionedGroupInfoNotFound) {
level.Warn(p.logger).Log("msg", "partition group was deleted by cleaner, aborting compaction", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
return nil, plannerCompletedPartitionError
}
level.Warn(p.logger).Log("msg", "unable to read partition group info during planning", "err", err, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID)
return nil, fmt.Errorf("unable to read partition group info for partition ID %d, partitioned group ID %d: %w", partitionID, partitionedGroupID, err)
}

// Verify that this is the same partition group that the grouper created the visit marker for
// by comparing creation times. If they don't match, it means the cleaner deleted the old
// partition group and a new one was created with the same ID.
expectedCreationTime := partitionInfo.PartitionedGroupCreationTime
if currentPartitionedGroupInfo.CreationTime != expectedCreationTime {
level.Warn(p.logger).Log("msg", "partition group creation time mismatch, cleaner deleted old group and new one was created, aborting compaction",
"partitioned_group_id", partitionedGroupID,
"partition_id", partitionID,
"expected_creation_time", expectedCreationTime,
"current_creation_time", currentPartitionedGroupInfo.CreationTime)
return nil, plannerCompletedPartitionError
}

// Ensure all blocks fit within the largest range. This is a double check
// to ensure there's no bug in the previous blocks grouping, given this Plan()
// is just a pass-through.
24 changes: 20 additions & 4 deletions pkg/compactor/partition_compaction_planner_test.go
@@ -289,9 +289,24 @@ func TestPartitionCompactionPlanner_Plan(t *testing.T) {
Version: PartitionVisitMarkerVersion1,
}
visitMarkerFileContent, _ := json.Marshal(visitMarker)
// Mock partition group info for race condition fix
partitionedGroupInfo := PartitionedGroupInfo{
PartitionedGroupID: partitionedGroupID,
PartitionCount: 1,
Partitions: []Partition{
{PartitionID: partitionID, Blocks: []ulid.ULID{}},
},
RangeStart: 0,
RangeEnd: 2 * time.Hour.Milliseconds(),
CreationTime: time.Now().Unix(),
Version: PartitionedGroupInfoVersion1,
}
partitionedGroupContent, _ := json.Marshal(partitionedGroupInfo)
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID)

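// The planner re-reads the partitioned group file to verify the group still exists
// and that its creation time matches the meta extensions, so it is mocked alongside
// the visit marker.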
bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil)
bkt.MockGet(partitionedGroupFile, string(partitionedGroupContent), nil)
bkt.MockUpload(mock.Anything, nil)
bkt.MockGet(mock.Anything, "", nil)

registerer := prometheus.NewPedanticRegistry()

@@ -316,9 +331,10 @@
)
actual, err := p.Plan(context.Background(), testData.blocks, nil, &cortextsdb.CortexMetaExtensions{
PartitionInfo: &cortextsdb.PartitionInfo{
PartitionCount: 1,
PartitionID: partitionID,
PartitionedGroupID: partitionedGroupID,
PartitionCount: 1,
PartitionID: partitionID,
PartitionedGroupID: partitionedGroupID,
PartitionedGroupCreationTime: partitionedGroupInfo.CreationTime,
},
})

2 changes: 1 addition & 1 deletion pkg/util/validation/validate.go
@@ -285,7 +285,7 @@ func ValidateLabels(validateMetrics *ValidateMetrics, limits *Limits, userID str
return newNoMetricNameError()
}

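// Metric names, unlike label names, may contain colons (e.g. "valid:name"),
// so they must be checked with the metric name validator.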
if !nameValidationScheme.IsValidLabelName(unsafeMetricName) {
if !nameValidationScheme.IsValidMetricName(unsafeMetricName) {
validateMetrics.DiscardedSamples.WithLabelValues(invalidMetricName, userID).Inc()
return newInvalidMetricNameError(unsafeMetricName)
}
4 changes: 2 additions & 2 deletions pkg/util/validation/validate_test.go
@@ -134,7 +134,7 @@ func TestValidateLabels(t *testing.T) {
}, "foo "),
},
{
map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid"},
map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid:name"},
false,
nil,
},
@@ -201,7 +201,7 @@
# HELP cortex_label_size_bytes The combined size in bytes of all labels and label values for a time series.
# TYPE cortex_label_size_bytes histogram
cortex_label_size_bytes_bucket{user="testUser",le="+Inf"} 3
cortex_label_size_bytes_sum{user="testUser"} 148
cortex_label_size_bytes_sum{user="testUser"} 153
cortex_label_size_bytes_count{user="testUser"} 3
`), "cortex_label_size_bytes"))
