Add -store.max-query-length support to blocks storage #2826

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [CHANGE] Metric `cortex_ingester_flush_reasons` has been renamed to `cortex_ingester_series_flushed_total`, and is now incremented during flush, not when a series is enqueued for flushing. #2802
* [CHANGE] Experimental Delete Series: Metric `cortex_purger_oldest_pending_delete_request_age_seconds` now tracks the age of delete requests from the time they pass their cancellation period instead of from their creation time. #2806
* [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
* [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826
* [FEATURE] Introduced `ruler.for-outage-tolerance`, the max time to tolerate an outage when restoring the "for" state of an alert. #2783
* [FEATURE] Introduced `ruler.for-grace-period`, the minimum duration between an alert and its restored "for" state. This is maintained only for alerts with a configured "for" time greater than the grace period. #2783
* [FEATURE] Introduced `ruler.resend-delay`, the minimum amount of time to wait before resending an alert to Alertmanager. #2783
@@ -32,6 +33,7 @@
* [ENHANCEMENT] Query-tee: Forward `X-Scope-OrgId` header to backend, if present in the request. #2815
* [ENHANCEMENT] Experimental TSDB: Added `-experimental.tsdb.head-compaction-idle-timeout` option to force compaction of data in memory into a block. #2803
* [ENHANCEMENT] Experimental TSDB: Added support for flushing blocks via `/flush`, `/shutdown` (previously these only worked for chunks storage) and by using `-experimental.tsdb.flush-blocks-on-shutdown` option. #2794
* [ENHANCEMENT] Experimental TSDB: Added support for enforcing a max query time range via `-store.max-query-length`. #2826
* [ENHANCEMENT] Ingester: Added new metric `cortex_ingester_flush_series_in_progress` that reports the number of ongoing flush-series operations. Useful when calling the `/flush` handler: if `cortex_ingester_flush_queue_length + cortex_ingester_flush_series_in_progress` is 0, all flushes are finished. #2778
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
7 changes: 7 additions & 0 deletions development/tsdb-blocks-storage-s3/config/cortex.yaml
@@ -101,3 +101,10 @@ store_gateway:

frontend_worker:
frontend_address: "query-frontend:9007"

query_range:
split_queries_by_interval: 24h

limits:
# Limit max query time range to 31d
max_query_length: 744h
2 changes: 1 addition & 1 deletion development/tsdb-blocks-storage-s3/docker-compose.yml
@@ -225,7 +225,7 @@ services:
context: .
dockerfile: dev.dockerfile
image: cortex
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18007 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007"]
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18007 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007 -store.max-query-length=8760h"]
depends_on:
- consul
- minio
14 changes: 8 additions & 6 deletions docs/configuration/config-file-reference.md
@@ -112,9 +112,9 @@ api:
# The query_frontend_config configures the Cortex query-frontend.
[frontend: <query_frontend_config>]

# The queryrange_config configures the query splitting and caching in the Cortex
# query-frontend.
[query_range: <queryrange_config>]
# The query_range_config configures the query splitting and caching in the
# Cortex query-frontend.
[query_range: <query_range_config>]

# The table_manager_config configures the Cortex table-manager.
[table_manager: <table_manager_config>]
@@ -708,9 +708,9 @@ The `query_frontend_config` configures the Cortex query-frontend.
[log_queries_longer_than: <duration> | default = 0s]
```

### `queryrange_config`
### `query_range_config`

The `queryrange_config` configures the query splitting and caching in the Cortex query-frontend.
The `query_range_config` configures the query splitting and caching in the Cortex query-frontend.

```yaml
# Split queries by an interval and execute in parallel, 0 disables it. You
@@ -2514,7 +2514,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -store.query-chunk-limit
[max_chunks_per_query: <int> | default = 2000000]

# Limit to length of chunk store queries, 0 to disable.
# Limit the query time range (end - start time). This limit is enforced in the
# query-frontend (on the received query), in the querier (on the query possibly
# split by the query-frontend) and in the chunks storage. 0 to disable.
# CLI flag: -store.max-query-length
[max_query_length: <duration> | default = 0s]

2 changes: 1 addition & 1 deletion pkg/chunk/chunk_store_test.go
@@ -918,7 +918,7 @@ func TestChunkStoreError(t *testing.T) {
query: "foo",
from: model.Time(0),
through: model.Time(0).Add(31 * 24 * time.Hour),
err: "invalid query, length > limit (744h0m0s > 720h0m0s)",
err: "the query time range exceeds the limit (query length: 744h0m0s, limit: 720h0m0s)",
},
{
query: "{foo=\"bar\"}",
8 changes: 4 additions & 4 deletions pkg/cortex/modules.go
@@ -169,7 +169,7 @@ func (t *Cortex) initDistributor() (serv services.Service, err error) {
}

func (t *Cortex) initQuerier() (serv services.Service, err error) {
queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryables, t.TombstonesLoader, prometheus.DefaultRegisterer)
queryable, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, prometheus.DefaultRegisterer)

// Prometheus histograms for requests to the querier.
querierRequestDuration := promauto.With(prometheus.DefaultRegisterer).NewHistogramVec(prometheus.HistogramOpts{
@@ -455,7 +455,7 @@ func (t *Cortex) initTableManager() (services.Service, error) {
func (t *Cortex) initRuler() (serv services.Service, err error) {
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryables, t.TombstonesLoader, prometheus.DefaultRegisterer)
queryable, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, prometheus.DefaultRegisterer)

t.Ruler, err = ruler.NewRuler(t.Cfg.Ruler, engine, queryable, t.Distributor, prometheus.DefaultRegisterer, util.Logger)
if err != nil {
@@ -590,11 +590,11 @@ func (t *Cortex) setupModuleManager() error {
Store: {Overrides, DeleteRequestsStore},
Ingester: {Overrides, Store, API, RuntimeConfig, MemberlistKV},
Flusher: {Store, API},
Querier: {Distributor, Store, Ring, API, StoreQueryable},
Querier: {Overrides, Distributor, Store, Ring, API, StoreQueryable},
StoreQueryable: {Store},
QueryFrontend: {API, Overrides, DeleteRequestsStore},
TableManager: {API},
Ruler: {Distributor, Store, StoreQueryable},
Ruler: {Overrides, Distributor, Store, StoreQueryable},
Configs: {API},
AlertManager: {API},
Compactor: {API},
23 changes: 17 additions & 6 deletions pkg/querier/querier.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"flag"
"fmt"
"strings"
"time"

@@ -26,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/tls"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// Config contains the configuration require to create a querier
@@ -132,7 +134,7 @@ func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storag
}

// New builds a queryable and promql engine.
func New(cfg Config, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader *purger.TombstonesLoader, reg prometheus.Registerer) (storage.SampleAndChunkQueryable, *promql.Engine) {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader *purger.TombstonesLoader, reg prometheus.Registerer) (storage.SampleAndChunkQueryable, *promql.Engine) {
iteratorFunc := getChunksIteratorFunction(cfg)

distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, iteratorFunc, cfg.QueryIngestersWithin)
@@ -145,7 +147,7 @@ func New(cfg Config, distributor Distributor, stores []QueryableWithFilter, tomb
}
}

queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, tombstonesLoader)
queryable := NewQueryable(distributorQueryable, ns, iteratorFunc, cfg, limits, tombstonesLoader)

lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) {
querier, err := queryable.Querier(ctx, mint, maxt)
@@ -205,7 +207,7 @@ type QueryableWithFilter interface {
}

// NewQueryable creates a new Queryable for cortex.
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, tombstonesLoader *purger.TombstonesLoader) storage.Queryable {
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides, tombstonesLoader *purger.TombstonesLoader) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
now := time.Now()

@@ -226,6 +228,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
maxt: maxt,
chunkIterFn: chunkIterFn,
tombstonesLoader: tombstonesLoader,
limits: limits,
}

dqr, err := distributor.Querier(ctx, mint, maxt)
@@ -268,6 +271,7 @@ type querier struct {
mint, maxt int64

tombstonesLoader *purger.TombstonesLoader
limits *validation.Overrides
}

// Select implements storage.Querier interface.
@@ -293,7 +297,14 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
}

tombstones, err := q.tombstonesLoader.GetPendingTombstonesForInterval(userID, model.Time(sp.Start), model.Time(sp.End))
// Validate query time range.
startTime := model.Time(sp.Start)
endTime := model.Time(sp.End)
if maxQueryLength := q.limits.MaxQueryLength(userID); maxQueryLength > 0 && endTime.Sub(startTime) > maxQueryLength {
return storage.ErrSeriesSet(fmt.Errorf(validation.ErrQueryTooLong, endTime.Sub(startTime), maxQueryLength))
}

tombstones, err := q.tombstonesLoader.GetPendingTombstonesForInterval(userID, startTime, endTime)
if err != nil {
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
}
@@ -302,7 +313,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
seriesSet := q.queriers[0].Select(true, sp, matchers...)

if tombstones.Len() != 0 {
seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: model.Time(sp.Start), End: model.Time(sp.End)})
seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime})
}

return seriesSet
@@ -331,7 +342,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
seriesSet := q.mergeSeriesSets(result)

if tombstones.Len() != 0 {
seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: model.Time(sp.Start), End: model.Time(sp.End)})
seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime})
}
return seriesSet
}