-
Notifications
You must be signed in to change notification settings - Fork 781
/
metadata_fetcher_filters.go
107 lines (87 loc) · 3.67 KB
/
metadata_fetcher_filters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package storegateway
import (
"context"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)
// MetadataFilterWithBucketIndex is a block metadata filter that, in addition to the
// block metas, receives the bucket index as input.
type MetadataFilterWithBucketIndex interface {
// FilterWithBucketIndex is like Thanos MetadataFilter.Filter() but it provides in input the bucket index too.
FilterWithBucketIndex(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
}
// IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements
// the MetadataFilterWithBucketIndex interface.
type IgnoreDeletionMarkFilter struct {
// upstream is the wrapped Thanos filter. Filter() delegates to it, and so does
// DeletionMarkBlocks() as long as no deletion marks have been cached.
upstream *block.IgnoreDeletionMarkFilter
// delay is the threshold FilterWithBucketIndex() compares against the time elapsed since
// a block's deletion mark; blocks marked longer ago than delay are removed from the metas.
delay time.Duration
// deletionMarkMap caches the deletion marks built from the bucket index by
// FilterWithBucketIndex(). The constructor in this file leaves it nil.
deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
}
// NewIgnoreDeletionMarkFilter builds an IgnoreDeletionMarkFilter that wraps a Thanos
// IgnoreDeletionMarkFilter created from the same logger, bucket reader, delay and concurrency.
// The delay is also stored on the returned filter for use by FilterWithBucketIndex.
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter {
	wrapped := block.NewIgnoreDeletionMarkFilter(logger, bkt, delay, concurrency)

	filter := new(IgnoreDeletionMarkFilter)
	filter.upstream = wrapped
	filter.delay = delay
	return filter
}
// DeletionMarkBlocks returns the blocks that were marked for deletion, keyed by block ID.
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark {
	// Without a cached map the bucket-index variant of the filter has not run,
	// so the wrapped Thanos filter is the only source of deletion marks.
	if f.deletionMarkMap == nil {
		return f.upstream.DeletionMarkBlocks()
	}

	// The cache was populated from the bucket index, so it's safe to return it.
	return f.deletionMarkMap
}
// Filter implements block.MetadataFilter.
// It does not use the bucket index: the call is forwarded unchanged to the wrapped
// Thanos filter and its error is returned as-is.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
return f.upstream.Filter(ctx, metas, synced, modified)
}
// FilterWithBucketIndex implements MetadataFilterWithBucketIndex.
//
// It converts the deletion marks listed in the bucket index to Thanos deletion marks,
// caches them on the filter, and removes from metas every block whose deletion mark is
// older than the configured delay. Each removed block increments the
// block.MarkedForDeletionMeta label of the synced gauge. The context is not used and
// the returned error is always nil.
func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced block.GaugeVec) error {
	// Index the deletion marks by block ID.
	byBlock := make(map[ulid.ULID]*metadata.DeletionMark, len(idx.BlockDeletionMarks))
	for _, indexed := range idx.BlockDeletionMarks {
		byBlock[indexed.ID] = indexed.ThanosDeletionMark()
	}

	// Cache the full set of marks, including those not applied to metas below.
	f.deletionMarkMap = byBlock

	for _, dm := range byBlock {
		_, present := metas[dm.ID]
		if !present {
			continue
		}

		// Blocks marked no longer ago than the delay are kept.
		markedAt := time.Unix(dm.DeletionTime, 0)
		if time.Since(markedAt).Seconds() <= f.delay.Seconds() {
			continue
		}

		synced.WithLabelValues(block.MarkedForDeletionMeta).Inc()
		delete(metas, dm.ID)
	}

	return nil
}
// NewIgnoreNonQueryableBlocksFilter creates an IgnoreNonQueryableBlocksFilter that logs
// through logger and uses ignoreWithin as the window for ignoring recent blocks.
func NewIgnoreNonQueryableBlocksFilter(logger log.Logger, ignoreWithin time.Duration) *IgnoreNonQueryableBlocksFilter {
	filter := new(IgnoreNonQueryableBlocksFilter)
	filter.ignoreWithin = ignoreWithin
	filter.logger = logger
	return filter
}
// IgnoreNonQueryableBlocksFilter ignores blocks that are too new to be queried.
// This has to be used in conjunction with `-querier.query-store-after` with some buffer.
type IgnoreNonQueryableBlocksFilter struct {
// Blocks whose MinTime is more recent than `now() - ignoreWithin` will not be synced.
ignoreWithin time.Duration
// logger receives a debug line for every block the filter ignores.
logger log.Logger
}
// Filter implements block.MetadataFilter.
//
// It removes from metas every block whose MinTime is more recent than the current time
// minus ignoreWithin, logging each removal at debug level. The ctx, synced and modified
// arguments are not used, and the returned error is always nil.
func (f *IgnoreNonQueryableBlocksFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
	// The cutoff is a Unix timestamp in milliseconds, compared against the block's MinTime.
	cutoffMillis := time.Now().Add(-f.ignoreWithin).UnixMilli()

	for blockID, meta := range metas {
		if meta.MinTime <= cutoffMillis {
			continue
		}

		level.Debug(f.logger).Log("msg", "ignoring block because it won't be queried", "id", blockID)
		delete(metas, blockID)
	}

	return nil
}