diff --git a/CHANGELOG.md b/CHANGELOG.md index 881e891539a..260094c0a3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ * [ENHANCEMENT] Add a metrics `cortex_limits_defaults` to expose the default values of limits. #173 * [ENHANCEMENT] Exemplars are now emitted for all gRPC calls and many operations tracked by histograms. #180 * [ENHANCEMENT] New options `-server.http-listen-network` and `-server.grpc-listen-network` allow binding as 'tcp4' or 'tcp6'. #180 +* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #206 * [BUGFIX] Fixes a panic in the query-tee when comparing result. #207 * [BUGFIX] Upgrade Prometheus. TSDB now waits for pending readers before truncating Head block, fixing the `chunk not found` error and preventing wrong query results. #16 * [BUGFIX] Compactor: fixed panic while collecting Prometheus metrics. #28 diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index ead7c594a91..a4b959848d5 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/promql/parser" ) type IntervalFn func(r Request) time.Duration @@ -45,7 +46,10 @@ type splitByInterval struct { func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) { // First we're going to build new requests, one for each day, taking care // to line up the boundaries with step. - reqs := splitQuery(r, s.interval(r)) + reqs, err := splitQuery(r, s.interval(r)) + if err != nil { + return nil, err + } s.splitByCounter.Add(float64(len(reqs))) reqResps, err := DoRequests(ctx, s.next, reqs, s.limits) @@ -65,7 +69,13 @@ func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) { return response, nil } -func splitQuery(r Request, interval time.Duration) []Request { +func splitQuery(r Request, interval time.Duration) ([]Request, error) { + // Replace @ modifier function to their respective constant values in the query. + // This way subqueries will be evaluated at the same time as the parent query. + query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd()) + if err != nil { + return nil, err + } var reqs []Request for start := r.GetStart(); start < r.GetEnd(); start = nextIntervalBoundary(start, r.GetStep(), interval) + r.GetStep() { end := nextIntervalBoundary(start, r.GetStep(), interval) @@ -73,9 +83,32 @@ func splitQuery(r Request, interval time.Duration) []Request { end = r.GetEnd() } - reqs = append(reqs, r.WithStartEnd(start, end)) + reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end)) } - return reqs + return reqs, nil +} + +// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps. +// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00` +// If the modifier is already a constant, it will be returned as is. +func evaluateAtModifierFunction(query string, start, end int64) (string, error) { + expr, err := parser.ParseExpr(query) + if err != nil { + return "", err + } + parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { + if selector, ok := n.(*parser.VectorSelector); ok { + switch selector.StartOrEnd { + case parser.START: + selector.Timestamp = &start + case parser.END: + selector.Timestamp = &end + } + selector.StartOrEnd = 0 + } + return nil + }) + return expr.String(), err } // Round up to the step before the next interval boundary. diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 7a7cb4cada3..6ef4b88f246 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -140,20 +141,20 @@ func TestSplitQuery(t *testing.T) { Start: 0, End: 2 * 24 * 3600 * seconds, Step: 15 * seconds, - Query: "foo", + Query: "foo @ start()", }, expected: []Request{ &PrometheusRequest{ Start: 0, End: (24 * 3600 * seconds) - (15 * seconds), Step: 15 * seconds, - Query: "foo", + Query: "foo @ 0.000", }, &PrometheusRequest{ Start: 24 * 3600 * seconds, End: 2 * 24 * 3600 * seconds, Step: 15 * seconds, - Query: "foo", + Query: "foo @ 0.000", }, }, interval: day, @@ -241,14 +242,14 @@ func TestSplitQuery(t *testing.T) { }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - days := splitQuery(tc.input, tc.interval) + days, err := splitQuery(tc.input, tc.interval) + require.NoError(t, err) require.Equal(t, tc.expected, days) }) } } func TestSplitByDay(t *testing.T) { - mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse) require.NoError(t, err) @@ -265,7 +266,6 @@ func TestSplitByDay(t *testing.T) { {query, string(mergedHTTPResponseBody), 2}, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - var actualCount atomic.Int32 s := httptest.NewServer( middleware.AuthenticateUser.Wrap( @@ -303,3 +303,54 @@ func TestSplitByDay(t *testing.T) { }) } } + +func Test_evaluateAtModifier(t *testing.T) { + const ( + start, end = int64(1546300800), int64(1646300800) + ) + for _, tt := range []struct { + in, expected string + }{ + {"topk(5, rate(http_requests_total[1h] @ start()))", "topk(5, rate(http_requests_total[1h] @ 1546300.800))"}, + {"topk(5, rate(http_requests_total[1h] @ 0))", "topk(5, rate(http_requests_total[1h] @ 0.000))"}, + {"http_requests_total[1h] @ 10.001", "http_requests_total[1h] @ 10.001"}, + { + `min_over_time( + sum by(cluster) ( + rate(http_requests_total[5m] @ end()) + )[10m:] + ) + or + max_over_time( + stddev_over_time( + deriv( + rate(http_requests_total[10m] @ start()) + [5m:1m]) + [2m:]) + [10m:])`, + `min_over_time( + sum by(cluster) ( + rate(http_requests_total[5m] @ 1646300.800) + )[10m:] + ) + or + max_over_time( + stddev_over_time( + deriv( + rate(http_requests_total[10m] @ 1546300.800) + [5m:1m]) + [2m:]) + [10m:])`, + }, + } { + tt := tt + t.Run(tt.in, func(t *testing.T) { + t.Parallel() + expectedExpr, err := parser.ParseExpr(tt.expected) + require.NoError(t, err) + out, err := evaluateAtModifierFunction(tt.in, start, end) + require.NoError(t, err) + require.Equal(t, expectedExpr.String(), out) + }) + } +}