From 1fc1581ff397d45354c92e3f05142530aa05bbe9 Mon Sep 17 00:00:00 2001 From: Erlan Zholdubai uulu Date: Fri, 29 Aug 2025 15:33:41 -0700 Subject: [PATCH 1/2] remove limiting query rejection to only adhoc queries for ingester. This shouldn't have been included as discussed in PR #6947 Signed-off-by: Erlan Zholdubai uulu --- pkg/ingester/ingester.go | 13 ++++++------- pkg/ingester/ingester_test.go | 8 -------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cda8e23f57b..c2dab4a54ec 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -63,7 +63,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" logutil "github.com/cortexproject/cortex/pkg/util/log" util_math "github.com/cortexproject/cortex/pkg/util/math" - "github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" @@ -1697,7 +1696,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery } // We will report *this* request in the error too. - c, err := i.trackInflightQueryRequest(ctx) + c, err := i.trackInflightQueryRequest() if err != nil { return nil, err } @@ -1805,7 +1804,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu q.Close() } - c, err := i.trackInflightQueryRequest(ctx) + c, err := i.trackInflightQueryRequest() if err != nil { return nil, cleanup, err } @@ -1902,7 +1901,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR q.Close() } - c, err := i.trackInflightQueryRequest(ctx) + c, err := i.trackInflightQueryRequest() if err != nil { return nil, cleanup, err } @@ -2253,7 +2252,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return nil } -func (i *Ingester) trackInflightQueryRequest(ctx context.Context) (func(), error) { +func (i *Ingester) trackInflightQueryRequest() (func(), error) { gl := i.getInstanceLimits() if gl != nil && gl.MaxInflightQueryRequests > 0 { if i.inflightQueryRequests.Load() >= gl.MaxInflightQueryRequests { @@ -2263,7 +2262,7 @@ func (i *Ingester) trackInflightQueryRequest(ctx context.Context) (func(), error i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc()) - if i.resourceBasedLimiter != nil && !requestmeta.RequestFromRuler(ctx) { + if i.resourceBasedLimiter != nil { if err := i.resourceBasedLimiter.AcceptNewRequest(); err != nil { level.Warn(i.logger).Log("msg", "failed to accept request", "err", err) return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "failed to query: %s", limiter.ErrResourceLimitReachedStr) @@ -2283,7 +2282,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th } defer q.Close() - c, err := i.trackInflightQueryRequest(ctx) + c, err := i.trackInflightQueryRequest() if err != nil { return 0, 0, 0, 0, err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index e5934ea9722..c59879a1d84 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -61,7 +61,6 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/limiter" - "github.com/cortexproject/cortex/pkg/util/requestmeta" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -3228,18 +3227,11 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { } rreq := &client.QueryRequest{} - ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceAPI) s := &mockQueryStreamServer{ctx: ctx} err = i.QueryStream(rreq, s) require.Error(t, err) exhaustedErr := limiter.ResourceLimitReachedError{} require.ErrorContains(t, err, exhaustedErr.Error()) - - // we shouldn't reject queries from ruler - ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler) - s = &mockQueryStreamServer{ctx: ctx} - err = i.QueryStream(rreq, s) - require.Nil(t, err) } func TestIngester_LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { From faa3f17f2b3c4b30e695857df5ad77d2178c9156 Mon Sep 17 00:00:00 2001 From: Erlan Zholdubai uulu Date: Fri, 29 Aug 2025 18:53:49 -0700 Subject: [PATCH 2/2] bug fixes for query rejection. fix metric and remove unused enabled flag. Signed-off-by: Erlan Zholdubai uulu --- docs/blocks-storage/store-gateway.md | 6 ------ docs/configuration/config-file-reference.md | 12 ------------ pkg/configs/query_protection.go | 2 -- pkg/frontend/transport/handler.go | 5 ++++- pkg/frontend/transport/handler_test.go | 4 ++-- 5 files changed, 6 insertions(+), 23 deletions(-) diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index f3ec87e1b53..d3ea623e80a 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -357,12 +357,6 @@ store_gateway: query_protection: rejection: - # EXPERIMENTAL: Enable query rejection feature, where the component return - # 503 to all incoming query requests when the configured thresholds are - # breached. - # CLI flag: -store-gateway.query-protection.rejection.enabled - [enabled: | default = false] - threshold: # EXPERIMENTAL: Max CPU utilization that this ingester can reach before # rejecting new query request (across all tenants) in percentage, diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index be3b38f6e65..a72fafa4e61 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3731,12 +3731,6 @@ instance_limits: query_protection: rejection: - # EXPERIMENTAL: Enable query rejection feature, where the component return - # 503 to all incoming query requests when the configured thresholds are - # breached. - # CLI flag: -ingester.query-protection.rejection.enabled - [enabled: | default = false] - threshold: # EXPERIMENTAL: Max CPU utilization that this ingester can reach before # rejecting new query request (across all tenants) in percentage, between @@ -6472,12 +6466,6 @@ sharding_ring: query_protection: rejection: - # EXPERIMENTAL: Enable query rejection feature, where the component return - # 503 to all incoming query requests when the configured thresholds are - # breached. - # CLI flag: -store-gateway.query-protection.rejection.enabled - [enabled: | default = false] - threshold: # EXPERIMENTAL: Max CPU utilization that this ingester can reach before # rejecting new query request (across all tenants) in percentage, between diff --git a/pkg/configs/query_protection.go b/pkg/configs/query_protection.go index 2dd353580db..756a9f3620b 100644 --- a/pkg/configs/query_protection.go +++ b/pkg/configs/query_protection.go @@ -14,7 +14,6 @@ type QueryProtection struct { } type rejection struct { - Enabled bool `yaml:"enabled"` Threshold threshold `yaml:"threshold"` } @@ -24,7 +23,6 @@ type threshold struct { } func (cfg *QueryProtection) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.BoolVar(&cfg.Rejection.Enabled, prefix+"query-protection.rejection.enabled", false, "EXPERIMENTAL: Enable query rejection feature, where the component return 503 to all incoming query requests when the configured thresholds are breached.") f.Float64Var(&cfg.Rejection.Threshold.CPUUtilization, prefix+"query-protection.rejection.threshold.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") f.Float64Var(&cfg.Rejection.Threshold.HeapUtilization, prefix+"query-protection.rejection.threshold.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index c91e23d99f2..edce4839413 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -584,7 +584,10 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query reason = reasonChunksLimitStoreGateway } else if strings.Contains(errMsg, limitBytesStoreGateway) { reason = reasonBytesLimitStoreGateway - } else if strings.Contains(errMsg, limiter.ErrResourceLimitReachedStr) { + } + } else if statusCode == http.StatusServiceUnavailable && error != nil { + errMsg := error.Error() + if strings.Contains(errMsg, limiter.ErrResourceLimitReachedStr) { reason = reasonResourceExhausted } } diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 8d1cada831f..b04882f893d 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -390,7 +390,7 @@ func TestHandler_ServeHTTP(t *testing.T) { roundTripperFunc: roundTripperFunc(func(req *http.Request) (*http.Response, error) { resourceLimitReachedErr := &limiter.ResourceLimitReachedError{} return &http.Response{ - StatusCode: http.StatusUnprocessableEntity, + StatusCode: http.StatusServiceUnavailable, Body: io.NopCloser(strings.NewReader(resourceLimitReachedErr.Error())), }, nil }), @@ -398,7 +398,7 @@ func TestHandler_ServeHTTP(t *testing.T) { v := promtest.ToFloat64(h.rejectedQueries.WithLabelValues(reasonResourceExhausted, requestmeta.SourceAPI, userID)) assert.Equal(t, float64(1), v) }, - expectedStatusCode: http.StatusUnprocessableEntity, + expectedStatusCode: http.StatusServiceUnavailable, }, { name: "test cortex_slow_queries_total",