forked from prometheus/prometheus
/
histogram_meta.go
500 lines (463 loc) · 14.7 KB
/
histogram_meta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"math"
"github.com/emmalidtdg/prometheus/model/histogram"
)
// writeHistogramChunkLayout serializes the layout of a histogram chunk into b.
// The fields are written in a fixed order: the zero threshold (see
// putZeroThreshold), the schema as a varbit int, the positive spans, and
// finally the negative spans.
func writeHistogramChunkLayout(
	b *bstream, schema int32, zeroThreshold float64,
	positiveSpans, negativeSpans []histogram.Span,
) {
	putZeroThreshold(b, zeroThreshold)
	putVarbitInt(b, int64(schema))
	// Positive spans always precede negative spans in the stream.
	for _, spans := range [2][]histogram.Span{positiveSpans, negativeSpans} {
		putHistogramChunkLayoutSpans(b, spans)
	}
}
// readHistogramChunkLayout decodes the layout of a histogram chunk from b. It
// reads the zero threshold, the schema, the positive spans, and the negative
// spans, in that order. Reading stops at the first error; in that case the
// fields decoded so far are returned together with the error.
func readHistogramChunkLayout(b *bstreamReader) (
	schema int32, zeroThreshold float64,
	positiveSpans, negativeSpans []histogram.Span,
	err error,
) {
	if zeroThreshold, err = readZeroThreshold(b); err != nil {
		return schema, zeroThreshold, positiveSpans, negativeSpans, err
	}

	// The schema is stored as a varbit int and narrowed to int32 here.
	rawSchema, err := readVarbitInt(b)
	if err != nil {
		return schema, zeroThreshold, positiveSpans, negativeSpans, err
	}
	schema = int32(rawSchema)

	if positiveSpans, err = readHistogramChunkLayoutSpans(b); err != nil {
		return schema, zeroThreshold, positiveSpans, negativeSpans, err
	}

	negativeSpans, err = readHistogramChunkLayoutSpans(b)
	return schema, zeroThreshold, positiveSpans, negativeSpans, err
}
// putHistogramChunkLayoutSpans writes the number of spans as a varbit uint,
// followed by each span's length (varbit uint) and offset (varbit int).
func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) {
	putVarbitUint(b, uint64(len(spans)))
	for i := range spans {
		putVarbitUint(b, uint64(spans[i].Length))
		putVarbitInt(b, int64(spans[i].Offset))
	}
}
// readHistogramChunkLayoutSpans reads a span list in the format produced by
// putHistogramChunkLayoutSpans: a count followed by (length, offset) pairs.
// It returns a nil slice if the count is zero, and nil plus the error if any
// read fails.
func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
	count, err := readVarbitUint(b)
	if err != nil {
		return nil, err
	}

	var spans []histogram.Span
	for remaining := int(count); remaining > 0; remaining-- {
		length, err := readVarbitUint(b)
		if err != nil {
			return nil, err
		}
		offset, err := readVarbitInt(b)
		if err != nil {
			return nil, err
		}

		// Narrow the decoded values to the field widths of histogram.Span.
		var s histogram.Span
		s.Length = uint32(length)
		s.Offset = int32(offset)
		spans = append(spans, s)
	}
	return spans, nil
}
// putZeroThreshold writes the zero threshold to the bstream. Typical values
// take a single byte, everything else takes 9 bytes:
//   - A threshold of 0 is stored as a single zero byte.
//   - A threshold that is a power of 2 between (and including) 2^-243 and 2^10
//     is stored as a single byte holding its exponent plus 243. The exponent
//     is the one reported by math.Frexp, i.e. for a fraction of exactly 0.5, so
//     it ranges from -242 (2^-243 is 0.5*2^-242) to 11 (2^10 is 0.5*2^11), and
//     the stored byte ranges from 1 to 254. Small powers of two are preferred
//     values for the zero threshold; the default of 2^-128 (0.5*2^-127) is
//     encoded as the single byte 116.
//   - Any other value is stored as the byte 255 followed by the 8 bytes of the
//     threshold as a float64.
func putZeroThreshold(b *bstream, threshold float64) {
	if threshold == 0 {
		b.writeByte(0)
		return
	}

	frac, exp := math.Frexp(threshold)
	if frac == 0.5 && exp >= -242 && exp <= 11 {
		// Power of two within the compact range: one byte suffices.
		b.writeByte(byte(exp + 243))
		return
	}

	// Fallback: marker byte followed by the raw float64 bits.
	b.writeByte(255)
	b.writeBits(math.Float64bits(threshold), 64)
}
// readZeroThreshold reads the zero threshold written with putZeroThreshold.
// The first byte selects the encoding: 0 means a threshold of 0, 255 means the
// next 64 bits hold the threshold as a float64, and any other value is the
// exponent (offset by 243) of a power-of-two threshold.
func readZeroThreshold(br *bstreamReader) (float64, error) {
	marker, err := br.ReadByte()
	if err != nil {
		return 0, err
	}
	if marker == 0 {
		return 0, nil
	}
	if marker != 255 {
		// Compact encoding: rebuild 0.5 * 2^(marker-243).
		return math.Ldexp(0.5, int(marker)-243), nil
	}

	bits, readErr := br.readBits(64)
	if readErr != nil {
		return 0, readErr
	}
	return math.Float64frombits(bits), nil
}
// bucketIterator yields, one at a time, the bucket indices covered by a slice
// of spans. It keeps track of the last bucket it yielded; use
// newBucketIterator to obtain a correctly initialized instance and Next to
// advance it.
type bucketIterator struct {
	spans  []histogram.Span // The spans being iterated over.
	span   int              // Span position of last yielded bucket.
	bucket int              // Bucket position within span of last yielded bucket.
	idx    int              // Bucket index (globally across all spans) of last yielded bucket.
}
// newBucketIterator returns an iterator positioned just before the first
// bucket of spans, so that the first call to Next yields that bucket. The
// starting index is -1 plus the offset of the first span (if there is one).
func newBucketIterator(spans []histogram.Span) *bucketIterator {
	start := -1
	if len(spans) > 0 {
		start += int(spans[0].Offset)
	}
	return &bucketIterator{
		spans:  spans,
		span:   0,
		bucket: -1,
		idx:    start,
	}
}
// Next advances the iterator and returns the index of the next bucket. The
// boolean is false (and the index 0) once all spans have been exhausted.
// Spans with a length of zero contain no bucket; they are skipped, but their
// offset still shifts the index of the buckets that follow.
func (b *bucketIterator) Next() (int, bool) {
	// We're already out of bounds.
	if b.span >= len(b.spans) {
		return 0, false
	}
	// Try to move within the same span.
	if b.bucket < int(b.spans[b.span].Length)-1 {
		b.bucket++
		b.idx++
		return b.idx, true
	}
	// Try to move from one span to the next.
	for b.span < len(b.spans)-1 {
		b.span++
		b.bucket = 0
		next := b.spans[b.span]
		if next.Length == 0 {
			// An empty span only contributes its offset.
			b.idx += int(next.Offset)
			continue
		}
		// Widen to int before adding 1 so that an offset of
		// math.MaxInt32 cannot wrap around in int32 arithmetic.
		b.idx += int(next.Offset) + 1
		return b.idx, true
	}
	// We're out of options.
	return 0, false
}
// An Insert describes how many new buckets have to be inserted before
// processing the pos'th bucket from the original slice.
type Insert struct {
	pos int // Position in the original bucket slice before which to insert.
	num int // Number of new buckets to insert at that position.
}
// expandSpansForward returns the inserts needed to expand the bucket spans 'a'
// so that they match the spans in 'b'. 'b' must cover the same or more buckets
// than 'a'; if 'a' contains a bucket that 'b' lacks, ok is false and the
// returned inserts are incomplete.
//
// Example (span syntax is [offset, length]; an 'x' marks a populated bucket):
//
//	old spans : [0,2] [2,1] [3,2] [3,1] [1,1]
//	new spans : [0,3] [1,1] [1,4] [3,3]
//
//	bucket idx:   0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15
//	old layout:   x   x   .   .   x   .   .   .   x   x   .   .   .   x   .   x
//	old values:   6   3           3               2   4               5       1
//	old deltas:   6  -3           0              -1   2               1      -4
//	new layout:   x   x   x   .   x   .   x   x   x   x   .   .   .   x   x   x
//	new values:   6   3   0       3       0   0   2   4   5           0 ...   1
//	new deltas:   6  -3  -3       3      -3   0   2   2               1  -5   1
//
// (In the "new values" row, bucket 13 holds 5, bucket 14 holds 0, and bucket
// 15 holds 1.) The new layout appends to the first span, leaves the second one
// unchanged in terms of the bucket index it represents (although its offset
// changes), prepends to the third, and merges the last two. The resulting
// inserts are {pos: 2, num: 1}, {pos: 3, num: 2}, and {pos: 6, num: 1}.
//
// Note for histograms with delta-encoded buckets: whenever new buckets are
// introduced, the subsequent "old" bucket needs to readjust its delta to the
// new base of 0. A caller that wants to transform the original deltas into
// deltas matching the new layout therefore only needs this list of inserts.
//
// Thanks to the bucket iterators, this function works purely on bucket indices
// and does not have to deal with how the spans themselves change.
func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) {
	aIter := newBucketIterator(a)
	bIter := newBucketIterator(b)

	// pending turns into a valid insert once pending.num > 0. It is emitted
	// when a streak of new buckets ends.
	var pending Insert

	aIdx, aHas := aIter.Next()
	bIdx, bHas := bIter.Next()
	for aHas || bHas {
		if !bHas || (aHas && aIdx < bIdx) {
			// b misses a bucket that is in a.
			return forward, false
		}
		if aHas && aIdx == bIdx {
			// Same bucket on both sides. Finish the pending insert,
			// reset it, and move on.
			if pending.num > 0 {
				forward = append(forward, pending)
			}
			pending.num = 0
			pending.pos++
			aIdx, aHas = aIter.Next()
			bIdx, bHas = bIter.Next()
			continue
		}
		// a misses a bucket that is in b (or a is exhausted). Count it and
		// advance b only.
		pending.num++
		bIdx, bHas = bIter.Next()
	}

	// Both iterators ran out. Emit an insert that is still pending.
	if pending.num > 0 {
		forward = append(forward, pending)
	}
	return forward, true
}
// expandSpansBothWays is similar to expandSpansForward, but now b may also
// cover an entirely different set of buckets. The function returns the
// "forward" inserts to expand 'a' to also cover all the buckets exclusively
// covered by 'b', and it returns the "backward" inserts to expand 'b' to also
// cover all the buckets exclusively covered by 'a'. In addition, it returns
// mergedSpans, the span layout built from every bucket visited while merging
// 'a' and 'b', i.e. the buckets covered by either of them.
func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) {
	ai := newBucketIterator(a)
	bi := newBucketIterator(b)

	var fInserts, bInserts []Insert
	var lastBucket int
	// addBucket appends the bucket with the given index to mergedSpans. A
	// bucket directly following the previously added one extends the last
	// span; otherwise a new span of length 1 is started. Note that the
	// parameter b shadows the span slice b of the enclosing function.
	addBucket := func(b int) {
		offset := b - lastBucket - 1
		if offset == 0 && len(mergedSpans) > 0 {
			mergedSpans[len(mergedSpans)-1].Length++
		} else {
			if len(mergedSpans) == 0 {
				// There is no previous bucket for the very first
				// span, so undo the "-1" above: the offset of the
				// first span is the index of its first bucket.
				offset++
			}
			mergedSpans = append(mergedSpans, histogram.Span{
				Offset: int32(offset),
				Length: 1,
			})
		}
		lastBucket = b
	}

	// When fInter.num (or bInter.num, respectively) becomes > 0, this
	// becomes a valid insert that should be yielded when we finish a streak
	// of new buckets.
	var fInter, bInter Insert

	av, aOK := ai.Next()
	bv, bOK := bi.Next()
loop:
	for {
		switch {
		case aOK && bOK:
			switch {
			case av == bv: // Both have an identical value. move on!
				// Finish WIP insert and reset.
				if fInter.num > 0 {
					fInserts = append(fInserts, fInter)
					fInter.num = 0
				}
				if bInter.num > 0 {
					bInserts = append(bInserts, bInter)
					bInter.num = 0
				}
				addBucket(av)
				av, aOK = ai.Next()
				bv, bOK = bi.Next()
				fInter.pos++
				bInter.pos++
			case av < bv: // b misses a value that is in a.
				bInter.num++
				// Collect the forward inserts before advancing
				// the position of 'a'.
				if fInter.num > 0 {
					fInserts = append(fInserts, fInter)
					fInter.num = 0
				}
				addBucket(av)
				fInter.pos++
				av, aOK = ai.Next()
			case av > bv: // a misses a value that is in b. Forward b and recompare.
				fInter.num++
				// Collect the backward inserts before advancing the
				// position of 'b'.
				if bInter.num > 0 {
					bInserts = append(bInserts, bInter)
					bInter.num = 0
				}
				addBucket(bv)
				bInter.pos++
				bv, bOK = bi.Next()
			}
		case aOK && !bOK: // b misses a value that is in a.
			// bInter.pos is not changed here, so all remaining buckets
			// of 'a' accumulate in the same backward insert.
			bInter.num++
			addBucket(av)
			av, aOK = ai.Next()
		case !aOK && bOK: // a misses a value that is in b. Forward b and recompare.
			// fInter.pos is not changed here, so all remaining buckets
			// of 'b' accumulate in the same forward insert.
			fInter.num++
			addBucket(bv)
			bv, bOK = bi.Next()
		default: // Both iterators ran out. We're done.
			// Flush the inserts that are still pending.
			if fInter.num > 0 {
				fInserts = append(fInserts, fInter)
			}
			if bInter.num > 0 {
				bInserts = append(bInserts, bInter)
			}
			break loop
		}
	}

	return fInserts, bInserts, mergedSpans
}
// bucketValue is the type constraint for bucket values handled by insert:
// either int64 or float64.
type bucketValue interface {
	int64 | float64
}
// insert merges 'in' with the provided inserts and writes the result into
// 'out', which must already have the appropriate length. 'out' is also
// returned for convenience.
//
// If deltas is true, 'in' holds delta-encoded values and the values around
// each insert are adjusted so that every inserted bucket has the value 0. If
// deltas is false, 'in' holds absolute values and inserted buckets are simply
// set to 0. At most one insert may be left for the end of 'in'; insert panics
// if more than one insert remains unprocessed.
func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV {
	var (
		w    int // Next position to write in out.
		last BV  // Running sum of the input seen so far.
		next int // Index of the next insert to process.
	)
	for i, d := range in {
		if next >= len(inserts) || i != inserts[next].pos {
			// No insert at this position, the original value is still
			// valid.
			out[w] = d
			w++
			last += d
			continue
		}

		// We have an insert. The first new bucket has to bring the
		// bucket value down to 0: as a delta that is -last, as an
		// absolute value it is just 0.
		var first BV
		if deltas {
			first = -last
		}
		out[w] = first
		w++
		// All further new buckets stay at 0.
		for x := 1; x < inserts[next].num; x++ {
			out[w] = 0
			w++
		}
		next++

		// Now store the value from the input. As a delta, it has to be
		// relative to the 0 of the preceding new bucket, i.e. the
		// original delta plus the value before the insert. As an
		// absolute value, it is taken over unchanged.
		sum := d + last
		if deltas {
			out[w] = sum
		} else {
			out[w] = d
		}
		w++
		last = sum
	}

	if next == len(inserts) {
		// All inserts processed. Nothing more to do.
		return out
	}
	if next != len(inserts)-1 {
		panic("unprocessed inserts left")
	}

	// One more insert to process at the end.
	var first BV
	if deltas {
		first = -last
	}
	out[w] = first
	w++
	for x := 1; x < inserts[next].num; x++ {
		out[w] = 0
		w++
	}
	return out
}
// counterResetHint returns a CounterResetHint based on the CounterResetHeader
// of the chunk and on numRead, the position into the chunk.
func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterResetHint {
	if crh == GaugeType {
		// A gauge histogram chunk only contains gauge histograms.
		return histogram.GaugeType
	}
	if numRead > 1 {
		// In a counter histogram chunk, there will not be any counter
		// resets after the first histogram.
		return histogram.NotCounterReset
	}
	if crh == CounterReset {
		// If the chunk was started because of a counter reset, we can
		// safely return that hint. This histogram always has to be
		// treated as a counter reset.
		return histogram.CounterReset
	}
	// Sadly, we have to return "unknown" as the hint for all other cases,
	// even if we know that the chunk was started without a counter reset.
	// But we cannot be sure that the previous chunk still exists in the
	// TSDB, so we conservatively return "unknown". On the bright side, this
	// case should be relatively rare.
	//
	// TODO(beorn7): Nevertheless, if the current chunk is in the middle of
	// a block (not the first chunk in the block for this series), it's
	// probably safe to assume that the previous chunk will exist in the
	// TSDB for as long as the current chunk exists, and we could safely
	// return "histogram.NotCounterReset". This needs some more work and
	// might not be worth the effort and/or risk. To be vetted...
	return histogram.UnknownCounterReset
}
// nextNonEmptySpanSliceIdx handles the pathological case of empty spans when
// advancing the span index. Starting after idx, it looks for the next span
// with a non-zero length and returns its position in spans together with the
// index of its first bucket. The offsets of skipped empty spans are added to
// bucketIdx on the way. If no such span exists, the returned span index is not
// a valid position in spans and the returned bucket index is 0.
//
// Call it with idx==-1 to find the first non-empty span.
func nextNonEmptySpanSliceIdx(idx int, bucketIdx int32, spans []histogram.Span) (newIdx int, newBucketIdx int32) {
	i := idx + 1
	for ; i < len(spans); i++ {
		s := spans[i]
		if s.Length == 0 {
			// Nothing to yield here, but the offset still counts.
			bucketIdx += s.Offset
			continue
		}
		return i, bucketIdx + s.Offset + 1
	}
	return i, 0
}