This repository has been archived by the owner on Jul 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 73
/
bitprefix.go
247 lines (209 loc) · 7.4 KB
/
bitprefix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package tsdb
import (
"fmt"
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
phlaremodel "github.com/grafana/phlare/pkg/model"
"github.com/grafana/phlare/pkg/phlaredb/tsdb/index"
"github.com/grafana/phlare/pkg/phlaredb/tsdb/shard"
)
// BitPrefixInvertedIndex is another inverted index implementation
// that uses the bit prefix sharding algorithm in tsdb/index/shard.go
// instead of a modulo approach.
// This is the standard for TSDB compatibility because
// the same series must resolve to the same shard (for each period config),
// whether it's resolved on the ingester or via the store.
//
// A fingerprint is routed to a shard by shifting it right until only its
// leading bits remain (see shardForFP).
type BitPrefixInvertedIndex struct {
// totalShards is the shard factor the index was constructed with.
totalShards uint32
// shards holds one indexShard per shard factor slot; the slot position
// is the shard number assigned at construction time.
shards []*indexShard
}
// ValidateBitPrefixShardFactor checks that factor can be used as the shard
// factor of a BitPrefixInvertedIndex. It returns nil when factor is a non-zero
// power of two and a descriptive error otherwise.
//
// Zero is rejected explicitly: an index built with zero shards has no shard to
// route a fingerprint to, so Add would index into an empty slice.
func ValidateBitPrefixShardFactor(factor uint32) error {
	// A power of two has exactly one bit set, so clearing its lowest set bit
	// (factor & (factor-1)) must leave nothing behind.
	if factor == 0 || factor&(factor-1) != 0 {
		return fmt.Errorf("Incompatible inverted index shard factor on ingester: It must be a power of two, got %d", factor)
	}
	return nil
}
// NewBitPrefixWithShards builds an empty BitPrefixInvertedIndex split into
// totalShards shards. The factor is first checked with
// ValidateBitPrefixShardFactor; if that check fails, a nil index and the
// validation error are returned.
func NewBitPrefixWithShards(totalShards uint32) (*BitPrefixInvertedIndex, error) {
	err := ValidateBitPrefixShardFactor(totalShards)
	if err != nil {
		return nil, err
	}

	ii := &BitPrefixInvertedIndex{
		totalShards: totalShards,
		shards:      make([]*indexShard, totalShards),
	}
	// Every slot gets its own empty shard, tagged with its position.
	for n := range ii.shards {
		ii.shards[n] = &indexShard{
			idx:   make(map[string]indexEntry),
			shard: uint32(n),
		}
	}
	return ii, nil
}
// getShards selects the contiguous run of index shards that has to be
// consulted for the requested shard annotation.
//
// It returns the selected shards together with a flag telling the caller
// whether the fingerprints found in them still have to be filtered against
// the requested shard. A nil shard selects every shard and needs no filtering.
func (ii *BitPrefixInvertedIndex) getShards(shard *shard.Annotation) ([]*indexShard, bool) {
if shard == nil {
return ii.shards, false
}
// When comparing a higher shard factor to a lower inverted index shard factor
// we must filter resulting fingerprints, as the lower shard factor in the
// inverted index is a superset of the requested factor.
//
// For instance, the 3_of_4 shard factor maps to the bit prefix 0b11.
// If the inverted index only has a factor of 2, we'll need to check the 0b1
// prefixed shard (which contains the 0b10 and 0b11 prefixes).
// Conversely, if the requested shard is 1_of_2, but the index has a factor of 4,
// we can _exactly_ match 0b1 => (0b10, 0b11) and know all fingerprints in those
// resulting shards have the requested 0b1 prefix (don't need to filter).
var filter bool
if shard.Of > len(ii.shards) {
filter = true
}
requestedShard := shard.TSDB()
minFp, maxFp := requestedShard.Bounds()
// Determine how many bits we need to take from
// the requested shard's min/max fingerprint values
// in order to calculate the indices for the inverted index's
// shard factor.
requiredBits := index.NewShard(0, uint32(len(ii.shards))).RequiredBits()
lowerIdx := int(minFp >> (64 - requiredBits))
upperIdx := int(maxFp >> (64 - requiredBits))
// If the upper bound's shard doesn't align exactly
// with the maximum fingerprint, we must also
// check the subsequent shard.
// This happens in two cases:
// 1) When requesting the last shard of any factor.
// This accounts for zero indexing in our sharding logic
// to successfully request `shards[start:len(shards)]`
// 2) When requesting the _first_ shard of a larger factor
// than the index uses. In this case, the required_bits are not
// enough and the requested end prefix gets trimmed.
// If confused, comment out this line and see which tests fail.
if (upperIdx << (64 - requiredBits)) != int(maxFp) {
upperIdx++
}
// upperIdx is used as the exclusive end of the slice expression.
return ii.shards[lowerIdx:upperIdx], filter
}
// shardForFP maps a fingerprint to the position of the shard that owns it:
// the fingerprint is shifted right so that only the leading bits required by
// the index's shard factor remain.
func (ii *BitPrefixInvertedIndex) shardForFP(fp model.Fingerprint) int {
	prefixBits := index.NewShard(0, uint32(len(ii.shards))).RequiredBits()
	return int(fp >> (64 - prefixBits))
}
// validateShard accepts a nil shard annotation, or one whose factor (Of)
// equals two raised to its required bit count; anything else yields an error.
func (ii *BitPrefixInvertedIndex) validateShard(shard *shard.Annotation) error {
	// The nil test short-circuits, so the annotation is only dereferenced
	// when it is present.
	if shard == nil || 1<<(shard.TSDB().RequiredBits()) == shard.Of {
		return nil
	}
	return fmt.Errorf("Shard factor must be a power of two, got %d", shard.Of)
}
// Add registers fp under the given labels in the shard selected by
// shardForFP and returns whatever label set that shard's add produces.
// NOTE: the memory backing `labels` is unsafe; anything that has to outlive
// this call must be copied.
func (ii *BitPrefixInvertedIndex) Add(labels phlaremodel.Labels, fp model.Fingerprint) phlaremodel.Labels {
	owner := ii.shards[ii.shardForFP(fp)]
	// add() hands back 'interned' values, so the caller's labels are not retained.
	return owner.add(labels, fp)
}
// Lookup collects the fingerprints matching the given matchers from every
// index shard relevant to the requested shard annotation. With no matchers,
// all fingerprints of those shards are returned. An invalid shard annotation
// yields a nil result and the validation error.
func (ii *BitPrefixInvertedIndex) Lookup(matchers []*labels.Matcher, shard *shard.Annotation) ([]model.Fingerprint, error) {
	if err := ii.validateShard(shard); err != nil {
		return nil, err
	}

	candidates, mustFilter := ii.getShards(shard)
	// No matcher at all means "everything in the selected shards".
	matchAll := len(matchers) == 0

	var fps []model.Fingerprint
	for _, s := range candidates {
		if matchAll {
			fps = append(fps, s.allFPs()...)
		} else {
			fps = append(fps, s.lookup(matchers)...)
		}
	}

	if !mustFilter {
		return fps, nil
	}

	// Bit prefix order is also ascending order, so the fingerprints merged
	// from ascending shards are treated as sorted and the requested window
	// is cut out with two binary searches.
	lo, hi := shard.TSDB().Bounds()
	from := sort.Search(len(fps), func(i int) bool { return fps[i] >= lo })
	to := sort.Search(len(fps), func(i int) bool { return fps[i] >= hi })
	return fps[from:to], nil
}
// LabelNames gathers the label names from every index shard relevant to the
// requested shard annotation and merges them with mergeStringSlices. An
// invalid shard annotation yields a nil result and the validation error.
func (ii *BitPrefixInvertedIndex) LabelNames(shard *shard.Annotation) ([]string, error) {
	if err := ii.validateShard(shard); err != nil {
		return nil, err
	}

	candidates, mustFilter := ii.getShards(shard)

	// The extractor stays nil unless shard inclusion has to be verified per
	// fingerprint. That is the expensive path, so requesting a shard factor
	// lower than or equal to the inverted index factor is more performant.
	var extractor func(unlockIndex) []string
	if mustFilter {
		want := shard.TSDB()
		extractor = func(x unlockIndex) []string {
			var names []string
			for name, entry := range x {
				// One matching fingerprint is enough to keep the name.
			scan:
				for _, valEntry := range entry.fps {
					for _, fp := range valEntry.fps {
						if want.Match(fp) {
							names = append(names, name)
							break scan
						}
					}
				}
			}
			return names
		}
	}

	perShard := make([][]string, len(candidates))
	for i, s := range candidates {
		perShard[i] = s.labelNames(extractor)
	}
	return mergeStringSlices(perShard), nil
}
// LabelValues gathers the values of the label called name from every index
// shard relevant to the requested shard annotation and merges them with
// mergeStringSlices. An invalid shard annotation yields a nil result and the
// validation error.
func (ii *BitPrefixInvertedIndex) LabelValues(name string, shard *shard.Annotation) ([]string, error) {
	if err := ii.validateShard(shard); err != nil {
		return nil, err
	}

	candidates, mustFilter := ii.getShards(shard)

	// The extractor stays nil unless values have to be checked against the
	// requested shard fingerprint by fingerprint.
	var extractor func(indexEntry) []string
	if mustFilter {
		want := shard.TSDB()
		extractor = func(x indexEntry) []string {
			vals := make([]string, 0, len(x.fps))
			for val, valEntry := range x.fps {
				for _, fp := range valEntry.fps {
					if want.Match(fp) {
						// One matching fingerprint is enough to keep the value.
						vals = append(vals, val)
						break
					}
				}
			}
			return vals
		}
	}

	perShard := make([][]string, len(candidates))
	for i, s := range candidates {
		perShard[i] = s.labelValues(name, extractor)
	}
	return mergeStringSlices(perShard), nil
}
// Delete asks the shard that owns fp to delete the fingerprint registered
// under the given label pairs.
func (ii *BitPrefixInvertedIndex) Delete(labels []*typesv1.LabelPair, fp model.Fingerprint) {
	// Route through shardForFP, the same fingerprint-to-shard mapping Add
	// uses, so insertions and deletions cannot drift apart.
	ii.shards[ii.shardForFP(fp)].delete(labels, fp)
}