Skip to content

Commit

Permalink
Fixes @ modifier when splitting queries by time. (cortexproject#4464)
Browse files Browse the repository at this point in the history
* Fixes @ modifier when splitting queries by time.

This will replace `start` and `end` at (`@`) modifier with the actual constant values based on the original queries.
Meaning subqueries will not wrongly use their own query start and end time.

Fixes cortexproject#4463

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update changelog.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Signed-off-by: Alvin Lin <alvinlin@amazon.com>
  • Loading branch information
cyriltovena authored and alvinlin123 committed Jan 14, 2022
1 parent ed3589a commit 7db58bf
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -55,6 +55,7 @@
* [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380
* [ENHANCEMENT] Exemplars are now emitted for all gRPC calls and many operations tracked by histograms. #4462
* [ENHANCEMENT] New options `-server.http-listen-network` and `-server.grpc-listen-network` allow binding as 'tcp4' or 'tcp6'. #4462
* [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #4464
* [BUGFIX] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
Expand Down
41 changes: 37 additions & 4 deletions pkg/querier/queryrange/split_by_interval.go
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
)

type IntervalFn func(r Request) time.Duration
Expand Down Expand Up @@ -40,7 +41,10 @@ type splitByInterval struct {
func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
reqs := splitQuery(r, s.interval(r))
reqs, err := splitQuery(r, s.interval(r))
if err != nil {
return nil, err
}
s.splitByCounter.Add(float64(len(reqs)))

reqResps, err := DoRequests(ctx, s.next, reqs, s.limits)
Expand All @@ -60,17 +64,46 @@ func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) {
return response, nil
}

func splitQuery(r Request, interval time.Duration) []Request {
func splitQuery(r Request, interval time.Duration) ([]Request, error) {
// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
var reqs []Request
for start := r.GetStart(); start < r.GetEnd(); start = nextIntervalBoundary(start, r.GetStep(), interval) + r.GetStep() {
end := nextIntervalBoundary(start, r.GetStep(), interval)
if end+r.GetStep() >= r.GetEnd() {
end = r.GetEnd()
}

reqs = append(reqs, r.WithStartEnd(start, end))
reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end))
}
return reqs
return reqs, nil
}

// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
// If the modifier is already a constant, it will be returned as is.
func evaluateAtModifierFunction(query string, start, end int64) (string, error) {
expr, err := parser.ParseExpr(query)
if err != nil {
return "", err
}
parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error {
if selector, ok := n.(*parser.VectorSelector); ok {
switch selector.StartOrEnd {
case parser.START:
selector.Timestamp = &start
case parser.END:
selector.Timestamp = &end
}
selector.StartOrEnd = 0
}
return nil
})
return expr.String(), err
}

// Round up to the step before the next interval boundary.
Expand Down
63 changes: 57 additions & 6 deletions pkg/querier/queryrange/split_by_interval_test.go
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -135,20 +136,20 @@ func TestSplitQuery(t *testing.T) {
Start: 0,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
Query: "foo @ start()",
},
expected: []Request{
&PrometheusRequest{
Start: 0,
End: (24 * 3600 * seconds) - (15 * seconds),
Step: 15 * seconds,
Query: "foo",
Query: "foo @ 0.000",
},
&PrometheusRequest{
Start: 24 * 3600 * seconds,
End: 2 * 24 * 3600 * seconds,
Step: 15 * seconds,
Query: "foo",
Query: "foo @ 0.000",
},
},
interval: day,
Expand Down Expand Up @@ -236,14 +237,14 @@ func TestSplitQuery(t *testing.T) {
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
days := splitQuery(tc.input, tc.interval)
days, err := splitQuery(tc.input, tc.interval)
require.NoError(t, err)
require.Equal(t, tc.expected, days)
})
}
}

func TestSplitByDay(t *testing.T) {

mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse)
require.NoError(t, err)

Expand All @@ -260,7 +261,6 @@ func TestSplitByDay(t *testing.T) {
{query, string(mergedHTTPResponseBody), 2},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {

var actualCount atomic.Int32
s := httptest.NewServer(
middleware.AuthenticateUser.Wrap(
Expand Down Expand Up @@ -298,3 +298,54 @@ func TestSplitByDay(t *testing.T) {
})
}
}

func Test_evaluateAtModifier(t *testing.T) {
const (
start, end = int64(1546300800), int64(1646300800)
)
for _, tt := range []struct {
in, expected string
}{
{"topk(5, rate(http_requests_total[1h] @ start()))", "topk(5, rate(http_requests_total[1h] @ 1546300.800))"},
{"topk(5, rate(http_requests_total[1h] @ 0))", "topk(5, rate(http_requests_total[1h] @ 0.000))"},
{"http_requests_total[1h] @ 10.001", "http_requests_total[1h] @ 10.001"},
{
`min_over_time(
sum by(cluster) (
rate(http_requests_total[5m] @ end())
)[10m:]
)
or
max_over_time(
stddev_over_time(
deriv(
rate(http_requests_total[10m] @ start())
[5m:1m])
[2m:])
[10m:])`,
`min_over_time(
sum by(cluster) (
rate(http_requests_total[5m] @ 1646300.800)
)[10m:]
)
or
max_over_time(
stddev_over_time(
deriv(
rate(http_requests_total[10m] @ 1546300.800)
[5m:1m])
[2m:])
[10m:])`,
},
} {
tt := tt
t.Run(tt.in, func(t *testing.T) {
t.Parallel()
expectedExpr, err := parser.ParseExpr(tt.expected)
require.NoError(t, err)
out, err := evaluateAtModifierFunction(tt.in, start, end)
require.NoError(t, err)
require.Equal(t, expectedExpr.String(), out)
})
}
}

0 comments on commit 7db58bf

Please sign in to comment.