Skip to content

Commit

Permalink
Limit series for metric queries. (#2903)
Browse files Browse the repository at this point in the history
* Work in progress limit series.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* First draft of the series limiter.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add limiter to query-frontend.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix build

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fmted

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint and build fix

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update docs/sources/configuration/_index.md

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>

* Review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Reduce // for to fix flaky test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
  • Loading branch information
cyriltovena and achatterjee-grafana authored Nov 18, 2020
1 parent 901562e commit 61ba02e
Show file tree
Hide file tree
Showing 23 changed files with 352 additions and 41 deletions.
7 changes: 6 additions & 1 deletion docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,11 @@ logs in Loki.
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 14]
# Limit the maximum of unique series that is returned by a metric query.
# When the limit is reached an error is returned.
# CLI flag: -querier.max-query-series
[max_query_series: <int> | default = 500]
# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
Expand Down Expand Up @@ -1881,4 +1886,4 @@ multi_kv_config:
mirror-enabled: false
primary: consul
```
### Generic placeholders
### Generic placeholders
2 changes: 1 addition & 1 deletion pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}

eng := logql.NewEngine(conf.Querier.Engine, querier)
eng := logql.NewEngine(conf.Querier.Engine, querier, limits)
var query logql.Query

if q.isInstant() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logcli/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ type testQueryClient struct {

func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
q := logql.NewMockQuerier(0, testStreams)
e := logql.NewEngine(logql.EngineOpts{}, q)
e := logql.NewEngine(logql.EngineOpts{}, q, logql.NoLimits)
return &testQueryClient{
engine: e,
queryRangeCalls: 0,
Expand Down
25 changes: 23 additions & 2 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -85,14 +86,16 @@ func (opts *EngineOpts) applyDefault() {
type Engine struct {
timeout time.Duration
evaluator Evaluator
limits Limits
}

// NewEngine creates a new LogQL Engine.
func NewEngine(opts EngineOpts, q Querier) *Engine {
func NewEngine(opts EngineOpts, q Querier, l Limits) *Engine {
opts.applyDefault()
return &Engine{
timeout: opts.Timeout,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
limits: l,
}
}

Expand All @@ -106,6 +109,7 @@ func (ng *Engine) Query(params Params) Query {
return ParseExpr(query)
},
record: true,
limits: ng.limits,
}
}

Expand All @@ -119,6 +123,7 @@ type query struct {
timeout time.Duration
params Params
parse func(context.Context, string) (Expr, error)
limits Limits
evaluator Evaluator
record bool
}
Expand All @@ -145,7 +150,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
status := "200"
if err != nil {
status = "500"
if IsParseError(err) || IsPipelineError(err) {
if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) {
status = "400"
}
}
Expand Down Expand Up @@ -194,18 +199,30 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
return q.evalLiteral(ctx, lit)
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
if err != nil {
return nil, err
}
defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

seriesIndex := map[uint64]*promql.Series{}
maxSeries := q.limits.MaxQuerySeries(userID)

next, ts, vec := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}

// fail fast for the first step or instant query
if len(vec) > maxSeries {
return nil, newSeriesLimitError(maxSeries)
}

if GetRangeType(q.params) == InstantType {
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec, nil
Expand Down Expand Up @@ -237,6 +254,10 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
V: p.V,
})
}
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, newSeriesLimitError(maxSeries)
}
next, ts, vec = stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
Expand Down
55 changes: 45 additions & 10 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -460,15 +461,15 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()

eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params))
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits)
q := eng.Query(LiteralParams{
qs: test.qs,
start: test.ts,
end: test.ts,
direction: test.direction,
limit: test.limit,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1513,7 +1514,7 @@ func TestEngine_RangeQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()

eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params))
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits)

q := eng.Query(LiteralParams{
qs: test.qs,
Expand All @@ -1524,7 +1525,7 @@ func TestEngine_RangeQuery(t *testing.T) {
direction: test.direction,
limit: test.limit,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -1549,7 +1550,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it

func TestEngine_Stats(t *testing.T) {

eng := NewEngine(EngineOpts{}, &statsQuerier{})
eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits)

q := eng.Query(LiteralParams{
qs: `{foo="bar"}`,
Expand All @@ -1558,7 +1559,7 @@ func TestEngine_Stats(t *testing.T) {
direction: logproto.BACKWARD,
limit: 1000,
})
r, err := q.Exec(context.Background())
r, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
require.NoError(t, err)
require.Equal(t, int64(1), r.Statistics.Store.DecompressedBytes)
}
Expand Down Expand Up @@ -1621,19 +1622,53 @@ func TestStepEvaluator_Error(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
tc := tc
eng := NewEngine(EngineOpts{}, tc.querier)
eng := NewEngine(EngineOpts{}, tc.querier, NoLimits)
q := eng.Query(LiteralParams{
qs: tc.qs,
start: time.Unix(0, 0),
end: time.Unix(180, 0),
step: 1 * time.Second,
})
_, err := q.Exec(context.Background())
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
require.Equal(t, tc.err, err)
})
}
}

func TestEngine_MaxSeries(t *testing.T) {
eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), &fakeLimits{maxSeries: 1})

for _, test := range []struct {
qs string
direction logproto.Direction
expectLimitErr bool
}{
{`topk(1,rate(({app=~"foo|bar"})[1m]))`, logproto.FORWARD, true},
{`{app="foo"}`, logproto.FORWARD, false},
{`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD, false},
{`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD, true},
{`rate({app="foo"}[30s])`, logproto.FORWARD, true},
{`count_over_time({app="foo|bar"} |~".+bar" [1m])`, logproto.BACKWARD, true},
{`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD, false},
} {
q := eng.Query(LiteralParams{
qs: test.qs,
start: time.Unix(0, 0),
end: time.Unix(100000, 0),
step: 60 * time.Second,
direction: test.direction,
limit: 1000,
})
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if test.expectLimitErr {
require.NotNil(t, err)
require.True(t, errors.Is(err, ErrLimit))
return
}
require.Nil(t, err)
}
}

// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
func BenchmarkRangeQuery100000(b *testing.B) {
benchmarkRangeQuery(int64(100000), b)
Expand All @@ -1653,7 +1688,7 @@ var result promql_parser.Value

func benchmarkRangeQuery(testsize int64, b *testing.B) {
b.ReportAllocs()
eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize))
eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize), NoLimits)
start := time.Unix(0, 0)
end := time.Unix(testsize, 0)
b.ResetTimer()
Expand Down Expand Up @@ -1692,7 +1727,7 @@ func benchmarkRangeQuery(testsize int64, b *testing.B) {
direction: test.direction,
limit: 1000,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
b.Fatal(err)
}
Expand Down
42 changes: 32 additions & 10 deletions pkg/logql/error.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package logql

import (
"errors"
"fmt"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/logql/log"
)

// Those errors are useful for comparing error returned by the engine.
// e.g. errors.Is(err,logql.ErrParse) let you know if this is a ast parsing error.
var (
ErrParse = errors.New("failed to parse the log query")
ErrPipeline = errors.New("failed execute pipeline")
ErrLimit = errors.New("limit reached while evaluating the query")
)

// ParseError is what is returned when we failed to parse.
type ParseError struct {
msg string
Expand All @@ -21,6 +30,11 @@ func (p ParseError) Error() string {
return fmt.Sprintf("parse error at line %d, col %d: %s", p.line, p.col, p.msg)
}

// Is allows to use errors.Is(err,ErrParse) on this error.
func (p ParseError) Is(target error) bool {
return target == ErrParse
}

func newParseError(msg string, line, col int) ParseError {
return ParseError{
msg: msg,
Expand All @@ -37,12 +51,6 @@ func newStageError(expr Expr, err error) ParseError {
}
}

// IsParseError returns true if the err is a ast parsing error.
func IsParseError(err error) bool {
_, ok := err.(ParseError)
return ok
}

type pipelineError struct {
metric labels.Labels
errorType string
Expand All @@ -64,8 +72,22 @@ func (e pipelineError) Error() string {
e.errorType, e.metric, e.errorType)
}

// IsPipelineError tells if the error is generated by a Pipeline.
func IsPipelineError(err error) bool {
_, ok := err.(*pipelineError)
return ok
// Is allows to use errors.Is(err,ErrPipeline) on this error.
func (e pipelineError) Is(target error) bool {
return target == ErrPipeline
}

type limitError struct {
error
}

func newSeriesLimitError(limit int) *limitError {
return &limitError{
error: fmt.Errorf("maximum of series (%d) reached for a single query", limit),
}
}

// Is allows to use errors.Is(err,ErrLimit) on this error.
func (e limitError) Is(target error) bool {
return target == ErrLimit
}
22 changes: 22 additions & 0 deletions pkg/logql/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package logql

import (
"math"
)

var (
NoLimits = &fakeLimits{maxSeries: math.MaxInt32}
)

// Limits allow the engine to fetch limits for a given users.
type Limits interface {
MaxQuerySeries(userID string) int
}

type fakeLimits struct {
maxSeries int
}

func (f fakeLimits) MaxQuerySeries(userID string) int {
return f.maxSeries
}
2 changes: 1 addition & 1 deletion pkg/logql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func ParseExpr(input string) (expr Expr, err error) {
if r := recover(); r != nil {
var ok bool
if err, ok = r.(error); ok {
if IsParseError(err) {
if errors.Is(err, ErrParse) {
return
}
err = newParseError(err.Error(), 0, 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,7 @@ func TestIsParseError(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsParseError(tt.errFn()); got != tt.want {
if got := errors.Is(tt.errFn(), ErrParse); got != tt.want {
t.Errorf("IsParseError() = %v, want %v", got, tt.want)
}
})
Expand Down
Loading

0 comments on commit 61ba02e

Please sign in to comment.