-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
nearest_delta.go
144 lines (129 loc) · 3.69 KB
/
nearest_delta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package encoding
import (
"fmt"
"math/bits"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// marshalInt64NearestDelta appends the `nearest delta` encoding of src
// to dst and returns the extended dst together with src[0].
//
// Only the deltas between adjacent items are written to dst; src[0] is handed
// back to the caller as firstValue instead of being encoded.
//
// precisionBits must be in the range [1...64], where 1 means 50% precision,
// while 64 means 100% precision, i.e. lossless encoding.
//
// An empty src or precisionBits rejected by CheckPrecisionBits is reported
// as a BUG via logger.Panicf.
func marshalInt64NearestDelta(dst []byte, src []int64, precisionBits uint8) (result []byte, firstValue int64) {
	if len(src) < 1 {
		logger.Panicf("BUG: src must contain at least 1 item; got %d items", len(src))
	}
	if err := CheckPrecisionBits(precisionBits); err != nil {
		logger.Panicf("BUG: %s", err)
	}

	firstValue = src[0]
	prev := firstValue
	rest := src[1:]

	// Scratch buffer for the deltas; it is handed back via PutInt64s below.
	deltas := GetInt64s(len(rest))
	if precisionBits != 64 {
		// Lossy path: every delta may get its trailing bits zeroed, so prev
		// tracks the value implied by the emitted deltas, not the source value.
		tz := getTrailingZeros(prev, precisionBits)
		for i, cur := range rest {
			var d int64
			d, tz = nearestDelta(cur, prev, precisionBits, tz)
			prev += d
			deltas.A[i] = d
		}
	} else {
		// Lossless path: store exact deltas.
		for i, cur := range rest {
			deltas.A[i] = cur - prev
			prev = cur
		}
	}

	dst = MarshalVarInt64s(dst, deltas.A)
	PutInt64s(deltas)
	return dst, firstValue
}
// unmarshalInt64NearestDelta decodes src using `nearest delta` encoding,
// appends the result to dst and returns the appended result.
//
// The firstValue must be the value returned from marshalInt64NearestDelta.
// itemsCount is the total number of items to produce, including firstValue,
// so src is expected to hold exactly itemsCount-1 deltas. itemsCount below 1
// is reported as a BUG via logger.Panicf.
//
// A nil slice (not dst) and a non-nil error are returned if src cannot be
// decoded or if bytes are left in src after decoding. The decode error is
// wrapped, so it stays reachable through errors.Is / errors.As.
func unmarshalInt64NearestDelta(dst []int64, src []byte, firstValue int64, itemsCount int) ([]int64, error) {
	if itemsCount < 1 {
		logger.Panicf("BUG: itemsCount must be greater than 0; got %d", itemsCount)
	}

	// Scratch buffer for the decoded deltas; firstValue is not stored in src.
	is := GetInt64s(itemsCount - 1)
	defer PutInt64s(is)

	tail, err := UnmarshalVarInt64s(is.A, src)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshal nearest delta: %w", err)
	}
	if len(tail) > 0 {
		return nil, fmt.Errorf("unexpected tail left after unmarshaling %d items; tail size=%d, value=%X", itemsCount, len(tail), tail)
	}

	// Rebuild the values as a running sum of deltas starting at firstValue.
	v := firstValue
	dst = append(dst, v)
	for _, d := range is.A {
		v += d
		dst = append(dst, v)
	}
	return dst, nil
}
// nearestDelta returns an approximation of (next-prev) that keeps roughly
// precisionBits significant bits relative to the magnitude of next.
//
// prevTrailingZeros is the trailing-zero count produced by the previous call
// (or by getTrailingZeros for the first item).
//
// The second returned value is the trailing-zero count to pass to the
// following call. It equals the number of zeroed trailing bits in the returned
// delta only when the delta was actually truncated.
func nearestDelta(next, prev int64, precisionBits, prevTrailingZeros uint8) (int64, uint8) {
	delta := next - prev
	if delta == 0 {
		// Fast path: nothing to truncate.
		return 0, decIfNonZero(prevTrailingZeros)
	}

	// Absolute value of next. For next = -1<<63 the negation wraps back to
	// the same bit pattern, which is already the correct unsigned magnitude.
	magnitude := uint64(next)
	if next < 0 {
		magnitude = uint64(-next)
	}
	usedBits := uint8(bits.Len64(magnitude))
	if usedBits <= precisionBits {
		// next fits into precisionBits, so no trailing bits can be zeroed.
		return delta, decIfNonZero(prevTrailingZeros)
	}

	// usedBits > precisionBits, so trailing bits of the delta may be zeroed.
	zeros := usedBits - precisionBits
	switch {
	case zeros > prevTrailingZeros+4:
		// Sharp jump up in magnitude, probably a counter reset.
		// Keep the delta at full precision and move the count up gradually.
		return delta, prevTrailingZeros + 2
	case zeros+4 < prevTrailingZeros:
		// Sharp drop in magnitude, probably a counter reset.
		// Keep the delta at full precision and move the count down gradually.
		return delta, prevTrailingZeros - 2
	}

	// Clear the low bits of |delta| and then restore the sign.
	// delta = -1<<63 needs no special handling: negation wraps to itself.
	mask := ^uint64(0) << zeros
	if delta < 0 {
		return -int64(uint64(-delta) & mask), zeros
	}
	return int64(uint64(delta) & mask), zeros
}
// decIfNonZero returns n-1 for positive n and 0 for n == 0,
// i.e. a decrement that never wraps below zero.
func decIfNonZero(n uint8) uint8 {
	if n > 0 {
		n--
	}
	return n
}
// getTrailingZeros returns how many bits the absolute value of v occupies
// beyond precisionBits, or 0 if it fits into precisionBits.
func getTrailingZeros(v int64, precisionBits uint8) uint8 {
	// Absolute value of v. For v = -1<<63 the negation wraps back to the
	// same bit pattern, which is already the correct unsigned magnitude.
	magnitude := uint64(v)
	if v < 0 {
		magnitude = uint64(-v)
	}
	usedBits := uint8(bits.Len64(magnitude))
	if usedBits > precisionBits {
		return usedBits - precisionBits
	}
	return 0
}