Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ type Distributor interface {
MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error)
}

func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int, limits *validation.Overrides) QueryableWithFilter {
func newDistributorQueryable(distributor Distributor, streamingMetdata bool, labelNamesWithMatchers bool, iteratorFn chunkIteratorFunc, isPartialDataEnabled partialdata.IsCfgEnabledFunc, ingesterQueryMaxAttempts int, limits *validation.Overrides, nowFn func() time.Time) QueryableWithFilter {
if nowFn == nil {
nowFn = time.Now
}
return distributorQueryable{
distributor: distributor,
streamingMetdata: streamingMetdata,
Expand All @@ -52,6 +55,7 @@ func newDistributorQueryable(distributor Distributor, streamingMetdata bool, lab
isPartialDataEnabled: isPartialDataEnabled,
ingesterQueryMaxAttempts: ingesterQueryMaxAttempts,
limits: limits,
nowFn: nowFn,
}
}

Expand All @@ -63,6 +67,7 @@ type distributorQueryable struct {
isPartialDataEnabled partialdata.IsCfgEnabledFunc
ingesterQueryMaxAttempts int
limits *validation.Overrides
nowFn func() time.Time
}

func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
Expand All @@ -76,6 +81,7 @@ func (d distributorQueryable) Querier(mint, maxt int64) (storage.Querier, error)
isPartialDataEnabled: d.isPartialDataEnabled,
ingesterQueryMaxAttempts: d.ingesterQueryMaxAttempts,
limits: d.limits,
nowFn: d.nowFn,
}, nil
}
func (d distributorQueryable) UseQueryable(now time.Time, userID string, _, queryMaxT int64) bool {
Expand All @@ -93,6 +99,7 @@ type distributorQuerier struct {
isPartialDataEnabled partialdata.IsCfgEnabledFunc
ingesterQueryMaxAttempts int
limits *validation.Overrides
nowFn func() time.Time
}

// Select implements storage.Querier interface.
Expand All @@ -116,7 +123,7 @@ func (q *distributorQuerier) Select(ctx context.Context, sortSeries bool, sp *st
// optimization is particularly important for the blocks storage where the blocks retention in the
// ingesters could be way higher than queryIngestersWithin.
if queryIngestersWithin > 0 {
now := time.Now()
now := q.nowFn()
origMinT := minT
minT = max(minT, util.TimeToMillis(now.Add(-queryIngestersWithin)))

Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
limits.QueryIngestersWithin = model.Duration(testData.queryIngestersWithin)
overrides := validation.NewOverrides(limits, nil)

queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, nil, 1, overrides)
queryable := newDistributorQueryable(distributor, streamingMetadataEnabled, true, nil, nil, 1, overrides, nil)
querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

Expand Down Expand Up @@ -136,7 +136,7 @@ func TestDistributorQueryableFilter(t *testing.T) {
limits.QueryIngestersWithin = model.Duration(1 * time.Hour)
overrides := validation.NewOverrides(limits, nil)

dq := newDistributorQueryable(d, false, true, nil, nil, 1, overrides)
dq := newDistributorQueryable(d, false, true, nil, nil, 1, overrides, nil)

now := time.Now()

Expand Down Expand Up @@ -192,7 +192,7 @@ func TestIngesterStreaming(t *testing.T) {

queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
return partialDataEnabled
}, 1, overrides)
}, 1, overrides, nil)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -363,7 +363,7 @@ func TestDistributorQuerier_Retry(t *testing.T) {

queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
return true
}, ingesterQueryMaxAttempts, overrides)
}, ingesterQueryMaxAttempts, overrides, nil)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -421,7 +421,7 @@ func TestDistributorQuerier_Select_CancelledContext_NoRetry(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
return true
}, ingesterQueryMaxAttempts, overrides)
}, ingesterQueryMaxAttempts, overrides, nil)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -455,7 +455,7 @@ func TestDistributorQuerier_Select_CancelledContext(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
return true
}, ingesterQueryMaxAttempts, overrides)
}, ingesterQueryMaxAttempts, overrides, nil)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand All @@ -480,7 +480,7 @@ func TestDistributorQuerier_Labels_CancelledContext(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)
queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, func(string) bool {
return true
}, ingesterQueryMaxAttempts, overrides)
}, ingesterQueryMaxAttempts, overrides, nil)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -537,7 +537,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {

queryable := newDistributorQueryable(d, streamingEnabled, labelNamesWithMatchers, nil, func(string) bool {
return partialDataEnabled
}, 1, overrides)
}, 1, overrides, nil)
querier, err := queryable.Querier(mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -625,7 +625,7 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) {
limits.QueryIngestersWithin = model.Duration(lookback)
overrides := validation.NewOverrides(limits, nil)

queryable := newDistributorQueryable(distributor, false, true, nil, nil, 1, overrides)
queryable := newDistributorQueryable(distributor, false, true, nil, nil, 1, overrides, func() time.Time { return now })
querier, err := queryable.Querier(testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

Expand All @@ -636,7 +636,7 @@ func TestDistributorQuerier_QueryIngestersWithinBoundary(t *testing.T) {
assert.Len(t, distributor.Calls, 0, testData.description)
} else {
require.Len(t, distributor.Calls, 1, testData.description)
assert.InDelta(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), float64(15*time.Second.Milliseconds()), testData.description)
assert.Equal(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), testData.description)
assert.Equal(t, testData.expectedMaxT, int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), testData.description)
}
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) {
iteratorFunc := getChunksIteratorFunction(cfg)

distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits)
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil)

ns := make([]QueryableWithFilter, len(stores))
for ix, s := range stores {
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
limits := DefaultLimitsConfig()
testOverrides := validation.NewOverrides(limits, nil)

distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides)
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil)

tCases := []struct {
name string
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestLimits(t *testing.T) {
limits := DefaultLimitsConfig()
testOverrides := validation.NewOverrides(limits, nil)

distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides)
distributorQueryableStreaming := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil)

tCases := []struct {
name string
Expand Down Expand Up @@ -1824,11 +1824,11 @@ func TestQuerier_ProjectionHints(t *testing.T) {
var distributorQueryable QueryableWithFilter
if testData.queryIngesters {
// Ingesters will be queried
distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides)
distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil)
} else {
// Ingesters will not be queried (time range is too old)
distributorQueryable = UseBeforeTimestampQueryable(
newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides),
newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, testOverrides, nil),
start.Add(-1*time.Hour),
)
}
Expand Down
Loading