Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query-frontend: customisable query splitting for queries overlapping query_ingester_within window #11535

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* [11539](https://github.com/grafana/loki/pull/11539) **kaviraj,ashwanthgoli** Support caching /series and /labels query results
* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.
* [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache.
* [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
Expand Down
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2884,6 +2884,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -querier.split-metadata-queries-by-interval
[split_metadata_queries_by_interval: <duration> | default = 1d]

# Interval to use for time-based splitting when a request is within the
# `query_ingesters_within` window; defaults to `split-queries-by-interval` by
# setting to 0.
# CLI flag: -querier.split-ingester-queries-by-interval
[split_ingester_queries_by_interval: <duration> | default = 0s]

# Limit queries that can be sharded. Queries within the time range of now and
# now minus this sharding lookback are not sharded. The default value of 0s
# disables the lookback, causing sharding of all queries at all times.
Expand Down
13 changes: 13 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,12 +800,25 @@ func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) uint { return

func (disabledShuffleShardingLimits) MaxQueryCapacity(_ string) float64 { return 0 }

// ingesterQueryOptions exists simply to avoid dependency cycles when using querier.Config directly in queryrange.NewMiddleware
type ingesterQueryOptions struct {
querier.Config
}

func (i ingesterQueryOptions) QueryStoreOnly() bool {
return i.Config.QueryStoreOnly
}
func (i ingesterQueryOptions) QueryIngestersWithin() time.Duration {
return i.Config.QueryIngestersWithin
}

func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")

middleware, stopper, err := queryrange.NewMiddleware(
t.Cfg.QueryRange,
t.Cfg.Querier.Engine,
ingesterQueryOptions{t.Cfg.Querier},
util_log.Logger,
t.Overrides,
t.Cfg.SchemaConfig,
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/queryrange/limits/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Limits interface {
logql.Limits
QuerySplitDuration(string) time.Duration
MetadataQuerySplitDuration(string) time.Duration
IngesterQuerySplitDuration(string) time.Duration
MaxQuerySeries(context.Context, string) int
MaxEntriesLimitPerQuery(context.Context, string) int
MinShardingLookback(string) time.Duration
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg.CacheIndexStatsResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewMiddleware(cfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{
tpw, stopper, err := NewMiddleware(cfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{
Configs: testSchemas,
}, nil, false, nil, constants.Loki)
if stopper != nil {
Expand Down Expand Up @@ -228,7 +228,7 @@ func Test_MaxQueryParallelismDisable(t *testing.T) {
}

func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
maxQueryParallelism: 1,
}, config.SchemaConfig{
Expand Down
115 changes: 28 additions & 87 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func newResultsCacheFromConfig(cfg base.ResultsCacheConfig, registerer prometheu
func NewMiddleware(
cfg Config,
engineOpts logql.EngineOpts,
iqo util.IngesterQueryOptions,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
Expand Down Expand Up @@ -176,36 +177,38 @@ func NewMiddleware(

var codec base.Codec = DefaultCodec

indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, statsCache,
split := newDefaultSplitter(limits, iqo)

indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, split, statsCache,
cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
if err != nil {
return nil, nil, err
}

metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache,
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, newMetricQuerySplitter(limits, iqo), resultsCache,
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}

limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec)
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, split)
if err != nil {
return nil, nil, err
}

// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache, metrics, indexStatsTripperware, metricsNamespace)
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, split, resultsCache, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}

seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace)
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, split, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace)
if err != nil {
return nil, nil, err
}

labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, split, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace)
if err != nil {
return nil, nil, err
}
Expand All @@ -215,7 +218,7 @@ func NewMiddleware(
return nil, nil, err
}

seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, split, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -406,18 +409,7 @@ func getOperation(path string) string {
}

// NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests.
func NewLogFilterTripperware(
cfg Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
metrics *Metrics,
indexStatsTripperware base.Middleware,
metricsNamespace string,
) (base.Middleware, error) {
func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)

Expand All @@ -426,7 +418,7 @@ func NewLogFilterTripperware(
NewLimitsMiddleware(limits),
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}

if cfg.CacheResults {
Expand Down Expand Up @@ -481,16 +473,7 @@ func NewLogFilterTripperware(
}

// NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression.
func NewLimitedTripperware(
_ Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
metrics *Metrics,
indexStatsTripperware base.Middleware,
merger base.Merger,
) (base.Middleware, error) {
func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, split splitter) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)

Expand All @@ -499,7 +482,7 @@ func NewLimitedTripperware(
NewLimitsMiddleware(limits),
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, split, metrics.SplitByMetrics),
NewQuerierSizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
}

Expand All @@ -518,6 +501,7 @@ func NewSeriesTripperware(
metrics *Metrics,
schema config.SchemaConfig,
merger base.Merger,
split splitter,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
Expand Down Expand Up @@ -558,7 +542,7 @@ func NewSeriesTripperware(
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}

if cfg.CacheSeriesResults {
Expand All @@ -567,7 +551,6 @@ func NewSeriesTripperware(
base.InstrumentMiddleware("series_results_cache", metrics.InstrumentMiddlewareMetrics),
cacheMiddleware,
)

}

if cfg.MaxRetries > 0 {
Expand Down Expand Up @@ -601,6 +584,7 @@ func NewLabelsTripperware(
log log.Logger,
limits Limits,
merger base.Merger,
split splitter,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
Expand Down Expand Up @@ -643,7 +627,7 @@ func NewLabelsTripperware(
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}

if cfg.CacheLabelResults {
Expand All @@ -652,7 +636,6 @@ func NewLabelsTripperware(
base.InstrumentMiddleware("label_results_cache", metrics.InstrumentMiddlewareMetrics),
cacheMiddleware,
)

}

if cfg.MaxRetries > 0 {
Expand All @@ -669,21 +652,7 @@ func NewLabelsTripperware(
}

// NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewMetricTripperware(
cfg Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
extractor base.Extractor,
metrics *Metrics,
indexStatsTripperware base.Middleware,
metricsNamespace string,
) (base.Middleware, error) {
func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
cacheKey := cacheKeyLimits{limits, cfg.Transformer}
var queryCacheMiddleware base.Middleware
if cfg.CacheResults {
Expand Down Expand Up @@ -737,7 +706,7 @@ func NewMetricTripperware(
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitMetricByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
)

if cfg.CacheResults {
Expand Down Expand Up @@ -793,16 +762,7 @@ func NewMetricTripperware(
}

// NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewInstantMetricTripperware(
cfg Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
metrics *Metrics,
indexStatsTripperware base.Middleware,
metricsNamespace string,
) (base.Middleware, error) {
func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)

Expand Down Expand Up @@ -844,21 +804,10 @@ func NewInstantMetricTripperware(
}), nil
}

func NewVolumeTripperware(
cfg Config,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
metrics *Metrics,
metricsNamespace string,
) (base.Middleware, error) {
func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
// Parallelize the volume requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d).
// Indices are sharded by 24 hours, so we split the volume request in 24h intervals.
limits = WithSplitByLimits(limits, 24*time.Hour)
limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval)
var cacheMiddleware base.Middleware
if cfg.CacheVolumeResults {
var err error
Expand Down Expand Up @@ -894,6 +843,7 @@ func NewVolumeTripperware(
cacheMiddleware,
cfg,
merger,
split,
limits,
log,
metrics,
Expand Down Expand Up @@ -962,18 +912,7 @@ func volumeFeatureFlagRoundTripper(nextTW base.Middleware, limits Limits) base.M
})
}

func NewIndexStatsTripperware(
cfg Config,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
metrics *Metrics,
metricsNamespace string,
) (base.Middleware, error) {
func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval)

var cacheMiddleware base.Middleware
Expand Down Expand Up @@ -1011,6 +950,7 @@ func NewIndexStatsTripperware(
cacheMiddleware,
cfg,
merger,
split,
limits,
log,
metrics,
Expand All @@ -1028,6 +968,7 @@ func sharedIndexTripperware(
cacheMiddleware base.Middleware,
cfg Config,
merger base.Merger,
split splitter,
limits Limits,
log log.Logger,
metrics *Metrics,
Expand All @@ -1038,7 +979,7 @@ func sharedIndexTripperware(
middlewares := []base.Middleware{
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}

if cacheMiddleware != nil {
Expand Down