Add initial implementation of per-query limits (#8727)
**What this PR does / why we need it**:
Sometimes we want to limit the impact of a single query by imposing
limits that are stricter than the current tenant limits. For example, the
tenant's maximum query length could be seven days, but based on the query
or an admin's decision a particular query should only be allowed a maximum
length of one day. This is where per-request limits come into play. They are
passed via the `X-Loki-Query-Limit` header and extracted into the request's context.

It is the responsibility of the operator or admin to ensure that the header
is valid.
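
As a rough illustration, the sketch below attaches stricter limits to a single range query. It assumes the header name given above (`X-Loki-Query-Limit`) and the JSON payload shape used by the integration test in this PR (`{"maxQueryLength": "1m"}`); the exact wire format may differ, and `-querier.per-request-limits-enabled=true` must be set for the header to be honored.

```go
// Minimal sketch of a client that tightens the limits for one request only.
// The header name and JSON shape are assumptions taken from this PR's
// description and integration test, not a definitive API.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"
)

func main() {
	base := "http://localhost:3100" // assumed local Loki started with -querier.per-request-limits-enabled=true

	params := url.Values{}
	params.Set("query", `{job="fake"}`)
	params.Set("start", fmt.Sprint(time.Now().Add(-2*time.Hour).UnixNano()))
	params.Set("end", fmt.Sprint(time.Now().UnixNano()))

	req, err := http.NewRequest(http.MethodGet, base+"/loki/api/v1/query_range?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Scope-OrgID", "org1")
	// Per-request limit: this query may span at most one minute, regardless
	// of the (larger) tenant-wide maximum query length.
	req.Header.Set("X-Loki-Query-Limit", `{"maxQueryLength": "1m"}`)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status)
	fmt.Println(string(body))
}
```

Sent against the two-hour range above, such a request should be rejected with a "query time range exceeds the limit" error, while the same query without the header is bounded only by the tenant-wide limits.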

**Which issue(s) this PR fixes**:
Fixes #8762

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Karsten Jeschkies <karsten.jeschkies@grafana.com>
cstyan and jeschkies committed Mar 10, 2023
1 parent 10e3d80 commit 5a85f66
Showing 22 changed files with 819 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

##### Enhancements

* [8727](https://github.com/grafana/loki/pull/8727) **cstyan** **jeschkies**: Propagate per-request limit header to querier.
* [8682](https://github.com/grafana/loki/pull/8682) **dannykopping**: Add fetched chunk size distribution metric `loki_chunk_fetcher_fetched_size_bytes`.
* [8532](https://github.com/grafana/loki/pull/8532) **justcompile**: Adds Storage Class option to S3 objects
* [7951](https://github.com/grafana/loki/pull/7951) **MichelHollands**: Add a count template function to line_format and label_format.
4 changes: 4 additions & 0 deletions docs/sources/configuration/_index.md
@@ -505,6 +505,10 @@ engine:
# When true, allow queries to span multiple tenants.
# CLI flag: -querier.multi-tenant-queries-enabled
[multi_tenant_queries_enabled: <boolean> | default = false]

# When true, querier limits sent via a header are enforced.
# CLI flag: -querier.per-request-limits-enabled
[per_request_limits_enabled: <boolean> | default = false]
```

### query_scheduler
12 changes: 5 additions & 7 deletions integration/client/client.go
@@ -35,8 +35,6 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
for _, v := range values {
req.Header.Add(key, v)
}

fmt.Println(req.Header.Values(key))
}

return r.next.RoundTrip(req)
@@ -443,16 +441,16 @@ func (c *Client) GetRules(ctx context.Context) (*RulesResponse, error) {
}

func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
if statusCode/100 != 2 {
return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
}
lokiResp := Response{}
err := json.Unmarshal(buf, &lokiResp)
if err != nil {
return nil, fmt.Errorf("error parsing response data: %w", err)
return nil, fmt.Errorf("error parsing response data '%s': %w", string(buf), err)
}

if statusCode/100 == 2 {
return &lokiResp, nil
}
return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
return &lokiResp, nil
}

func (c *Client) rangeQueryURL(query string) string {
55 changes: 55 additions & 0 deletions integration/per_request_limits_test.go
@@ -0,0 +1,55 @@
package integration

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/util/querylimits"
)

func TestPerRequestLimits(t *testing.T) {
clu := cluster.New()
defer func() {
assert.NoError(t, clu.Cleanup())
}()

var (
tAll = clu.AddComponent(
"all",
"-target=all",
"-log.level=debug",
"-querier.per-request-limits-enabled=true",
)
)

require.NoError(t, clu.Run())

queryLimitsPolicy := client.InjectHeadersOption(map[string][]string{querylimits.HTTPHeaderQueryLimitsKey: {`{"maxQueryLength": "1m"}`}})
cliTenant := client.New("org1", "", tAll.HTTPURL(), queryLimitsPolicy)

// ingest a log line for the tenant.
require.NoError(t, cliTenant.PushLogLineWithTimestamp("lineA", cliTenant.Now.Add(-45*time.Minute), map[string]string{"job": "fake"}))

// check that per-request limits are enforced
_, err := cliTenant.RunRangeQuery(context.Background(), `{job="fake"}`)
require.ErrorContains(t, err, "the query time range exceeds the limit (query length")

// check without policy header
cliTenant = client.New("org1", "", tAll.HTTPURL())
resp, err := cliTenant.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
require.ElementsMatch(t, []string{"lineA"}, lines)
}
1 change: 0 additions & 1 deletion pkg/logql/engine.go
@@ -264,7 +264,6 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
tenants, _ := tenant.TenantIDs(ctx)
timeoutCapture := func(id string) time.Duration { return q.limits.QueryTimeout(ctx, id) }
queryTimeout := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, timeoutCapture)

ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()

34 changes: 21 additions & 13 deletions pkg/loki/loki.go
@@ -57,6 +57,7 @@ import (
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/fakeauth"
"github.com/grafana/loki/pkg/util/limiter"
util_log "github.com/grafana/loki/pkg/util/log"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/validation"
@@ -329,17 +330,6 @@ type Frontend interface {
CheckReady(_ context.Context) error
}

type CombinedLimits interface {
compactor.Limits
distributor.Limits
ingester.Limits
querier.Limits
queryrange.Limits
ruler.RulesLimits
scheduler.Limits
storage.StoreLimits
}

// Loki is the root datastructure for Loki.
type Loki struct {
Cfg Config
@@ -353,7 +343,7 @@ type Loki struct {
Server *server.Server
InternalServer *server.Server
ring *ring.Ring
Overrides CombinedLimits
Overrides limiter.CombinedLimits
tenantConfigs *runtime.TenantConfigs
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
@@ -658,7 +648,7 @@ func (t *Loki) setupModuleManager() error {
Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport},
Store: {Overrides, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport, CacheGenerationLoader},
Querier: {Store, Ring, Server, IngesterQuerier, Overrides, UsageReport, CacheGenerationLoader},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, UsageReport, CacheGenerationLoader},
QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport},
@@ -675,6 +665,24 @@ func (t *Loki) setupModuleManager() error {
MemberlistKV: {Server},
}

if t.Cfg.Querier.PerRequestLimitsEnabled {
level.Debug(util_log.Logger).Log("msg", "per-query request limits support enabled")
mm.RegisterModule(QueryLimiter, t.initQueryLimiter, modules.UserInvisibleModule)
mm.RegisterModule(QueryLimitsInterceptors, t.initQueryLimitsInterceptors, modules.UserInvisibleModule)
mm.RegisterModule(QueryLimitsTripperware, t.initQueryLimitsTripperware, modules.UserInvisibleModule)

deps[Querier] = append(deps[Querier], QueryLimiter)
deps[QueryFrontend] = append(deps[QueryFrontend], QueryLimitsTripperware)

deps[QueryLimiter] = []string{Overrides}
deps[QueryLimitsInterceptors] = []string{}
deps[QueryLimitsTripperware] = []string{QueryFrontendTripperware}

if err := mm.AddDependency(Server, QueryLimitsInterceptors); err != nil {
return err
}
}

// Add IngesterQuerier as a dependency for store when target is either querier, ruler, read, or backend.
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(Backend) {
deps[Store] = append(deps[Store], IngesterQuerier)
51 changes: 48 additions & 3 deletions pkg/loki/modules.go
@@ -64,7 +64,9 @@ import (
"github.com/grafana/loki/pkg/storage/stores/tsdb"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util/httpreq"
"github.com/grafana/loki/pkg/util/limiter"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/querylimits"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/validation"
)
@@ -87,6 +89,9 @@ const (
IngesterQuerier string = "ingester-querier"
QueryFrontend string = "query-frontend"
QueryFrontendTripperware string = "query-frontend-tripperware"
QueryLimiter string = "query-limiter"
QueryLimitsInterceptors string = "query-limits-interceptors"
QueryLimitsTripperware string = "query-limits-tripper"
RulerStorage string = "ruler-storage"
Ruler string = "ruler"
Store string = "store"
@@ -359,6 +364,15 @@ func (t *Loki) initQuerier() (services.Service, error) {
queryrangebase.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader),
)
}

// TODO: This is probably only needed when the target is not 'all'.
if t.Cfg.Querier.PerRequestLimitsEnabled {
toMerge = append(
toMerge,
querylimits.NewQueryLimitsMiddleware(log.With(util_log.Logger, "component", "query-limits-middleware")),
)
}

httpMiddleware := middleware.Merge(toMerge...)

logger := log.With(util_log.Logger, "component", "querier")
@@ -769,14 +783,21 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
frontendHandler = gziphandler.GzipHandler(frontendHandler)
}

frontendHandler = middleware.Merge(
toMerge := []middleware.Interface{
httpreq.ExtractQueryTagsMiddleware(),
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
).Wrap(frontendHandler)
}

if t.Cfg.Querier.PerRequestLimitsEnabled {
logger := log.With(util_log.Logger, "component", "query-limiter-middleware")
toMerge = append(toMerge, querylimits.NewQueryLimitsMiddleware(logger))
}

frontendHandler = middleware.Merge(toMerge...).Wrap(frontendHandler)

var defaultHandler http.Handler
// If this process also acts as a Querier we don't do any proxying of tail requests
@@ -1109,6 +1130,30 @@ func (t *Loki) initQueryScheduler() (services.Service, error) {
return s, nil
}

func (t *Loki) initQueryLimiter() (services.Service, error) {
_ = level.Debug(util_log.Logger).Log("msg", "initializing query limiter")
logger := log.With(util_log.Logger, "component", "query-limiter")
t.Overrides = querylimits.NewLimiter(logger, t.Overrides)
return nil, nil
}

func (t *Loki) initQueryLimitsInterceptors() (services.Service, error) {
_ = level.Debug(util_log.Logger).Log("msg", "initializing query limits interceptors")
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, querylimits.ServerQueryLimitsInterceptor)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, querylimits.StreamServerQueryLimitsInterceptor)

return nil, nil
}

func (t *Loki) initQueryLimitsTripperware() (services.Service, error) {
_ = level.Debug(util_log.Logger).Log("msg", "initializing query limits tripperware")
t.QueryFrontEndTripperware = querylimits.WrapTripperware(
t.QueryFrontEndTripperware,
)

return nil, nil
}

func (t *Loki) initUsageReport() (services.Service, error) {
if !t.Cfg.UsageReport.Enabled {
return nil, nil
Expand Down Expand Up @@ -1138,7 +1183,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}

func (t *Loki) deleteRequestsClient(clientType string, limits CombinedLimits) (deletion.DeleteRequestsClient, error) {
func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}
8 changes: 4 additions & 4 deletions pkg/querier/http.go
@@ -72,7 +72,7 @@ func (q *QuerierAPI) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
}

ctx := r.Context()
if err := q.validateEntriesLimits(ctx, request.Query, request.Limit); err != nil {
if err := q.validateMaxEntriesLimits(ctx, request.Query, request.Limit); err != nil {
serverutil.WriteError(err, w)
return
}
@@ -108,7 +108,7 @@ func (q *QuerierAPI) InstantQueryHandler(w http.ResponseWriter, r *http.Request)
}

ctx := r.Context()
if err := q.validateEntriesLimits(ctx, request.Query, request.Limit); err != nil {
if err := q.validateMaxEntriesLimits(ctx, request.Query, request.Limit); err != nil {
serverutil.WriteError(err, w)
return
}
@@ -162,7 +162,7 @@ func (q *QuerierAPI) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
}

ctx := r.Context()
if err := q.validateEntriesLimits(ctx, request.Query, request.Limit); err != nil {
if err := q.validateMaxEntriesLimits(ctx, request.Query, request.Limit); err != nil {
serverutil.WriteError(err, w)
return
}
@@ -456,7 +456,7 @@ func parseRegexQuery(httpRequest *http.Request) (string, error) {
return query, nil
}

func (q *QuerierAPI) validateEntriesLimits(ctx context.Context, query string, limit uint32) error {
func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, query string, limit uint32) error {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
9 changes: 6 additions & 3 deletions pkg/querier/querier.go
@@ -54,6 +54,7 @@ type Config struct {
QueryIngesterOnly bool `yaml:"query_ingester_only"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
QueryTimeout time.Duration `yaml:"query_timeout" doc:"hidden"`
PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"`
}

// RegisterFlags register flags.
@@ -66,6 +67,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.QueryStoreOnly, "querier.query-store-only", false, "Only query the store, and not attempt any ingesters. This is useful for running a standalone querier pool operating only against stored data.")
f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.")
f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.")
f.BoolVar(&cfg.PerRequestLimitsEnabled, "querier.per-request-limits-enabled", false, "When true, querier limits sent via a header are enforced.")
}

// Validate validates the config.
@@ -668,8 +670,8 @@ type timeRangeLimits interface {
func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits timeRangeLimits, from, through time.Time) (time.Time, time.Time, error) {
now := nowFunc()
// Clamp the time range based on the max query lookback.
var maxQueryLookback time.Duration
if maxQueryLookback = limits.MaxQueryLookback(ctx, userID); maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) {
maxQueryLookback := limits.MaxQueryLookback(ctx, userID)
if maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) {
origStartTime := from
from = now.Add(-maxQueryLookback)

@@ -679,7 +681,8 @@ func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits tim
"updated", from)

}
if maxQueryLength := limits.MaxQueryLength(ctx, userID); maxQueryLength > 0 && (through).Sub(from) > maxQueryLength {
maxQueryLength := limits.MaxQueryLength(ctx, userID)
if maxQueryLength > 0 && (through).Sub(from) > maxQueryLength {
return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, util_validation.ErrQueryTooLong, (through).Sub(from), maxQueryLength)
}
if through.Before(from) {
1 change: 0 additions & 1 deletion pkg/querier/queryrange/limits.go
@@ -140,7 +140,6 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que
}

// Clamp the time range based on the max query lookback.

lookbackCapture := func(id string) time.Duration { return l.MaxQueryLookback(ctx, id) }
if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, lookbackCapture); maxQueryLookback > 0 {
minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback))
5 changes: 3 additions & 2 deletions pkg/querier/queryrange/roundtrip.go
@@ -165,7 +165,7 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil {
if err := validateMaxEntriesLimits(req, rangeQuery.Limit, r.limits); err != nil {
return nil, err
}
// Only filter expressions are query sharded
@@ -239,14 +239,15 @@ func transformRegexQuery(req *http.Request, expr syntax.LogSelectorExpr) (syntax
}

// validates log entries limits
func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error {
func validateMaxEntriesLimits(req *http.Request, reqLimit uint32, limits Limits) error {
tenantIDs, err := tenant.TenantIDs(req.Context())
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

maxEntriesCapture := func(id string) int { return limits.MaxEntriesLimitPerQuery(req.Context(), id) }
maxEntriesLimit := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, maxEntriesCapture)

if int(reqLimit) > maxEntriesLimit && maxEntriesLimit != 0 {
return httpgrpc.Errorf(http.StatusBadRequest,
"max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit)