forked from grafana/loki
/
iterator.go
155 lines (132 loc) · 3.45 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package retention
import (
"fmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.etcd.io/bbolt"
"github.com/frelon/loki/v2/pkg/storage/chunk"
)
// Compile-time checks that the concrete types satisfy their interfaces.
// Using typed nil pointers instead of composite literals avoids a needless
// allocation at package initialization.
var (
	_ ChunkEntryIterator = (*chunkIndexIterator)(nil)
	_ SeriesCleaner      = (*seriesCleaner)(nil)
)
// ChunkEntry is a chunk index entry paired with the full label set of the
// series the chunk belongs to.
type ChunkEntry struct {
	ChunkRef
	// Labels is the series' label set, resolved via the seriesLabelsMapper.
	Labels labels.Labels
}
// ChunkEntryIterator iterates over chunk index entries in an index table,
// allowing the current entry to be deleted as part of retention cleanup.
type ChunkEntryIterator interface {
	// Next advances the iterator; it returns false when exhausted or on error.
	Next() bool
	// Entry returns the entry the iterator is currently positioned on.
	// Only valid after a call to Next that returned true.
	Entry() ChunkEntry
	// Delete deletes the current entry.
	Delete() error
	// Err returns the first error encountered during iteration, if any.
	Err() error
}
// chunkIndexIterator walks a bbolt bucket cursor over index keys, surfacing
// only the keys that parse as chunk index entries.
type chunkIndexIterator struct {
	cursor  *bbolt.Cursor
	current ChunkEntry
	// first tracks whether the cursor still needs its initial First() call.
	first bool
	err   error
	// labelsMapper resolves (seriesID, userID) -> labels for each chunk.
	labelsMapper *seriesLabelsMapper
}
// newChunkIndexIterator builds an iterator over the chunk index entries in
// bucket. It first constructs a seriesLabelsMapper so each yielded entry can
// carry the full label set of its series.
func newChunkIndexIterator(bucket *bbolt.Bucket, config chunk.PeriodConfig) (*chunkIndexIterator, error) {
	mapper, err := newSeriesLabelsMapper(bucket, config)
	if err != nil {
		return nil, err
	}
	it := &chunkIndexIterator{
		cursor:       bucket.Cursor(),
		first:        true,
		labelsMapper: mapper,
	}
	return it, nil
}
// Err returns the first error encountered while iterating, or nil.
func (b *chunkIndexIterator) Err() error {
	return b.err
}
// Entry returns the chunk entry the iterator is currently positioned on.
// Only meaningful after Next has returned true.
func (b *chunkIndexIterator) Entry() ChunkEntry {
	return b.current
}
// Delete removes the index key the cursor is currently positioned on.
func (b *chunkIndexIterator) Delete() error {
	return b.cursor.Delete()
}
// Next advances to the next chunk index entry, transparently skipping index
// keys that are not chunk entries. It returns false when the cursor is
// exhausted or when a key fails to parse (the error is stored for Err).
func (b *chunkIndexIterator) Next() bool {
	for {
		// The very first advance must position the cursor with First();
		// every subsequent advance uses Next().
		var key []byte
		if b.first {
			b.first = false
			key, _ = b.cursor.First()
		} else {
			key, _ = b.cursor.Next()
		}
		if key == nil {
			return false
		}

		ref, ok, err := parseChunkRef(decodeKey(key))
		if err != nil {
			b.err = err
			return false
		}
		// Skip anything that is not a chunk index entry.
		if !ok {
			continue
		}

		b.current.ChunkRef = ref
		b.current.Labels = b.labelsMapper.Get(ref.SeriesID, ref.UserID)
		return true
	}
}
// SeriesCleaner deletes the label index entries of a series once all of its
// chunks have been removed.
type SeriesCleaner interface {
	Cleanup(userID []byte, lbls labels.Labels) error
}
// seriesCleaner deletes series label index entries from a single table bucket.
type seriesCleaner struct {
	// tableInterval is the time range covered by the table, derived from its name.
	tableInterval model.Interval
	// shards maps shard number -> zero-padded shard prefix string; nil when
	// the schema is unsharded (RowShards == 0).
	shards map[uint32]string
	bucket *bbolt.Bucket
	config chunk.PeriodConfig
	schema chunk.SeriesStoreSchema
	// buf is scratch space. NOTE(review): not used by any method visible in
	// this file — confirm it is needed before relying on it.
	buf []byte
}
// newSeriesCleaner builds a seriesCleaner for the given table bucket and
// period config, precomputing the shard-prefix strings when the schema shards
// rows.
func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig, tableName string) *seriesCleaner {
	// NOTE(review): the error from CreateSchema is silently dropped, and the
	// type assertion below is unchecked — it panics if the configured schema
	// is not a SeriesStoreSchema. Confirm callers guarantee both; otherwise
	// this should surface an error.
	baseSchema, _ := config.CreateSchema()
	schema := baseSchema.(chunk.SeriesStoreSchema)
	var shards map[uint32]string
	if config.RowShards != 0 {
		shards = map[uint32]string{}
		// NOTE(review): `<=` includes shard RowShards itself, yielding
		// RowShards+1 entries — looks like an off-by-one (shards are usually
		// 0..RowShards-1), though an extra entry is harmless for lookups.
		// Confirm against how shard prefixes are generated at write time.
		for s := uint32(0); s <= config.RowShards; s++ {
			shards[s] = fmt.Sprintf("%02d", s)
		}
	}
	return &seriesCleaner{
		tableInterval: ExtractIntervalFromTableName(tableName),
		schema:        schema,
		bucket:        bucket,
		buf:           make([]byte, 0, 1024),
		config:        config,
		shards:        shards,
	}
}
// Cleanup deletes every label index entry for the given user's series from
// the bucket. It regenerates the same label write entries the store produced
// at ingestion time and deletes each corresponding key.
func (s *seriesCleaner) Cleanup(userID []byte, lbls labels.Labels) error {
	// Series IDs are computed with the metric name label included, so add it
	// if missing. The full slice expression caps capacity at the current
	// length, forcing append to allocate a new backing array instead of
	// potentially writing into the caller's slice (slice aliasing).
	// NOTE(review): the appended label lands at the end rather than in sorted
	// position — confirm GetCacheKeysAndLabelWriteEntries does not require
	// sorted labels.
	if lbls.Get(labels.MetricName) == "" {
		lbls = append(lbls[:len(lbls):len(lbls)], labels.Label{
			Name:  labels.MetricName,
			Value: logMetricName,
		})
	}

	_, indexEntries, err := s.schema.GetCacheKeysAndLabelWriteEntries(s.tableInterval.Start, s.tableInterval.End, string(userID), logMetricName, lbls, "")
	if err != nil {
		return err
	}

	for i := range indexEntries {
		for _, indexEntry := range indexEntries[i] {
			// Index keys are stored as "<hash><separator><range value>".
			key := make([]byte, 0, len(indexEntry.HashValue)+len(separator)+len(indexEntry.RangeValue))
			key = append(key, []byte(indexEntry.HashValue)...)
			key = append(key, []byte(separator)...)
			key = append(key, indexEntry.RangeValue...)

			if err := s.bucket.Delete(key); err != nil {
				return err
			}
		}
	}

	return nil
}