forked from grafana/loki
/
series.go
153 lines (132 loc) · 3.27 KB
/
series.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package retention
import (
"github.com/prometheus/prometheus/model/labels"
"go.etcd.io/bbolt"
"github.com/frelon/loki/v2/pkg/storage/chunk"
)
// userSeries identifies one series of one user. The series ID and the user ID
// are stored back to back in a single byte slice.
type userSeries struct {
// key holds the series ID bytes immediately followed by the user ID bytes.
key []byte
// seriesIDLen is the number of leading bytes of key that form the series ID.
seriesIDLen int
}
// newUserSeries builds a userSeries whose key is seriesID immediately followed
// by userID. Both inputs are copied into a freshly allocated buffer, so the
// result does not alias the caller's slices.
func newUserSeries(seriesID, userID []byte) userSeries {
	sidLen := len(seriesID)

	// Allocate exactly once: the final size is known up front.
	buf := make([]byte, 0, sidLen+len(userID))
	buf = append(append(buf, seriesID...), userID...)

	return userSeries{key: buf, seriesIDLen: sidLen}
}
// Key returns the combined series ID + user ID bytes as a string, e.g. for use
// as a map key.
//
// NOTE(review): the conversion goes through unsafeGetString, so the returned
// string presumably shares memory with the key buffer and would change if the
// buffer is later overwritten (see Reset) — confirm against unsafeGetString.
func (us userSeries) Key() string {
	raw := us.key
	return unsafeGetString(raw)
}
// SeriesID returns the series ID portion of the key, i.e. its first
// seriesIDLen bytes.
//
// The result is a view into the key buffer, not a copy. Its capacity is capped
// at its length so that a caller appending to it gets a fresh allocation
// instead of silently overwriting the user ID bytes that follow it in the same
// buffer.
func (us userSeries) SeriesID() []byte {
	return us.key[:us.seriesIDLen:us.seriesIDLen]
}
// UserID returns the user ID portion of the key: everything after the first
// seriesIDLen bytes. The result is a view into the key buffer, not a copy.
func (us userSeries) UserID() []byte {
	start := us.seriesIDLen
	return us.key[start:]
}
// Reset overwrites the receiver so that it identifies the given series and
// user, reusing the existing key buffer when there is one. Strings previously
// obtained from Key may be affected because the buffer is rewritten in place.
func (us *userSeries) Reset(seriesID, userID []byte) {
	buf := us.key
	if buf == nil {
		// First use: size the buffer to fit both IDs exactly.
		buf = make([]byte, 0, len(seriesID)+len(userID))
	}

	// Truncate, then lay out series ID followed by user ID.
	buf = append(buf[:0], seriesID...)
	buf = append(buf, userID...)

	us.key = buf
	us.seriesIDLen = len(seriesID)
}
// userSeriesInfo is the per-series record stored in a userSeriesMap.
type userSeriesInfo struct {
userSeries
// isDeleted is set to true when the series is added to a userSeriesMap and
// cleared by MarkSeriesNotDeleted.
isDeleted bool
// lbls holds the labels supplied when the series was added.
lbls labels.Labels
}
// userSeriesMap indexes userSeriesInfo records by the string form of their
// combined series ID + user ID key (see userSeries.Key).
type userSeriesMap map[string]userSeriesInfo
// newUserSeriesMap returns an empty, ready-to-use userSeriesMap.
func newUserSeriesMap() userSeriesMap {
	return userSeriesMap{}
}
// Add records the series for the given user together with its labels. If the
// series is already present the existing entry is left untouched. New entries
// start out flagged as deleted; MarkSeriesNotDeleted clears the flag.
func (u userSeriesMap) Add(seriesID, userID []byte, lbls labels.Labels) {
	us := newUserSeries(seriesID, userID)
	key := us.Key()

	if _, exists := u[key]; !exists {
		u[key] = userSeriesInfo{userSeries: us, isDeleted: true, lbls: lbls}
	}
}
// MarkSeriesNotDeleted is used to mark a series as not deleted when it still
// has some chunks left in the store.
//
// If the series has not been added yet, an entry is created for it with the
// deleted flag cleared and no labels. The new entry carries the proper series
// and user IDs, so ForEach never hands out a record with an empty identity.
func (u userSeriesMap) MarkSeriesNotDeleted(seriesID, userID []byte) {
	us := newUserSeries(seriesID, userID)
	key := us.Key()

	info, ok := u[key]
	if !ok {
		// Unknown series: fill in its identity instead of storing a
		// zero-value userSeries under a non-empty map key.
		info.userSeries = us
	}
	info.isDeleted = false
	u[key] = info
}
// ForEach invokes callback once for every stored series, in map iteration
// order (unspecified). It stops at the first non-nil error and returns it;
// it returns nil when every call succeeds.
func (u userSeriesMap) ForEach(callback func(info userSeriesInfo) error) error {
	for _, info := range u {
		err := callback(info)
		if err != nil {
			return err
		}
	}
	return nil
}
// seriesLabels pairs a user series with the labels collected for it.
type seriesLabels struct {
userSeries
// lbs is filled in by seriesLabelsMapper.build, with at most one label per
// distinct label name.
lbs labels.Labels
}
// seriesLabelsMapper scans a bbolt bucket once and then answers lookups from
// (series ID, user ID) to the labels found for that series.
type seriesLabelsMapper struct {
// cursor iterates over the bucket the mapper was created from.
cursor *bbolt.Cursor
// config is the period config passed to newSeriesLabelsMapper; it is stored
// but not read by the methods visible in this file.
config chunk.PeriodConfig
// bufKey is a scratch key that is Reset and reused for every map lookup.
bufKey userSeries
// mapping is keyed by userSeries.Key().
mapping map[string]*seriesLabels
}
// newSeriesLabelsMapper creates a mapper over the given bucket and eagerly
// builds the series -> labels mapping by scanning the bucket. It returns the
// error from that scan, if any, in which case the mapper is nil.
func newSeriesLabelsMapper(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*seriesLabelsMapper, error) {
	mapper := new(seriesLabelsMapper)
	mapper.cursor = bucket.Cursor()
	mapper.config = config
	mapper.bufKey = newUserSeries(nil, nil)
	mapper.mapping = make(map[string]*seriesLabels)

	err := mapper.build()
	if err != nil {
		return nil, err
	}
	return mapper, nil
}
// Get returns the labels collected for the given series and user, or an empty
// labels.Labels when the series is unknown.
//
// The lookup rewrites the mapper's shared scratch key (bufKey), so concurrent
// calls on the same mapper would interfere with each other.
func (sm *seriesLabelsMapper) Get(seriesID, userID []byte) labels.Labels {
	sm.bufKey.Reset(seriesID, userID)
	if entry, found := sm.mapping[sm.bufKey.Key()]; found {
		return entry.lbs
	}
	return labels.Labels{}
}
// build walks every key/value pair under the cursor and fills sm.mapping.
//
// Each key is decoded and parsed with parseLabelSeriesRangeKey; keys for which
// the parser reports ok == false are skipped, and a parse error aborts the
// scan. For every accepted key the label (name taken from the key, value taken
// from the bucket value) is appended to the series' label set, unless a label
// with the same name has already been recorded for that series — the first
// occurrence wins.
func (sm *seriesLabelsMapper) build() error {
	for key, value := sm.cursor.First(); key != nil; key, value = sm.cursor.Next() {
		ref, ok, err := parseLabelSeriesRangeKey(decodeKey(key))
		if err != nil {
			return err
		}
		if !ok {
			continue
		}

		// Look up with the reusable scratch key so that only series seen for
		// the first time pay for allocating their own key.
		sm.bufKey.Reset(ref.SeriesID, ref.UserID)
		entry, found := sm.mapping[sm.bufKey.Key()]
		if !found {
			us := newUserSeries(ref.SeriesID, ref.UserID)
			entry = &seriesLabels{
				userSeries: us,
				lbs:        make(labels.Labels, 0, 15),
			}
			sm.mapping[us.Key()] = entry
		}

		// Only add the label if its name has not been recorded yet.
		known := false
		for i := range entry.lbs {
			if entry.lbs[i].Name == unsafeGetString(ref.Name) {
				known = true
				break
			}
		}
		if known {
			continue
		}

		// Copy name and value into owned strings before storing them.
		entry.lbs = append(entry.lbs, labels.Label{Name: string(ref.Name), Value: string(value)})
	}
	return nil
}