Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit series for metric queries. #2903

Merged
merged 12 commits into from
Nov 18, 2020
7 changes: 6 additions & 1 deletion docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,11 @@ logs in Loki.
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 14]

# Limit the maximum of unique series that is returned by a metric query.
# When the limit is reached an error is returned.
# CLI flag: -querier.max-query-series
[max_query_series: <int> | default = 500]

# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
Expand Down Expand Up @@ -1881,4 +1886,4 @@ multi_kv_config:
mirror-enabled: false
primary: consul
```
### Generic placeholders
### Generic placeholders
2 changes: 1 addition & 1 deletion pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}

eng := logql.NewEngine(conf.Querier.Engine, querier)
eng := logql.NewEngine(conf.Querier.Engine, querier, limits)
var query logql.Query

if q.isInstant() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logcli/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ type testQueryClient struct {

func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
q := logql.NewMockQuerier(0, testStreams)
e := logql.NewEngine(logql.EngineOpts{}, q)
e := logql.NewEngine(logql.EngineOpts{}, q, logql.NoLimits)
return &testQueryClient{
engine: e,
queryRangeCalls: 0,
Expand Down
25 changes: 23 additions & 2 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -85,14 +86,16 @@ func (opts *EngineOpts) applyDefault() {
type Engine struct {
timeout time.Duration
evaluator Evaluator
limits Limits
}

// NewEngine creates a new LogQL Engine.
func NewEngine(opts EngineOpts, q Querier) *Engine {
func NewEngine(opts EngineOpts, q Querier, l Limits) *Engine {
opts.applyDefault()
return &Engine{
timeout: opts.Timeout,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
limits: l,
}
}

Expand All @@ -106,6 +109,7 @@ func (ng *Engine) Query(params Params) Query {
return ParseExpr(query)
},
record: true,
limits: ng.limits,
}
}

Expand All @@ -119,6 +123,7 @@ type query struct {
timeout time.Duration
params Params
parse func(context.Context, string) (Expr, error)
limits Limits
evaluator Evaluator
record bool
}
Expand All @@ -145,7 +150,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
status := "200"
if err != nil {
status = "500"
if IsParseError(err) || IsPipelineError(err) {
if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) {
status = "400"
}
}
Expand Down Expand Up @@ -194,18 +199,30 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
return q.evalLiteral(ctx, lit)
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
if err != nil {
return nil, err
}
defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)

seriesIndex := map[uint64]*promql.Series{}
maxSeries := q.limits.MaxQuerySeries(userID)

next, ts, vec := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}

// fail fast for the first step or instant query
if len(vec) > maxSeries {
return nil, newSeriesLimitError(maxSeries)
}

if GetRangeType(q.params) == InstantType {
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec, nil
Expand Down Expand Up @@ -237,6 +254,10 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
V: p.V,
})
}
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, newSeriesLimitError(maxSeries)
}
next, ts, vec = stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
Expand Down
55 changes: 45 additions & 10 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -460,15 +461,15 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()

eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params))
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits)
q := eng.Query(LiteralParams{
qs: test.qs,
start: test.ts,
end: test.ts,
direction: test.direction,
limit: test.limit,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1513,7 +1514,7 @@ func TestEngine_RangeQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()

eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params))
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits)

q := eng.Query(LiteralParams{
qs: test.qs,
Expand All @@ -1524,7 +1525,7 @@ func TestEngine_RangeQuery(t *testing.T) {
direction: test.direction,
limit: test.limit,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -1549,7 +1550,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it

func TestEngine_Stats(t *testing.T) {

eng := NewEngine(EngineOpts{}, &statsQuerier{})
eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits)

q := eng.Query(LiteralParams{
qs: `{foo="bar"}`,
Expand All @@ -1558,7 +1559,7 @@ func TestEngine_Stats(t *testing.T) {
direction: logproto.BACKWARD,
limit: 1000,
})
r, err := q.Exec(context.Background())
r, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
require.NoError(t, err)
require.Equal(t, int64(1), r.Statistics.Store.DecompressedBytes)
}
Expand Down Expand Up @@ -1621,19 +1622,53 @@ func TestStepEvaluator_Error(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
tc := tc
eng := NewEngine(EngineOpts{}, tc.querier)
eng := NewEngine(EngineOpts{}, tc.querier, NoLimits)
q := eng.Query(LiteralParams{
qs: tc.qs,
start: time.Unix(0, 0),
end: time.Unix(180, 0),
step: 1 * time.Second,
})
_, err := q.Exec(context.Background())
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
require.Equal(t, tc.err, err)
})
}
}

func TestEngine_MaxSeries(t *testing.T) {
eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), &fakeLimits{maxSeries: 1})

for _, test := range []struct {
qs string
direction logproto.Direction
expectLimitErr bool
}{
{`topk(1,rate(({app=~"foo|bar"})[1m]))`, logproto.FORWARD, true},
{`{app="foo"}`, logproto.FORWARD, false},
{`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD, false},
{`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD, true},
{`rate({app="foo"}[30s])`, logproto.FORWARD, true},
{`count_over_time({app="foo|bar"} |~".+bar" [1m])`, logproto.BACKWARD, true},
{`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD, false},
} {
q := eng.Query(LiteralParams{
qs: test.qs,
start: time.Unix(0, 0),
end: time.Unix(100000, 0),
step: 60 * time.Second,
direction: test.direction,
limit: 1000,
})
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if test.expectLimitErr {
require.NotNil(t, err)
require.True(t, errors.Is(err, ErrLimit))
return
}
require.Nil(t, err)
}
}

// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
func BenchmarkRangeQuery100000(b *testing.B) {
benchmarkRangeQuery(int64(100000), b)
Expand All @@ -1653,7 +1688,7 @@ var result promql_parser.Value

func benchmarkRangeQuery(testsize int64, b *testing.B) {
b.ReportAllocs()
eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize))
eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize), NoLimits)
start := time.Unix(0, 0)
end := time.Unix(testsize, 0)
b.ResetTimer()
Expand Down Expand Up @@ -1692,7 +1727,7 @@ func benchmarkRangeQuery(testsize int64, b *testing.B) {
direction: test.direction,
limit: 1000,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
b.Fatal(err)
}
Expand Down
42 changes: 32 additions & 10 deletions pkg/logql/error.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package logql

import (
"errors"
"fmt"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/logql/log"
)

// Those errors are useful for comparing error returned by the engine.
// e.g. errors.Is(err,logql.ErrParse) let you know if this is a ast parsing error.
var (
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
ErrParse = errors.New("failed to parse the log query")
ErrPipeline = errors.New("failed execute pipeline")
ErrLimit = errors.New("limit reached while evaluating the query")
)

// ParseError is what is returned when we failed to parse.
type ParseError struct {
msg string
Expand All @@ -21,6 +30,11 @@ func (p ParseError) Error() string {
return fmt.Sprintf("parse error at line %d, col %d: %s", p.line, p.col, p.msg)
}

// Is allows to use errors.Is(err,ErrParse) on this error.
func (p ParseError) Is(target error) bool {
return target == ErrParse
}

func newParseError(msg string, line, col int) ParseError {
return ParseError{
msg: msg,
Expand All @@ -37,12 +51,6 @@ func newStageError(expr Expr, err error) ParseError {
}
}

// IsParseError returns true if the err is a ast parsing error.
func IsParseError(err error) bool {
_, ok := err.(ParseError)
return ok
}

type pipelineError struct {
metric labels.Labels
errorType string
Expand All @@ -64,8 +72,22 @@ func (e pipelineError) Error() string {
e.errorType, e.metric, e.errorType)
}

// IsPipelineError tells if the error is generated by a Pipeline.
func IsPipelineError(err error) bool {
_, ok := err.(*pipelineError)
return ok
// Is allows to use errors.Is(err,ErrPipeline) on this error.
func (e pipelineError) Is(target error) bool {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
return target == ErrPipeline
}

type limitError struct {
error
}

func newSeriesLimitError(limit int) *limitError {
return &limitError{
error: fmt.Errorf("maximum of series (%d) reached for a single query", limit),
}
}

// Is allows to use errors.Is(err,ErrLimit) on this error.
func (e limitError) Is(target error) bool {
return target == ErrLimit
}
22 changes: 22 additions & 0 deletions pkg/logql/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package logql

import (
"math"
)

var (
NoLimits = &fakeLimits{maxSeries: math.MaxInt32}
)

// Limits allow the engine to fetch limits for a given users.
type Limits interface {
MaxQuerySeries(userID string) int
}

type fakeLimits struct {
maxSeries int
}

func (f fakeLimits) MaxQuerySeries(userID string) int {
return f.maxSeries
}
2 changes: 1 addition & 1 deletion pkg/logql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func ParseExpr(input string) (expr Expr, err error) {
if r := recover(); r != nil {
var ok bool
if err, ok = r.(error); ok {
if IsParseError(err) {
if errors.Is(err, ErrParse) {
return
}
err = newParseError(err.Error(), 0, 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,7 @@ func TestIsParseError(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsParseError(tt.errFn()); got != tt.want {
if got := errors.Is(tt.errFn(), ErrParse); got != tt.want {
t.Errorf("IsParseError() = %v, want %v", got, tt.want)
}
})
Expand Down
Loading