forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
shard_summer.go
314 lines (271 loc) · 8.42 KB
/
shard_summer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
package astmapper
import (
"fmt"
"regexp"
"strconv"
"strings"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)
const (
// ShardLabel is the reserved label name that carries the shard annotation
// injected into selectors by the shard summer.
ShardLabel = "__cortex_shard__"
// ShardLabelFmt is the fmt layout of a ShardLabel value (not the key):
// "<shard index>_of_<total shards>".
ShardLabelFmt = "%d_of_%d"
)
var (
// ShardLabelRE matches a complete label value written in ShardLabelFmt,
// i.e. two runs of decimal digits joined by "_of_".
ShardLabelRE = regexp.MustCompile("^[0-9]+_of_[0-9]+$")
)
// squasher folds the per-shard sub-queries into a single expression; shardSum
// installs that expression as the operand of the outer aggregation.
type squasher = func(...parser.Node) (parser.Expr, error)
// shardSummer is the node mapper that splits parallelizable sum aggregations
// into one sub-sum per shard and tags selectors with the shard they belong to.
type shardSummer struct {
// shards is the number of legs every parallelized sum is split into.
shards int
// currentShard is nil on the summer built by NewShardSummer; copies made by
// CopyWithCurShard point at the shard index that MapNode adds to selectors.
currentShard *int
// squash merges the per-shard sub-sums into a single expression.
squash squasher
// Metrics.
// shardedQueries may be nil; when set it is increased by the shard count
// each time a sum is split.
shardedQueries prometheus.Counter
}
// NewShardSummer builds an ASTMapper that fans sum queries out over the given
// number of shards.
//
// squasher is mandatory: passing nil yields an error and a nil mapper.
// shardedQueries may be nil, in which case no metric is recorded.
func NewShardSummer(shards int, squasher squasher, shardedQueries prometheus.Counter) (ASTMapper, error) {
	if squasher == nil {
		return nil, errors.Errorf("squasher required and not passed")
	}

	// currentShard is left at its zero value (nil): the top-level summer is not
	// bound to any shard, only the copies made while splitting a sum are.
	summer := &shardSummer{
		shards:         shards,
		squash:         squasher,
		shardedQueries: shardedQueries,
	}
	return NewASTNodeMapper(summer), nil
}
// CopyWithCurShard returns a shallow copy of the summer that is bound to the
// shard index curshard. The receiver is not modified.
func (summer *shardSummer) CopyWithCurShard(curshard int) *shardSummer {
	clone := new(shardSummer)
	*clone = *summer
	clone.currentShard = &curshard
	return clone
}
// MapNode rewrites a single AST node, sharding and re-summing when possible.
//
//   - A parallelizable sum aggregation is replaced by its sharded form.
//   - A vector or matrix selector gets the current shard's matcher added when
//     the summer is bound to a shard; otherwise it is returned unchanged.
//   - Every other node is returned unchanged.
//
// The boolean result is true for rewritten sums and for all selectors, and
// false in every other case.
func (summer *shardSummer) MapNode(node parser.Node) (parser.Node, bool, error) {
	switch n := node.(type) {
	case *parser.AggregateExpr:
		if !CanParallelize(n) || n.Op != parser.SUM {
			return n, false, nil
		}
		summed, err := summer.shardSum(n)
		return summed, true, err

	case *parser.VectorSelector:
		if summer.currentShard == nil {
			return n, true, nil
		}
		sharded, err := shardVectorSelector(*summer.currentShard, summer.shards, n)
		return sharded, true, err

	case *parser.MatrixSelector:
		if summer.currentShard == nil {
			return n, true, nil
		}
		sharded, err := shardMatrixSelector(*summer.currentShard, summer.shards, n)
		return sharded, true, err

	default:
		return n, false, nil
	}
}
// shardSum splits a sum aggregation into per-shard legs and stitches them back
// together: the legs are merged with the summer's squasher and the merged
// expression becomes the operand of the outer aggregation, which is returned.
// Any error from splitting or squashing is returned with a nil node.
func (summer *shardSummer) shardSum(expr *parser.AggregateExpr) (parser.Node, error) {
	outer, legs, err := summer.splitSum(expr)
	if err != nil {
		return nil, err
	}

	merged, err := summer.squash(legs...)
	if err != nil {
		return nil, err
	}

	outer.Expr = merged
	return outer, nil
}
// splitSum forms the parent and child legs of a parallel query.
//
// parent is the outer aggregation; its Expr is left unset for the caller to
// fill in. children holds one sub-aggregation per shard, each wrapping a clone
// of expr.Expr whose selectors have been bound to that shard. On success the
// shard count is recorded via recordShards.
//
// An error is returned when cloning or mapping a leg fails, or when a mapped
// leg is not a parser.Expr (previously this last case panicked). On error the
// parent and the children built so far are still returned alongside it.
func (summer *shardSummer) splitSum(
	expr *parser.AggregateExpr,
) (
	parent *parser.AggregateExpr,
	children []parser.Node,
	err error,
) {
	parent = &parser.AggregateExpr{
		Op:    expr.Op,
		Param: expr.Param,
	}

	// mkChild applies the grouping appropriate for a per-shard leg.
	var mkChild func(sharded *parser.AggregateExpr) parser.Expr

	switch {
	case expr.Without:
		/*
			parallelizing a sum using without(foo) is representable naively as
			sum without(foo) (
			  sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or
			  sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m]))
			)
			or (more optimized):
			sum without(__cortex_shard__) (
			  sum without(foo) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or
			  sum without(foo) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m]))
			)
		*/
		parent.Grouping = []string{ShardLabel}
		parent.Without = true
		mkChild = func(sharded *parser.AggregateExpr) parser.Expr {
			sharded.Grouping = expr.Grouping
			sharded.Without = true
			return sharded
		}

	case len(expr.Grouping) > 0:
		/*
			parallelizing a sum using by(foo) is representable as
			sum by(foo) (
			  sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or
			  sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m]))
			)
		*/
		parent.Grouping = expr.Grouping
		mkChild = func(sharded *parser.AggregateExpr) parser.Expr {
			// Build a fresh slice so the original grouping is never appended to in place.
			groups := make([]string, 0, len(expr.Grouping)+1)
			groups = append(groups, expr.Grouping...)
			groups = append(groups, ShardLabel)
			sharded.Grouping = groups
			return sharded
		}

	default:
		/*
			parallelizing a non-parameterized sum is representable as
			sum(
			  sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or
			  sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m]))
			)
			or (more optimized):
			sum without(__cortex_shard__) (
			  sum by(__cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or
			  sum by(__cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m]))
			)
		*/
		parent.Grouping = []string{ShardLabel}
		parent.Without = true
		mkChild = func(sharded *parser.AggregateExpr) parser.Expr {
			sharded.Grouping = []string{ShardLabel}
			return sharded
		}
	}

	// iterate across shardFactor to create children
	for i := 0; i < summer.shards; i++ {
		// Each leg works on its own clone so the legs do not share AST nodes.
		cloned, err := CloneNode(expr.Expr)
		if err != nil {
			return parent, children, err
		}

		subSummer := NewASTNodeMapper(summer.CopyWithCurShard(i))
		mapped, err := subSummer.Map(cloned)
		if err != nil {
			return parent, children, err
		}

		// An aggregation operand must be an expression; report anything else
		// as an error instead of panicking on the type assertion.
		shardedExpr, ok := mapped.(parser.Expr)
		if !ok {
			return parent, children, fmt.Errorf("sharded node is not an expression: %T", mapped)
		}

		subSum := mkChild(&parser.AggregateExpr{
			Op:   expr.Op,
			Expr: shardedExpr,
		})
		children = append(children, subSum)
	}

	summer.recordShards(float64(summer.shards))

	return parent, children, nil
}
// recordShards adds the summer's shard count to the shardedQueries counter.
//
// The counter is passed to the shardSummer at construction rather than
// registered here, to prevent duplicate metric registrations (ShardSummers are
// created per request). A nil counter, as commonly used in tests, is skipped.
//
// The float64 argument is ignored: the value recorded is always summer.shards.
func (summer *shardSummer) recordShards(_ float64) {
	if summer.shardedQueries == nil {
		return
	}
	summer.shardedQueries.Add(float64(summer.shards))
}
// shardVectorSelector returns a new vector selector equal to selector plus an
// equality matcher binding it to shard curshard out of shards. The shard
// matcher is placed first, ahead of the selector's own matchers, and the input
// selector is not modified.
//
// Name, Offset and PosRange are carried over from the input. PosRange is kept
// so that the rebuilt selector agrees with what shardMatrixSelector already
// does for its inner vector selector.
func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Node, error) {
	shardMatcher, err := labels.NewMatcher(labels.MatchEqual, ShardLabel, fmt.Sprintf(ShardLabelFmt, curshard, shards))
	if err != nil {
		return nil, err
	}

	return &parser.VectorSelector{
		Name:   selector.Name,
		Offset: selector.Offset,
		LabelMatchers: append(
			[]*labels.Matcher{shardMatcher},
			selector.LabelMatchers...,
		),
		PosRange: selector.PosRange,
	}, nil
}
// shardMatrixSelector returns a new matrix selector whose inner vector selector
// carries an additional equality matcher binding it to shard curshard out of
// shards. The shard matcher is placed ahead of the existing matchers; Range and
// EndPos are carried over from the input, which is not modified.
//
// An error is returned if the matcher cannot be built or if the inner selector
// is not a *parser.VectorSelector.
func shardMatrixSelector(curshard, shards int, selector *parser.MatrixSelector) (parser.Node, error) {
	shardValue := fmt.Sprintf(ShardLabelFmt, curshard, shards)
	shardMatcher, err := labels.NewMatcher(labels.MatchEqual, ShardLabel, shardValue)
	if err != nil {
		return nil, err
	}

	inner, ok := selector.VectorSelector.(*parser.VectorSelector)
	if !ok {
		return nil, fmt.Errorf("invalid selector type: %T", selector.VectorSelector)
	}

	// Start from a fresh slice so the original matcher list is left untouched.
	matchers := append([]*labels.Matcher{shardMatcher}, inner.LabelMatchers...)
	shardedInner := &parser.VectorSelector{
		Name:          inner.Name,
		Offset:        inner.Offset,
		LabelMatchers: matchers,
		PosRange:      inner.PosRange,
	}

	return &parser.MatrixSelector{
		VectorSelector: shardedInner,
		Range:          selector.Range,
		EndPos:         selector.EndPos,
	}, nil
}
// ParseShard decodes a label value written in ShardLabelFmt into a
// ShardAnnotation.
//
// It returns an error, together with the zero ShardAnnotation, when input does
// not match ShardLabelRE, when either number cannot be converted to an int, or
// when the shard index is not strictly below the shard count.
func ParseShard(input string) (parsed ShardAnnotation, err error) {
	if !ShardLabelRE.MatchString(input) {
		return parsed, errors.Errorf("Invalid ShardLabel value: [%s]", input)
	}

	// The regexp guarantees exactly three "_"-separated fields:
	// <shard index>, "of", <shard count>.
	fields := strings.Split(input, "_")

	index, err := strconv.Atoi(fields[0])
	if err != nil {
		return parsed, err
	}

	total, err := strconv.Atoi(fields[2])
	if err != nil {
		return parsed, err
	}

	if index >= total {
		return parsed, errors.Errorf("Shards out of bounds: [%d] >= [%d]", index, total)
	}

	parsed = ShardAnnotation{Shard: index, Of: total}
	return parsed, nil
}
// ShardAnnotation is a convenience struct which holds data from a parsed shard label.
type ShardAnnotation struct {
// Shard is the index of this shard; ParseShard only yields values below Of.
Shard int
// Of is the total number of shards.
Of int
}
// String encodes the ShardAnnotation as a label value in ShardLabelFmt,
// i.e. "<Shard>_of_<Of>".
func (shard ShardAnnotation) String() string {
return fmt.Sprintf(ShardLabelFmt, shard.Shard, shard.Of)
}
// Label returns the ShardAnnotation as a label: the name is ShardLabel and the
// value is the annotation's String encoding.
func (shard ShardAnnotation) Label() labels.Label {
	return labels.Label{Name: ShardLabel, Value: shard.String()}
}
// ShardFromMatchers looks for the first equality matcher on ShardLabel and
// returns the ShardAnnotation parsed from its value together with the
// matcher's index in the list.
//
// If that matcher's value cannot be parsed, the parse error is returned along
// with the matcher's index and a nil annotation. If no such matcher exists, the
// result is (nil, 0, nil).
func ShardFromMatchers(matchers []*labels.Matcher) (shard *ShardAnnotation, idx int, err error) {
	for pos, m := range matchers {
		if m.Name != ShardLabel || m.Type != labels.MatchEqual {
			continue
		}

		annotation, parseErr := ParseShard(m.Value)
		if parseErr != nil {
			return nil, pos, parseErr
		}
		return &annotation, pos, nil
	}

	return nil, 0, nil
}