-
Notifications
You must be signed in to change notification settings - Fork 781
/
split_by_interval.go
133 lines (117 loc) · 4.53 KB
/
split_by_interval.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package queryrange
import (
"context"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
)
type IntervalFn func(r tripperware.Request) time.Duration
// SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer) tripperware.Middleware {
return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
return splitByInterval{
next: next,
limits: limits,
merger: merger,
interval: interval,
splitByCounter: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "frontend_split_queries_total",
Help: "Total number of underlying query requests after the split by interval is applied",
}),
}
})
}
type splitByInterval struct {
next tripperware.Handler
limits tripperware.Limits
merger tripperware.Merger
interval IntervalFn
// Metrics.
splitByCounter prometheus.Counter
}
func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
reqs, err := splitQuery(r, s.interval(r))
if err != nil {
// If the query itself is bad, we don't return error but send the query
// to querier to return the expected error message. This is not very efficient
// but should be okay for now.
// TODO(yeya24): query frontend can reuse the Prometheus API handler and return
// expected error message locally without passing it to the querier through network.
return s.next.Do(ctx, r)
}
s.splitByCounter.Add(float64(len(reqs)))
reqResps, err := tripperware.DoRequests(ctx, s.next, reqs, s.limits)
if err != nil {
return nil, err
}
resps := make([]tripperware.Response, 0, len(reqResps))
for _, reqResp := range reqResps {
resps = append(resps, reqResp.Response)
}
response, err := s.merger.MergeResponse(ctx, nil, resps...)
if err != nil {
return nil, err
}
return response, nil
}
func splitQuery(r tripperware.Request, interval time.Duration) ([]tripperware.Request, error) {
// If Start == end we should just run the original request
if r.GetStart() == r.GetEnd() {
return []tripperware.Request{r}, nil
}
// Replace @ modifier function to their respective constant values in the query.
// This way subqueries will be evaluated at the same time as the parent query.
query, err := evaluateAtModifierFunction(r.GetQuery(), r.GetStart(), r.GetEnd())
if err != nil {
return nil, err
}
var reqs []tripperware.Request
for start := r.GetStart(); start < r.GetEnd(); start = nextIntervalBoundary(start, r.GetStep(), interval) + r.GetStep() {
end := nextIntervalBoundary(start, r.GetStep(), interval)
if end+r.GetStep() >= r.GetEnd() {
end = r.GetEnd()
}
reqs = append(reqs, r.WithQuery(query).WithStartEnd(start, end))
}
return reqs, nil
}
// evaluateAtModifierFunction parse the query and evaluates the `start()` and `end()` at modifier functions into actual constant timestamps.
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
// If the modifier is already a constant, it will be returned as is.
func evaluateAtModifierFunction(query string, start, end int64) (string, error) {
expr, err := parser.ParseExpr(query)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
}
parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error {
if selector, ok := n.(*parser.VectorSelector); ok {
switch selector.StartOrEnd {
case parser.START:
selector.Timestamp = &start
case parser.END:
selector.Timestamp = &end
}
selector.StartOrEnd = 0
}
return nil
})
return expr.String(), err
}
// Round up to the step before the next interval boundary.
func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
msPerInterval := int64(interval / time.Millisecond)
startOfNextInterval := ((t / msPerInterval) + 1) * msPerInterval
// ensure that target is a multiple of steps away from the start time
target := startOfNextInterval - ((startOfNextInterval - t) % step)
if target == startOfNextInterval {
target -= step
}
return target
}