Skip to content

Commit

Permalink
Add MaxLookBack and MaxLength Query Limits (grafana/phlare#741)
Browse files Browse the repository at this point in the history
* Setup better default

* Add MaxLookBack and Length Query Limits

* Fixes double config flag registration

* make reference-help
  • Loading branch information
cyriltovena committed Jun 2, 2023
1 parent 3fbfb8b commit 5e04c41
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 32 deletions.
4 changes: 2 additions & 2 deletions cmd/phlare/help-all.txt.tmpl
Expand Up @@ -390,9 +390,9 @@ Usage of ./phlare:
-querier.max-concurrent int
The maximum number of concurrent queries allowed. (default 4)
-querier.max-query-length duration
The limit to length of queries. 0 to disable. (default 30d1h)
The limit to length of queries. 0 to disable. (default 1d)
-querier.max-query-lookback duration
Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. The default value of 0 does not set a limit.
Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. 0 to disable, default to 7d. (default 1w)
-querier.max-query-parallelism int
Maximum number of queries that will be scheduled in parallel by the frontend.
-querier.query-store-after duration
Expand Down
4 changes: 2 additions & 2 deletions cmd/phlare/help.txt.tmpl
Expand Up @@ -130,9 +130,9 @@ Usage of ./phlare:
-querier.health-check-timeout duration
Timeout for ingester client healthcheck RPCs. (default 5s)
-querier.max-query-length duration
The limit to length of queries. 0 to disable. (default 30d1h)
The limit to length of queries. 0 to disable. (default 1d)
-querier.max-query-lookback duration
Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. The default value of 0 does not set a limit.
Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. 0 to disable, default to 7d. (default 1w)
-querier.max-query-parallelism int
Maximum number of queries that will be scheduled in parallel by the frontend.
-querier.split-queries-by-interval duration
Expand Down
2 changes: 2 additions & 0 deletions pkg/frontend/frontend.go
Expand Up @@ -96,6 +96,8 @@ type Frontend struct {
// Limits is the subset of per-tenant query limits the frontend consults.
// Every method takes a tenant ID and returns that tenant's configured value.
type Limits interface {
	// QuerySplitDuration returns the interval used to split a query's time
	// range into sub-queries (see NewTimeIntervalIterator callers).
	QuerySplitDuration(string) time.Duration
	// MaxQueryParallelism returns how many sub-queries may be scheduled
	// concurrently; 0 leaves concurrency unbounded.
	MaxQueryParallelism(string) int
	// MaxQueryLength returns the maximum allowed query time range;
	// 0 disables the limit.
	MaxQueryLength(tenantID string) time.Duration
	// MaxQueryLookback returns how far back in time data may be queried;
	// 0 disables the limit.
	MaxQueryLookback(tenantID string) time.Duration
}

type frontendRequest struct {
Expand Down
18 changes: 18 additions & 0 deletions pkg/frontend/frontend_select_merge_profile.go
Expand Up @@ -2,14 +2,32 @@ package frontend

import (
"context"
"net/http"

"github.com/bufbuild/connect-go"
"github.com/prometheus/common/model"

"github.com/grafana/dskit/tenant"

profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/pkg/util/connectgrpc"
"github.com/grafana/phlare/pkg/validation"
)

// SelectMergeProfile validates the request's time range against the tenant's
// query limits, then forwards the (possibly clamped) request downstream.
//
// If the requested range lies entirely before the allowed lookback window, an
// empty profile is returned without executing the query; if it only partially
// overlaps, the start time is clamped forward to the window's edge.
func (f *Frontend) SelectMergeProfile(ctx context.Context, c *connect.Request[querierv1.SelectMergeProfileRequest]) (*connect.Response[profilev1.Profile], error) {
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		// BUG FIX: connect.NewError expects a connect.Code, not an HTTP
		// status. http.StatusBadRequest (400) is not a valid connect code and
		// produces a bogus error code on the wire; CodeInvalidArgument is the
		// connect equivalent of a 400.
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}
	validated, err := validation.ValidateRangeRequest(f.limits, tenantIDs, model.Interval{Start: model.Time(c.Msg.Start), End: model.Time(c.Msg.End)}, model.Now())
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, err)
	}
	if validated.IsEmpty {
		// Whole range is outside the lookback window: short-circuit with an
		// empty (non-error) response.
		return connect.NewResponse(&profilev1.Profile{}), nil
	}
	// Propagate the clamped range before forwarding.
	c.Msg.Start = int64(validated.Start)
	c.Msg.End = int64(validated.End)
	return connectgrpc.RoundTripUnary[querierv1.SelectMergeProfileRequest, profilev1.Profile](ctx, f, c)
}
24 changes: 18 additions & 6 deletions pkg/frontend/frontend_select_merge_stacktraces.go
Expand Up @@ -7,30 +7,42 @@ import (

"github.com/bufbuild/connect-go"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/common/model"
"golang.org/x/sync/errgroup"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
phlaremodel "github.com/grafana/phlare/pkg/model"
"github.com/grafana/phlare/pkg/util/connectgrpc"
"github.com/grafana/phlare/pkg/util/httpgrpc"
"github.com/grafana/phlare/pkg/util/validation"
validationutil "github.com/grafana/phlare/pkg/util/validation"
"github.com/grafana/phlare/pkg/validation"
)

func (f *Frontend) SelectMergeStacktraces(ctx context.Context,
c *connect.Request[querierv1.SelectMergeStacktracesRequest]) (
*connect.Response[querierv1.SelectMergeStacktracesResponse], error) {
*connect.Response[querierv1.SelectMergeStacktracesResponse], error,
) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
return nil, connect.NewError(http.StatusBadRequest, err)
}

validated, err := validation.ValidateRangeRequest(f.limits, tenantIDs, model.Interval{Start: model.Time(c.Msg.Start), End: model.Time(c.Msg.End)}, model.Now())
if err != nil {
return nil, connect.NewError(http.StatusBadRequest, err)
}
if validated.IsEmpty {
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{}), nil
}
c.Msg.Start = int64(validated.Start)
c.Msg.End = int64(validated.End)

g, ctx := errgroup.WithContext(ctx)
if maxConcurrent := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
if maxConcurrent := validationutil.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
g.SetLimit(maxConcurrent)
}

m := phlaremodel.NewFlameGraphMerger()
interval := validation.MaxDurationOrZeroPerTenant(tenantIDs, f.limits.QuerySplitDuration)
interval := validationutil.MaxDurationOrZeroPerTenant(tenantIDs, f.limits.QuerySplitDuration)
intervals := NewTimeIntervalIterator(time.UnixMilli(c.Msg.Start), time.UnixMilli(c.Msg.End), interval)

for intervals.Next() {
Expand Down
24 changes: 18 additions & 6 deletions pkg/frontend/frontend_select_series.go
Expand Up @@ -7,30 +7,42 @@ import (

"github.com/bufbuild/connect-go"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/common/model"
"golang.org/x/sync/errgroup"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
phlaremodel "github.com/grafana/phlare/pkg/model"
"github.com/grafana/phlare/pkg/util/connectgrpc"
"github.com/grafana/phlare/pkg/util/httpgrpc"
"github.com/grafana/phlare/pkg/util/validation"
validationutil "github.com/grafana/phlare/pkg/util/validation"
"github.com/grafana/phlare/pkg/validation"
)

func (f *Frontend) SelectSeries(ctx context.Context,
c *connect.Request[querierv1.SelectSeriesRequest]) (
*connect.Response[querierv1.SelectSeriesResponse], error) {
*connect.Response[querierv1.SelectSeriesResponse], error,
) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
return nil, connect.NewError(http.StatusBadRequest, err)
}

validated, err := validation.ValidateRangeRequest(f.limits, tenantIDs, model.Interval{Start: model.Time(c.Msg.Start), End: model.Time(c.Msg.End)}, model.Now())
if err != nil {
return nil, connect.NewError(http.StatusBadRequest, err)
}
if validated.IsEmpty {
return connect.NewResponse(&querierv1.SelectSeriesResponse{}), nil
}
c.Msg.Start = int64(validated.Start)
c.Msg.End = int64(validated.End)

g, ctx := errgroup.WithContext(ctx)
if maxConcurrent := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
if maxConcurrent := validationutil.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueryParallelism); maxConcurrent > 0 {
g.SetLimit(maxConcurrent)
}

m := phlaremodel.NewSeriesMerger(false)
interval := validation.MaxDurationOrZeroPerTenant(tenantIDs, f.limits.QuerySplitDuration)
interval := validationutil.MaxDurationOrZeroPerTenant(tenantIDs, f.limits.QuerySplitDuration)
intervals := NewTimeIntervalIterator(time.UnixMilli(c.Msg.Start), time.UnixMilli(c.Msg.End), interval,
WithAlignment(time.Second*time.Duration(c.Msg.Step)))

Expand Down
9 changes: 2 additions & 7 deletions pkg/frontend/frontend_test.go
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/grafana/phlare/pkg/scheduler/schedulerpb/schedulerpbconnect"
"github.com/grafana/phlare/pkg/util/httpgrpc"
"github.com/grafana/phlare/pkg/util/servicediscovery"
"github.com/grafana/phlare/pkg/validation"
)

const testFrontendWorkerConcurrency = 5
Expand Down Expand Up @@ -71,7 +72,7 @@ func setupFrontendWithConcurrencyAndServerOptions(t *testing.T, reg prometheus.R
cfg.Port = port

logger := log.NewLogfmtLogger(os.Stdout)
f, err := NewFrontend(cfg, mockLimits{}, logger, reg)
f, err := NewFrontend(cfg, validation.MockLimits{MaxQueryParallelismValue: 1}, logger, reg)
require.NoError(t, err)

frontendpbconnect.RegisterFrontendForQuerierHandler(mux, f)
Expand Down Expand Up @@ -338,12 +339,6 @@ func TestFrontendFailedCancellation(t *testing.T) {
})
}

type mockLimits struct{}

func (mockLimits) QuerySplitDuration(string) time.Duration { return 0 }

func (mockLimits) MaxQueryParallelism(string) int { return 1 }

type mockScheduler struct {
t *testing.T
f *Frontend
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/validation/limits.go
Expand Up @@ -38,3 +38,21 @@ func MaxDurationOrZeroPerTenant(tenantIDs []string, f func(string) time.Duration
}
return *result
}

// SmallestPositiveNonZeroDurationPerTenant is returning the minimal positive
// and non-zero value of the supplied limit function for all given tenants. In
// many limits a value of 0 means unlimited so the method will return 0 only if
// all inputs have a limit of 0 or an empty tenant list is given.
func SmallestPositiveNonZeroDurationPerTenant(tenantIDs []string, f func(string) time.Duration) time.Duration {
var result *time.Duration
for _, tenantID := range tenantIDs {
v := f(tenantID)
if v > 0 && (result == nil || v < *result) {
result = &v
}
}
if result == nil {
return 0
}
return *result
}
6 changes: 3 additions & 3 deletions pkg/validation/limits.go
Expand Up @@ -69,11 +69,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLocalSeriesPerTenant, "ingester.max-local-series-per-tenant", 0, "Maximum number of active series of profiles per tenant, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalSeriesPerTenant, "ingester.max-global-series-per-tenant", 5000, "Maximum number of active series of profiles per tenant, across the cluster. 0 to disable. When the global limit is enabled, each ingester is configured with a dynamic local limit based on the replication factor and the current number of healthy ingesters, and is kept updated whenever the number of ingesters change.")

_ = l.MaxQueryLength.Set("721h")
_ = l.MaxQueryLength.Set("24h")
f.Var(&l.MaxQueryLength, "querier.max-query-length", "The limit to length of queries. 0 to disable.")

_ = l.MaxQueryLookback.Set("0s")
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. The default value of 0 does not set a limit.")
_ = l.MaxQueryLookback.Set("7d")
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how far back in profiling data can be queried, up until lookback duration ago. This limit is enforced in the query frontend. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. 0 to disable, default to 7d.")

f.IntVar(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The tenant's shard size, used when store-gateway sharding is enabled. Value of 0 disables shuffle sharding for the tenant, that is all tenant blocks are sharded across all store-gateway replicas.")

Expand Down
21 changes: 21 additions & 0 deletions pkg/validation/testutil.go
@@ -0,0 +1,21 @@
package validation

import "time"

// MockLimits is a test double for the per-tenant limit interfaces used by the
// frontend and validation packages. Each XxxValue field is returned verbatim
// by the corresponding Xxx method; the tenant/user ID argument is ignored.
type MockLimits struct {
	QuerySplitDurationValue     time.Duration // returned by QuerySplitDuration
	MaxQueryParallelismValue    int           // returned by MaxQueryParallelism
	MaxQueryLengthValue         time.Duration // returned by MaxQueryLength
	MaxQueryLookbackValue       time.Duration // returned by MaxQueryLookback
	MaxLabelNameLengthValue     int           // returned by MaxLabelNameLength
	MaxLabelValueLengthValue    int           // returned by MaxLabelValueLength
	MaxLabelNamesPerSeriesValue int           // returned by MaxLabelNamesPerSeries
}

// The methods below satisfy the limit interfaces by echoing the configured
// field values, regardless of which tenant or user is asked about.
func (m MockLimits) QuerySplitDuration(string) time.Duration { return m.QuerySplitDurationValue }
func (m MockLimits) MaxQueryParallelism(string) int { return m.MaxQueryParallelismValue }
func (m MockLimits) MaxQueryLength(tenantID string) time.Duration { return m.MaxQueryLengthValue }
func (m MockLimits) MaxQueryLookback(tenantID string) time.Duration { return m.MaxQueryLookbackValue }
func (m MockLimits) MaxLabelNameLength(userID string) int { return m.MaxLabelNameLengthValue }
func (m MockLimits) MaxLabelValueLength(userID string) int { return m.MaxLabelValueLengthValue }
func (m MockLimits) MaxLabelNamesPerSeries(userID string) int { return m.MaxLabelNamesPerSeriesValue }
54 changes: 54 additions & 0 deletions pkg/validation/validate.go
Expand Up @@ -4,14 +4,18 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"

typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
phlaremodel "github.com/grafana/phlare/pkg/model"
"github.com/grafana/phlare/pkg/util"
"github.com/grafana/phlare/pkg/util/validation"
)

type Reason string
Expand Down Expand Up @@ -39,6 +43,7 @@ const (
// SeriesLimit is a reason for discarding lines when we can't create a new stream
// because the limit of active streams has been reached.
SeriesLimit Reason = "series_limit"
QueryLimit Reason = "query_limit"

SeriesLimitErrorMsg = "Maximum active series limit exceeded (%d/%d), reduce the number of active streams (reduce labels or reduce label values), or contact your administrator to see if the limit can be increased"
MissingLabelsErrorMsg = "error at least one label pair is required per profile"
Expand All @@ -47,6 +52,7 @@ const (
LabelNameTooLongErrorMsg = "profile with labels '%s' has label name too long: '%s'"
LabelValueTooLongErrorMsg = "profile with labels '%s' has label value too long: '%s'"
DuplicateLabelNamesErrorMsg = "profile with labels '%s' has duplicate label name: '%s'"
QueryTooLongErrorMsg = "the query time range exceeds the limit (query length: %s, limit: %s)"
)

var (
Expand Down Expand Up @@ -136,3 +142,51 @@ func ReasonOf(err error) Reason {
}
return validationErr.Reason
}

// RangeRequestLimits is the subset of per-tenant limits required to validate
// a query time range. A returned value of 0 disables the respective limit.
type RangeRequestLimits interface {
	// MaxQueryLength returns the maximum allowed span of a query's time range.
	MaxQueryLength(tenantID string) time.Duration
	// MaxQueryLookback returns how far back from "now" a query may reach.
	MaxQueryLookback(tenantID string) time.Duration
}

// ValidatedRangeRequest is the outcome of validating a query's time range:
// the (possibly clamped) interval, plus IsEmpty set when the whole requested
// range falls outside the allowed lookback window, in which case the caller
// should return an empty result instead of executing the query.
type ValidatedRangeRequest struct {
	model.Interval
	IsEmpty bool
}

// ValidateRangeRequest enforces the per-tenant max-query-lookback and
// max-query-length limits on the requested time range.
//
// If the range ends before the lookback window it returns IsEmpty=true so the
// caller can short-circuit with an empty response; if only the start precedes
// the window, the start is clamped forward. A range longer than the smallest
// applicable max query length is rejected with a QueryLimit validation error.
func ValidateRangeRequest(limits RangeRequestLimits, tenantIDs []string, req model.Interval, now model.Time) (ValidatedRangeRequest, error) {
	if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, limits.MaxQueryLookback); maxQueryLookback > 0 {
		minStartTime := now.Add(-maxQueryLookback)

		if req.End < minStartTime {
			// The request is fully outside the allowed range, so we can return an
			// empty response.
			level.Debug(util.Logger).Log(
				"msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting",
				"reqStart", util.FormatTimeMillis(int64(req.Start)),
				// BUG FIX: log key was misspelled "redEnd".
				"reqEnd", util.FormatTimeMillis(int64(req.End)),
				"maxQueryLookback", maxQueryLookback)

			return ValidatedRangeRequest{IsEmpty: true, Interval: req}, nil
		}

		if req.Start < minStartTime {
			// Replace the start time in the request.
			level.Debug(util.Logger).Log(
				"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
				"original", util.FormatTimeMillis(int64(req.Start)),
				"updated", util.FormatTimeMillis(int64(minStartTime)))

			req.Start = minStartTime
		}
	}

	// Enforce the max query length.
	if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, limits.MaxQueryLength); maxQueryLength > 0 {
		queryLen := req.End.Sub(req.Start)
		if queryLen > maxQueryLength {
			return ValidatedRangeRequest{}, NewErrorf(QueryLimit, QueryTooLongErrorMsg, queryLen, model.Duration(maxQueryLength))
		}
	}

	return ValidatedRangeRequest{Interval: req}, nil
}

0 comments on commit 5e04c41

Please sign in to comment.