/
split_by_interval.go
297 lines (255 loc) · 7.48 KB
/
split_by_interval.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package queryrange
import (
"context"
"net/http"
"time"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/credativ/vali/pkg/logproto"
)
// valiResult pairs a split sub-request with the channel its worker will
// deliver the (response, error) result on. One valiResult is created per
// time partition of the original request.
type valiResult struct {
req queryrange.Request
ch chan *packedResp
}
// packedResp bundles a sub-request's response with the error from executing
// it, so both can travel over a single channel.
type packedResp struct {
resp queryrange.Response
err error
}
// SplitByMetrics holds the Prometheus instrumentation for the
// split-by-interval middleware.
type SplitByMetrics struct {
// splits records how many sub-requests each incoming request was split into.
splits prometheus.Histogram
}
// NewSplitByMetrics registers and returns the metrics used by the
// split-by-interval middleware on the given registerer.
func NewSplitByMetrics(r prometheus.Registerer) *SplitByMetrics {
	opts := prometheus.HistogramOpts{
		Namespace: "vali",
		Name:      "query_frontend_partitions",
		Help:      "Number of time-based partitions (sub-requests) per request",
		Buckets:   prometheus.ExponentialBuckets(1, 4, 5), // 1 -> 1024
	}
	return &SplitByMetrics{splits: promauto.With(r).NewHistogram(opts)}
}
// splitByInterval is a queryrange.Handler that partitions an incoming
// request into per-interval sub-requests, executes them through `next`
// (in parallel, bounded by per-tenant limits), and merges the results.
type splitByInterval struct {
next queryrange.Handler
limits Limits
// merger recombines the sub-request responses into a single response.
merger queryrange.Merger
metrics *SplitByMetrics
// splitter produces the per-interval sub-requests for a given request.
splitter Splitter
}
// Splitter partitions a request into sub-requests, each covering at most
// `interval` of the original time range.
type Splitter func(req queryrange.Request, interval time.Duration) []queryrange.Request
// SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
func SplitByIntervalMiddleware(limits Limits, merger queryrange.Merger, splitter Splitter, metrics *SplitByMetrics) queryrange.Middleware {
	return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
		handler := &splitByInterval{
			next:     next,
			limits:   limits,
			merger:   merger,
			metrics:  metrics,
			splitter: splitter,
		}
		return handler
	})
}
// Feed streams the sub-requests to the worker pool over an unbuffered
// channel. The channel is closed when all of input has been sent or the
// context is cancelled, so workers ranging over it terminate cleanly.
func (h *splitByInterval) Feed(ctx context.Context, input []*valiResult) chan *valiResult {
	out := make(chan *valiResult)
	go func() {
		defer close(out)
		for _, item := range input {
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}
// Process executes the split sub-requests through at most `parallelism`
// worker goroutines and collects their responses in input order.
//
// threshold is the number of log entries the caller still needs: once the
// accumulated ValiResponse counts reach it, Process returns early with the
// responses gathered so far (the deferred cancel stops the feeder and the
// workers). A threshold of 0 means unlimited — every sub-request is awaited.
func (h *splitByInterval) Process(
	ctx context.Context,
	parallelism int,
	threshold int64,
	input []*valiResult,
	userID string,
) ([]queryrange.Response, error) {
	var responses []queryrange.Response
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	ch := h.Feed(ctx, input)

	// queries with 0 limits should not be exited early
	unlimited := threshold == 0

	// don't spawn unnecessary goroutines
	p := parallelism
	if len(input) < parallelism {
		p = len(input)
	}

	// per request wrapped handler for limiting the amount of series.
	next := newSeriesLimiter(h.limits.MaxQuerySeries(userID)).Wrap(h.next)
	for i := 0; i < p; i++ {
		go h.loop(ctx, ch, next)
	}

	// Collect results in input order; each sub-request carries its own
	// response channel, so ordering does not depend on worker scheduling.
	for _, x := range input {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case data := <-x.ch:
			if data.err != nil {
				return nil, data.err
			}
			responses = append(responses, data.resp)
			// see if we can exit early if a limit has been reached
			if casted, ok := data.resp.(*ValiResponse); !unlimited && ok {
				threshold -= casted.Count()
				if threshold <= 0 {
					return responses, nil
				}
			}
		}
	}
	return responses, nil
}
// loop is a worker: it drains sub-requests from ch, runs each through next
// under its own tracing span, and delivers the result on the sub-request's
// channel. It exits when ch is closed or the context is cancelled.
func (h *splitByInterval) loop(ctx context.Context, ch <-chan *valiResult, next queryrange.Handler) {
	for job := range ch {
		span, spanCtx := opentracing.StartSpanFromContext(ctx, "interval")
		job.req.LogToSpan(span)

		resp, err := next.Do(spanCtx, job.req)

		select {
		case job.ch <- &packedResp{resp, err}:
			span.Finish()
		case <-spanCtx.Done():
			// Nobody is collecting results anymore; stop working.
			span.Finish()
			return
		}
	}
}
// Do splits the request into per-tenant-interval sub-requests, fans them out
// with bounded parallelism, and merges the responses. Requests for tenants
// with no split interval configured (or that split into zero intervals) are
// passed straight through to the next handler.
func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
	userid, err := user.ExtractOrgID(ctx)
	if err != nil {
		// Use an explicit "%s" verb: passing err.Error() as the format string
		// would misinterpret any '%' it contains as a formatting directive.
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}
	interval := h.limits.QuerySplitDuration(userid)
	// skip split by if unset
	if interval == 0 {
		return h.next.Do(ctx, r)
	}
	intervals := h.splitter(r, interval)
	h.metrics.splits.Observe(float64(len(intervals)))
	// no interval should not be processed by the frontend.
	if len(intervals) == 0 {
		return h.next.Do(ctx, r)
	}
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		sp.LogFields(otlog.Int("n_intervals", len(intervals)))
	}
	var limit int64
	switch req := r.(type) {
	case *ValiRequest:
		limit = int64(req.Limit)
		// For backward queries process the newest intervals first so the
		// entry-limit early exit in Process triggers as soon as possible.
		if req.Direction == logproto.BACKWARD {
			for i, j := 0, len(intervals)-1; i < j; i, j = i+1, j-1 {
				intervals[i], intervals[j] = intervals[j], intervals[i]
			}
		}
	case *ValiSeriesRequest, *ValiLabelNamesRequest:
		// Set this to 0 since this is not used in Series/Labels Request.
		limit = 0
	default:
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "unknown request type")
	}
	input := make([]*valiResult, 0, len(intervals))
	for _, interval := range intervals {
		input = append(input, &valiResult{
			req: interval,
			ch:  make(chan *packedResp),
		})
	}
	resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input, userid)
	if err != nil {
		return nil, err
	}
	return h.merger.MergeResponse(resps...)
}
// splitByTime is the Splitter for log, series and label-name requests: it
// slices the request's [StartTs, EndTs) range into consecutive sub-requests
// of at most `interval`, copying every other field unchanged. Unknown
// request types yield nil.
func splitByTime(req queryrange.Request, interval time.Duration) []queryrange.Request {
	var out []queryrange.Request
	switch concrete := req.(type) {
	case *ValiRequest:
		forInterval(interval, concrete.StartTs, concrete.EndTs, func(from, to time.Time) {
			out = append(out, &ValiRequest{
				Query:     concrete.Query,
				Limit:     concrete.Limit,
				Step:      concrete.Step,
				Direction: concrete.Direction,
				Path:      concrete.Path,
				StartTs:   from,
				EndTs:     to,
			})
		})
	case *ValiSeriesRequest:
		forInterval(interval, concrete.StartTs, concrete.EndTs, func(from, to time.Time) {
			out = append(out, &ValiSeriesRequest{
				Match:   concrete.Match,
				Path:    concrete.Path,
				StartTs: from,
				EndTs:   to,
			})
		})
	case *ValiLabelNamesRequest:
		forInterval(interval, concrete.StartTs, concrete.EndTs, func(from, to time.Time) {
			out = append(out, &ValiLabelNamesRequest{
				Path:    concrete.Path,
				StartTs: from,
				EndTs:   to,
			})
		})
	default:
		return nil
	}
	return out
}
func forInterval(interval time.Duration, start, end time.Time, callback func(start, end time.Time)) {
for start := start; start.Before(end); start = start.Add(interval) {
newEnd := start.Add(interval)
if newEnd.After(end) {
newEnd = end
}
callback(start, newEnd)
}
}
// splitMetricByTime is the Splitter for metric queries: boundaries are
// aligned to multiples of the query step so that each sub-query evaluates
// at exactly the same instants the unsplit query would have.
func splitMetricByTime(r queryrange.Request, interval time.Duration) []queryrange.Request {
	var reqs []queryrange.Request
	valiReq := r.(*ValiRequest)
	// The step, converted once from milliseconds.
	step := time.Duration(r.GetStep()) * time.Millisecond
	for start := valiReq.StartTs; start.Before(valiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(step) {
		end := nextIntervalBoundary(start, r.GetStep(), interval)
		// If the next evaluation point after this boundary is at or past the
		// overall end, extend this sub-query to cover the remainder.
		// NOTE: compare instants with Before/After, never ==, which compares
		// the time.Time representation (monotonic clock, location) and can
		// miss equal instants.
		if !end.Add(step).Before(valiReq.EndTs) {
			end = valiReq.EndTs
		}
		reqs = append(reqs, &ValiRequest{
			Query:     valiReq.Query,
			Limit:     valiReq.Limit,
			Step:      valiReq.Step,
			Direction: valiReq.Direction,
			Path:      valiReq.Path,
			StartTs:   start,
			EndTs:     end,
		})
	}
	return reqs
}
// Round up to the step before the next interval boundary.
func nextIntervalBoundary(t time.Time, step int64, interval time.Duration) time.Time {
stepNs := step * 1e6
nsPerInterval := interval.Nanoseconds()
startOfNextInterval := ((t.UnixNano() / nsPerInterval) + 1) * nsPerInterval
// ensure that target is a multiple of steps away from the start time
target := startOfNextInterval - ((startOfNextInterval - t.UnixNano()) % stepNs)
if target == startOfNextInterval {
target -= stepNs
}
return time.Unix(0, target)
}