/
limits.go
276 lines (222 loc) · 10.5 KB
/
limits.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/limits.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
package querymiddleware
import (
"context"
"fmt"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cancellation"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/model/timestamp"
"golang.org/x/sync/semaphore"
apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/util"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)
// errExecutingParallelQueriesFinished is the cancellation cause set once the
// round trip completes, used to stop any still-running parallel sub-queries.
var errExecutingParallelQueriesFinished = cancellation.NewErrorf("executing parallel queries finished")
// Limits allows us to specify per-tenant runtime limits on the behavior of
// the query handling code.
type Limits interface {
	// MaxQueryLookback returns the max lookback period of queries.
	MaxQueryLookback(userID string) time.Duration

	// MaxTotalQueryLength returns the limit of the length (in time) of a query.
	MaxTotalQueryLength(userID string) time.Duration

	// MaxQueryParallelism returns the limit to the number of split queries the
	// frontend will process in parallel.
	MaxQueryParallelism(userID string) int

	// MaxQueryExpressionSizeBytes returns the limit of the max number of bytes long a raw
	// query may be. 0 means "unlimited".
	MaxQueryExpressionSizeBytes(userID string) int

	// MaxCacheFreshness returns the period after which results are cacheable,
	// to prevent caching of very recent results.
	MaxCacheFreshness(userID string) time.Duration

	// QueryShardingTotalShards returns the number of shards to use for a given tenant.
	QueryShardingTotalShards(userID string) int

	// QueryShardingMaxShardedQueries returns the max number of sharded queries that can
	// be run for a given received query. 0 to disable limit.
	QueryShardingMaxShardedQueries(userID string) int

	// QueryShardingMaxRegexpSizeBytes returns the limit to the max number of bytes allowed
	// for a regexp matcher in a shardable query. If a query contains a regexp matcher longer
	// than this limit, the query will not be sharded. 0 to disable limit.
	QueryShardingMaxRegexpSizeBytes(userID string) int

	// SplitInstantQueriesByInterval returns the time interval to split instant queries for a given tenant.
	SplitInstantQueriesByInterval(userID string) time.Duration

	// CompactorSplitAndMergeShards returns the number of shards to use when splitting blocks.
	// This method is copied from compactor.ConfigProvider.
	CompactorSplitAndMergeShards(userID string) int

	// CompactorBlocksRetentionPeriod returns the retention period for a given user.
	CompactorBlocksRetentionPeriod(userID string) time.Duration

	// OutOfOrderTimeWindow returns the out-of-order time window for the user.
	OutOfOrderTimeWindow(userID string) time.Duration

	// CreationGracePeriod returns the time interval to control how far into the future
	// incoming samples are accepted compared to the wall clock.
	CreationGracePeriod(userID string) time.Duration

	// NativeHistogramsIngestionEnabled returns whether to ingest native histograms in the ingester.
	NativeHistogramsIngestionEnabled(userID string) bool

	// ResultsCacheTTL returns TTL for cached results for query that doesn't fall into out of order window, or
	// if out of order ingestion is disabled.
	ResultsCacheTTL(userID string) time.Duration

	// ResultsCacheTTLForOutOfOrderTimeWindow returns TTL for cached results for query that falls into out-of-order ingestion window.
	ResultsCacheTTLForOutOfOrderTimeWindow(userID string) time.Duration

	// ResultsCacheTTLForCardinalityQuery returns TTL for cached results for cardinality queries.
	ResultsCacheTTLForCardinalityQuery(userID string) time.Duration

	// ResultsCacheTTLForLabelsQuery returns TTL for cached results for label names and values queries.
	ResultsCacheTTLForLabelsQuery(userID string) time.Duration

	// ResultsCacheForUnalignedQueryEnabled returns whether to cache results for queries that are not step-aligned.
	ResultsCacheForUnalignedQueryEnabled(userID string) bool

	// BlockedQueries returns the blocked queries.
	BlockedQueries(userID string) []*validation.BlockedQuery

	// AlignQueriesWithStep returns if queries should be adjusted to be step-aligned.
	AlignQueriesWithStep(userID string) bool

	// QueryIngestersWithin returns the maximum lookback beyond which queries are not sent to ingester.
	QueryIngestersWithin(userID string) time.Duration
}
// limitsMiddleware is a MetricsQueryHandler wrapper that enforces per-tenant
// query limits (lookback, retention, expression size, total length) before
// delegating to the next handler. It embeds Limits to expose the limit getters.
type limitsMiddleware struct {
	Limits
	next   MetricsQueryHandler
	logger log.Logger
}
// newLimitsMiddleware creates a new MetricsQueryMiddleware that enforces query limits.
func newLimitsMiddleware(l Limits, logger log.Logger) MetricsQueryMiddleware {
	return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
		// Wrap the downstream handler with the limits enforcement.
		return limitsMiddleware{
			Limits: l,
			logger: logger,
			next:   next,
		}
	})
}
// Do enforces the configured per-tenant limits on the request, then calls the
// next handler:
//   - clamps the request time range to the smallest of 'max query lookback'
//     and 'blocks retention period' (returning an empty response if the whole
//     range falls before the allowed window);
//   - rejects queries whose raw expression exceeds the max size in bytes;
//   - rejects queries whose time range exceeds the max total query length.
//
// Fixed: the debug log key for the request end time was "redEnd"; it now
// matches the sibling "reqStart" key as "reqEnd".
func (l limitsMiddleware) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) {
	log, ctx := spanlogger.NewWithLogger(ctx, l.logger, "limits")
	defer log.Finish()

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, apierror.New(apierror.TypeBadData, err.Error())
	}

	// Clamp the time range based on the max query lookback and block retention period.
	blocksRetentionPeriod := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.CompactorBlocksRetentionPeriod)
	maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLookback)
	maxLookback := util_math.Min(blocksRetentionPeriod, maxQueryLookback)

	if maxLookback > 0 {
		minStartTime := util.TimeToMillis(time.Now().Add(-maxLookback))

		if r.GetEnd() < minStartTime {
			// The request is fully outside the allowed range, so we can return an
			// empty response.
			level.Debug(log).Log(
				"msg", "skipping the execution of the query because its time range is before the 'max query lookback' or 'blocks retention period' setting",
				"reqStart", util.FormatTimeMillis(r.GetStart()),
				"reqEnd", util.FormatTimeMillis(r.GetEnd()),
				"maxQueryLookback", maxQueryLookback,
				"blocksRetentionPeriod", blocksRetentionPeriod)

			return newEmptyPrometheusResponse(), nil
		}

		if r.GetStart() < minStartTime {
			// Replace the start time in the request.
			level.Debug(log).Log(
				"msg", "the start time of the query has been manipulated because of the 'max query lookback' or 'blocks retention period' setting",
				"original", util.FormatTimeMillis(r.GetStart()),
				"updated", util.FormatTimeMillis(minStartTime),
				"maxQueryLookback", maxQueryLookback,
				"blocksRetentionPeriod", blocksRetentionPeriod)

			r = r.WithStartEnd(minStartTime, r.GetEnd())
		}
	}

	// Enforce max query size, in bytes.
	if maxQuerySize := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, l.MaxQueryExpressionSizeBytes); maxQuerySize > 0 {
		querySize := len(r.GetQuery())
		if querySize > maxQuerySize {
			return nil, newMaxQueryExpressionSizeBytesError(querySize, maxQuerySize)
		}
	}

	// Enforce the max query length.
	if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxTotalQueryLength); maxQueryLength > 0 {
		queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
		if queryLen > maxQueryLength {
			return nil, newMaxTotalQueryLengthError(queryLen, maxQueryLength)
		}
	}

	return l.next.Do(ctx, r)
}
// limitedParallelismRoundTripper is an http.RoundTripper that runs a request
// through the configured middlewares while capping the number of parallel
// sub-requests sent to the downstream handler (see RoundTrip).
type limitedParallelismRoundTripper struct {
	downstream MetricsQueryHandler
	limits     Limits
	codec      Codec
	middleware MetricsQueryMiddleware
}
// newLimitedParallelismRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`.
func newLimitedParallelismRoundTripper(next http.RoundTripper, codec Codec, limits Limits, middlewares ...MetricsQueryMiddleware) http.RoundTripper {
	// The downstream handler adapts the plain roundtripper to the
	// MetricsQueryHandler interface via the codec.
	downstream := roundTripperHandler{
		next:  next,
		codec: codec,
	}

	return limitedParallelismRoundTripper{
		downstream: downstream,
		codec:      codec,
		limits:     limits,
		middleware: MergeMetricsQueryMiddlewares(middlewares...),
	}
}
// RoundTrip decodes the HTTP request into a MetricsQueryRequest, runs it
// through the configured middlewares, and encodes the result back into an
// HTTP response. Sub-requests produced by the middlewares are executed with
// at most MaxQueryParallelism of them in flight at any time.
func (rt limitedParallelismRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	ctx, cancel := context.WithCancelCause(r.Context())
	defer cancel(errExecutingParallelQueriesFinished)

	request, err := rt.codec.DecodeMetricsQueryRequest(ctx, r)
	if err != nil {
		return nil, err
	}

	if span := opentracing.SpanFromContext(ctx); span != nil {
		request.AddSpanTags(span)
	}

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, apierror.New(apierror.TypeBadData, err.Error())
	}

	// Limit the amount of parallel sub-requests according to the MaxQueryParallelism tenant setting.
	maxParallelism := validation.SmallestPositiveIntPerTenant(tenantIDs, rt.limits.MaxQueryParallelism)
	workers := semaphore.NewWeighted(int64(maxParallelism))

	// The final handler receives sub-requests in parallel from the upstream
	// middlewares and ensures that no more than MaxQueryParallelism of them
	// run concurrently against the downstream.
	execute := HandlerFunc(func(ctx context.Context, req MetricsQueryRequest) (Response, error) {
		if err := workers.Acquire(ctx, 1); err != nil {
			return nil, fmt.Errorf("could not acquire work: %w", err)
		}
		defer workers.Release(1)

		return rt.downstream.Do(ctx, req)
	})

	response, err := rt.middleware.Wrap(execute).Do(ctx, request)
	if err != nil {
		return nil, err
	}

	return rt.codec.EncodeResponse(ctx, r, response)
}
// roundTripperHandler is an adapter that implements the MetricsQueryHandler interface using a http.RoundTripper to perform
// the requests and a Codec to translate between http Request/Response model and this package's Request/Response model.
// It basically encodes a MetricsQueryRequest from MetricsQueryHandler.Do and decodes response from next roundtripper.
type roundTripperHandler struct {
	logger log.Logger
	next   http.RoundTripper
	codec  Codec
}
// Do implements MetricsQueryHandler. It encodes the MetricsQueryRequest into
// an HTTP request, injects the tenant org ID, performs the round trip, and
// decodes the HTTP response back into a Response.
func (rth roundTripperHandler) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) {
	httpReq, err := rth.codec.EncodeMetricsQueryRequest(ctx, r)
	if err != nil {
		return nil, err
	}

	if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil {
		return nil, apierror.New(apierror.TypeBadData, err.Error())
	}

	httpResp, err := rth.next.RoundTrip(httpReq)
	if err != nil {
		return nil, err
	}
	// Ensure the response body is always closed; the close error is
	// intentionally ignored (best-effort cleanup).
	defer func() { _ = httpResp.Body.Close() }()

	return rth.codec.DecodeResponse(ctx, httpResp, r, rth.logger)
}