/
series.go
111 lines (97 loc) · 2.41 KB
/
series.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package compactor
import (
"github.com/prometheus/prometheus/model/labels"
"go.etcd.io/bbolt"
"github.com/grafana/loki/v3/pkg/storage/config"
)
// userSeries identifies a series belonging to a user with a single byte slice.
// The series ID bytes come first in key and the user ID bytes follow directly
// after them, with no separator.
type userSeries struct {
// key is the series ID immediately followed by the user ID.
key []byte
// seriesIDLen is the number of leading bytes of key that form the series ID;
// the remaining bytes are the user ID.
seriesIDLen int
}
// newUserSeries builds a userSeries from the given series ID and user ID.
//
// Both inputs are copied into a freshly allocated buffer sized exactly to
// hold them, so the returned value does not alias the caller's slices.
func newUserSeries(seriesID []byte, userID []byte) userSeries {
	buf := make([]byte, len(seriesID)+len(userID))
	// buf is large enough for both parts, so split always equals len(seriesID).
	split := copy(buf, seriesID)
	copy(buf[split:], userID)
	return userSeries{key: buf, seriesIDLen: split}
}
// Key returns the combined series ID + user ID bytes as a string, suitable
// for use as a map key.
//
// NOTE(review): the conversion goes through unsafeGetString, so the returned
// string presumably shares memory with us.key rather than copying it —
// confirm before retaining the result across a Reset.
func (us userSeries) Key() string {
	raw := us.key
	return unsafeGetString(raw)
}
// SeriesID returns the series ID portion of the key, i.e. its first
// seriesIDLen bytes. The result is a sub-slice of the key, not a copy.
func (us userSeries) SeriesID() []byte {
	end := us.seriesIDLen
	return us.key[:end]
}
// UserID returns the user ID portion of the key, i.e. everything after the
// first seriesIDLen bytes. The result is a sub-slice of the key, not a copy.
func (us userSeries) UserID() []byte {
	start := us.seriesIDLen
	return us.key[start:len(us.key)]
}
// Reset overwrites the receiver so that it identifies the given series ID
// and user ID, reusing the existing key buffer when it has enough capacity.
//
// A buffer is allocated only when the receiver has none yet; otherwise the
// current one is truncated and refilled, growing via append if needed.
func (us *userSeries) Reset(seriesID []byte, userID []byte) {
	if us.key == nil {
		us.key = make([]byte, 0, len(seriesID)+len(userID))
	}
	refilled := append(us.key[:0], seriesID...)
	us.key = append(refilled, userID...)
	us.seriesIDLen = len(seriesID)
}
// seriesLabels pairs a series/user identity with the labels collected for it.
type seriesLabels struct {
// userSeries is embedded, so its Key, SeriesID and UserID methods are
// available directly on seriesLabels.
userSeries
// lbs holds the labels gathered for this series.
lbs labels.Labels
}
// seriesLabelsMapper resolves the labels of a series from its series ID and
// user ID. The lookup table is populated once by build and queried with Get.
type seriesLabelsMapper struct {
// cursor walks the bucket entries that build reads from.
cursor *bbolt.Cursor
// config is the period configuration handed to newSeriesLabelsMapper.
config config.PeriodConfig
// bufKey is a reusable scratch key that is overwritten on every lookup.
bufKey userSeries
// mapping indexes the collected labels by userSeries.Key().
mapping map[string]*seriesLabels
}
// newSeriesLabelsMapper creates a mapper over the given bucket and eagerly
// populates it by scanning the bucket through a cursor.
//
// It returns an error, and no mapper, when the scan fails.
func newSeriesLabelsMapper(bucket *bbolt.Bucket, config config.PeriodConfig) (*seriesLabelsMapper, error) {
	mapper := new(seriesLabelsMapper)
	mapper.cursor = bucket.Cursor()
	mapper.mapping = map[string]*seriesLabels{}
	mapper.config = config
	mapper.bufKey = newUserSeries(nil, nil)

	err := mapper.build()
	if err != nil {
		return nil, err
	}
	return mapper, nil
}
// Get returns the labels recorded for the given series ID and user ID, or an
// empty labels.Labels when the pair is unknown.
//
// The lookup key is assembled in the mapper's shared bufKey scratch buffer,
// so every call mutates the mapper.
func (sm *seriesLabelsMapper) Get(seriesID []byte, userID []byte) labels.Labels {
	sm.bufKey.Reset(seriesID, userID)
	if entry, found := sm.mapping[sm.bufKey.Key()]; found {
		return entry.lbs
	}
	return labels.Labels{}
}
// build scans every entry reachable from the cursor and fills sm.mapping with
// the labels of each series/user pair.
//
// Entries whose key parseLabelSeriesRangeKey reports as not applicable are
// skipped; a parse error aborts the scan and is returned. For each label name
// only the first value encountered for a given series is kept.
func (sm *seriesLabelsMapper) build() error {
	for rawKey, rawValue := sm.cursor.First(); rawKey != nil; rawKey, rawValue = sm.cursor.Next() {
		ref, matched, err := parseLabelSeriesRangeKey(decodeKey(rawKey))
		if err != nil {
			return err
		}
		if !matched {
			continue
		}

		// Probe with the reusable scratch key so that series which are
		// already known do not cost an allocation.
		sm.bufKey.Reset(ref.SeriesID, ref.UserID)
		entry, known := sm.mapping[sm.bufKey.Key()]
		if !known {
			// The map must be keyed by a buffer of its own, because the
			// scratch key is overwritten on the next iteration.
			owned := newUserSeries(ref.SeriesID, ref.UserID)
			entry = &seriesLabels{
				userSeries: owned,
				lbs:        make(labels.Labels, 0, 15),
			}
			sm.mapping[owned.Key()] = entry
		}

		// Only add the label if this series does not have one with that name yet.
		name := unsafeGetString(ref.Name)
		duplicate := false
		for i := range entry.lbs {
			if entry.lbs[i].Name == name {
				duplicate = true
				break
			}
		}
		if duplicate {
			continue
		}

		// string(...) copies name and value into fresh strings.
		// NOTE(review): presumably required because the cursor's bytes do not
		// outlive the bbolt transaction — confirm.
		entry.lbs = append(entry.lbs, labels.Label{Name: string(ref.Name), Value: string(rawValue)})
	}
	return nil
}