From 2113b1b3d5d09d1f8023775dbb2ab38ce5440fe5 Mon Sep 17 00:00:00 2001 From: Charlie Le Date: Sun, 12 Apr 2026 13:12:49 -0700 Subject: [PATCH] Fix flaky TestDistributorQuerier_QueryIngestersWithinBoundary by injecting clock The test was flaky because it captured `time.Now()` at setup, but `distributorQuerier.Select()` called `time.Now()` again internally. On slow CI runners, the clock drift could exceed the 10-second margin in the "maxT well after lookback boundary" subtest, causing the query to short-circuit with an empty result. Inject a `nowFn` function into `distributorQuerier` (defaulting to `time.Now`) so tests can freeze time. Replace the `InDelta` assertion with an exact `Equal` now that the clock is deterministic. Closes #7415 Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Charlie Le --- pkg/querier/distributor_queryable.go | 11 +++++++++-- pkg/querier/distributor_queryable_test.go | 20 ++++++++++---------- pkg/querier/querier.go | 2 +- pkg/querier/querier_test.go | 8 ++++---- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 1dfc80d32db..263558e9b6f 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -43,7 +43,10 @@ type Distributor interface { MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) } -func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int, limits *validation.Overrides) QueryableWithFilter { +func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int, limits *validation.Overrides, nowFn func() time.Time) QueryableWithFilter { + if nowFn == nil { + nowFn = time.Now + } return distributorQueryable{ distributor: distributor, streamingMetdata: streamingMetdata, @@ -52,6 +55,7 @@ func newDistributorQueryable(distributor Distributor, streamingMetdata bool, lab isPartialDataEnabled: isPartialDataEnabled, ingesterQueryMaxAttempts: ingesterQueryMaxAttempts, limits: limits, + nowFn: nowFn, } } @@ -63,6 +67,7 @@ type distributorQueryable struct { isPartialDataEnabled partialdata.IsCfgEnabledFunc ingesterQueryMaxAttempts int limits *validation.Overrides + nowFn func() time.Time } func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) { @@ -76,6 +81,7 @@ func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) isPartialDataEnabled: d.isPartialDataEnabled, ingesterQueryMaxAttempts: d.ingesterQueryMaxAttempts, limits: d.limits, + nowFn: d.nowFn, }, nil } func (d distributorQueryable) UseQueryable(now time.Time, userID string, _, queryMaxT int64) bool { @@ -93,6 +99,7 @@ type distributorQuerier struct { isPartialDataEnabled partialdata.IsCfgEnabledFunc ingesterQueryMaxAttempts int limits *validation.Overrides + nowFn func() time.Time } // Select implements storage.Querier interface. @@ -116,7 +123,7 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st // optimization is particularly important for the blocks storage where the blocks retention in the // ingesters could be way higher than queryIngestersWithin. if queryIngestersWithin > 0 { - now := time.Now() + now := q.nowFn() origMinT := minT minT = max(minT, util.TimeToMillis(now.Add(-queryIngestersWithin))) diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 810185044d9..74a6f84ca99 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -96,7 +96,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) limits.QueryIngestersWithin = model.Duration(testData.queryIngestersWithin) overrides := validation.NewOverrides(limits, nil) - queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, nil, 1, overrides) + queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, nil, 1, overrides, nil) querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT) require.NoError(t, err) @@ -136,7 +136,7 @@ func TestDistributorQueryableFilter(t *testing.T) { limits.QueryIngestersWithin = model.Duration(1 * time.Hour) overrides := validation.NewOverrides(limits, nil) - dq := newDistributorQueryable(d, false, true, nil, nil, 1, overrides) + dq := newDistributorQueryable(d, false, true, nil, nil, 1, overrides, nil) now := time.Now() @@ -192,7 +192,7 @@ func TestIngesterStreaming(t *testing.T) { queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool { return partialDataEnabled - }, 1, overrides) + }, 1, overrides, nil) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -363,7 +363,7 @@ func TestDistributorQuerier_Retry(t *testing.T) { queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool { return true - }, ingesterQueryMaxAttempts, overrides) + }, ingesterQueryMaxAttempts, overrides, nil) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -421,7 +421,7 @@ func TestDistributorQuerier_Select_CancelledContext_NoRetry(t *testing.T) { overrides := validation.NewOverrides(limits, nil) queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool { return true - }, ingesterQueryMaxAttempts, overrides) + }, ingesterQueryMaxAttempts, overrides, nil) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -455,7 +455,7 @@ func TestDistributorQuerier_Select_CancelledContext(t *testing.T) { overrides := validation.NewOverrides(limits, nil) queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool { return true - }, ingesterQueryMaxAttempts, overrides) + }, ingesterQueryMaxAttempts, overrides, nil) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -480,7 +480,7 @@ func TestDistributorQuerier_Labels_CancelledContext(t *testing.T) { overrides := validation.NewOverrides(limits, nil) queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool { return true - }, ingesterQueryMaxAttempts, overrides) + }, ingesterQueryMaxAttempts, overrides, nil) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -537,7 +537,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, func(string) bool { return partialDataEnabled - }, 1, overrides) + }, 1, overrides, nil) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -625,7 +625,7 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) { limits.QueryIngestersWithin = model.Duration(lookback) overrides := validation.NewOverrides(limits, nil) - queryable := newDistributorQueryable(distributor, false, true, nil, nil, 1, overrides) + queryable := newDistributorQueryable(distributor, false, true, nil, nil, 1, overrides, func() time.Time { return now }) querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT) require.NoError(t, err) @@ -636,7 +636,7 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) { assert.Len(t, distributor.Calls, 0, testData.description) } else { require.Len(t, distributor.Calls, 1, testData.description) - assert.InDelta(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), float64(15*time.Second.Milliseconds()), testData.description) + assert.Equal(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), testData.description) assert.Equal(t, testData.expectedMaxT, int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), testData.description) } }) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 5205a9eda83..5053eb971f4 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -198,7 +198,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil) ns := make([]QueryableWithFilter, len(stores)) for ix, s := range stores { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index e2f57c6cddb..3ad5f525ba3 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -301,7 +301,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { limits := DefaultLimitsConfig() testOverrides := validation.NewOverrides(limits, nil) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil) tCases := []struct { name string @@ -450,7 +450,7 @@ func TestLimits(t *testing.T) { limits := DefaultLimitsConfig() testOverrides := validation.NewOverrides(limits, nil) - distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides) + distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil) tCases := []struct { name string @@ -1824,11 +1824,11 @@ func TestQuerier_ProjectionHints(t *testing.T) { var distributorQueryable QueryableWithFilter if testData.queryIngesters { // Ingesters will be queried - distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides) + distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil) } else { // Ingesters will not be queried (time range is too old) distributorQueryable = UseBeforeTimestampQueryable( - newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides), + newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil), start.Add(-1*time.Hour), ) }