-
Notifications
You must be signed in to change notification settings - Fork 456
/
metadata_fetcher_filters.go
111 lines (88 loc) · 3.53 KB
/
metadata_fetcher_filters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/storegateway/metadata_fetcher_filters.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
package storegateway
import (
"context"
"time"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/thanos-io/objstore"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketindex"
)
// MetadataFilterWithBucketIndex is implemented by block metadata filters that
// can do their filtering using the bucket index passed in by the caller.
type MetadataFilterWithBucketIndex interface {
	// FilterWithBucketIndex is like the Thanos MetadataFilter.Filter(), but it
	// additionally receives the bucket index as input.
	FilterWithBucketIndex(ctx context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
}
// IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements
// the MetadataFilterWithBucketIndex interface.
type IgnoreDeletionMarkFilter struct {
	// upstream is the wrapped filter that Filter() and, when no cached marks
	// exist, DeletionMarkBlocks() delegate to.
	upstream *block.IgnoreDeletionMarkFilter

	// delay is how old a deletion mark must be before FilterWithBucketIndex()
	// removes the marked block from the metas.
	delay time.Duration

	// deletionMarkMap caches the deletion marks built by the last call to
	// FilterWithBucketIndex(). It is nil until that method has been called.
	deletionMarkMap map[ulid.ULID]*block.DeletionMark
}
// NewIgnoreDeletionMarkFilter returns an IgnoreDeletionMarkFilter that wraps a
// block.IgnoreDeletionMarkFilter built from the same logger, bucket reader,
// delay and concurrency. The delay is also stored on the returned filter so
// that FilterWithBucketIndex can apply it. The deletion marks cache is left
// unset.
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter {
	filter := new(IgnoreDeletionMarkFilter)
	filter.upstream = block.NewIgnoreDeletionMarkFilter(logger, bkt, delay, concurrency)
	filter.delay = delay
	return filter
}
// DeletionMarkBlocks returns the blocks that were marked for deletion, keyed
// by block ID.
//
// When FilterWithBucketIndex has populated the cached deletion marks, the
// cached map is returned. Otherwise the call is forwarded to the upstream
// filter.
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*block.DeletionMark {
	// No cache means the bucket index based filtering has not run, so the
	// upstream filter is the only source of deletion marks.
	if f.deletionMarkMap == nil {
		return f.upstream.DeletionMarkBlocks()
	}

	return f.deletionMarkMap
}
// Filter implements block.MetadataFilter by forwarding the call, unchanged, to
// the upstream filter and returning its error. It does not touch the deletion
// marks cached by FilterWithBucketIndex.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error {
	err := f.upstream.Filter(ctx, metas, synced)
	return err
}
// FilterWithBucketIndex implements MetadataFilterWithBucketIndex.
//
// It converts every deletion mark listed in idx into its Thanos form, caches
// the resulting map on the filter (see DeletionMarkBlocks), and then removes
// from metas each marked block whose deletion time is older than the
// configured delay. Every removed block increments the
// block.MarkedForDeletionMeta series of synced. The context is unused and the
// returned error is always nil.
func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error {
	// Index the deletion marks by block ID.
	marksByID := make(map[ulid.ULID]*block.DeletionMark, len(idx.BlockDeletionMarks))
	for _, entry := range idx.BlockDeletionMarks {
		marksByID[entry.ID] = entry.ThanosDeletionMark()
	}

	// Remember them so DeletionMarkBlocks() can serve them later.
	f.deletionMarkMap = marksByID

	for _, dm := range marksByID {
		// Marks for blocks that are not in metas have nothing to filter.
		if _, present := metas[dm.ID]; !present {
			continue
		}

		// DeletionTime is interpreted as Unix seconds. Blocks marked more
		// recently than the delay are kept.
		markAge := time.Since(time.Unix(dm.DeletionTime, 0))
		if markAge.Seconds() <= f.delay.Seconds() {
			continue
		}

		synced.WithLabelValues(block.MarkedForDeletionMeta).Inc()
		delete(metas, dm.ID)
	}

	return nil
}
// minTimeExcludedMeta is the label value passed to the synced gauge for each
// block removed by minTimeMetaFilter.
const minTimeExcludedMeta = "min-time-excluded"
// minTimeMetaFilter filters out blocks that contain the most recent data (based on block MinTime).
type minTimeMetaFilter struct {
	// limit is subtracted from the current time to obtain the cut-off used by
	// Filter(). A value <= 0 disables the filter.
	limit time.Duration
}
// newMinTimeMetaFilter returns a minTimeMetaFilter configured with the given
// limit.
func newMinTimeMetaFilter(limit time.Duration) *minTimeMetaFilter {
	filter := new(minTimeMetaFilter)
	filter.limit = limit
	return filter
}
// Filter removes from metas every block whose MinTime is not older than the
// cut-off, where the cut-off is the current time minus the configured limit
// (converted with timestamp.FromTime). Each removed block increments the
// minTimeExcludedMeta series of synced.
//
// A limit <= 0 disables the filter and leaves metas untouched. The context is
// unused and the returned error is always nil.
func (f *minTimeMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error {
	if f.limit <= 0 {
		return nil
	}

	cutoff := timestamp.FromTime(time.Now().Add(-f.limit))

	for blockID, meta := range metas {
		// Blocks starting at or after the cut-off are too recent: drop them.
		if meta.MinTime >= cutoff {
			synced.WithLabelValues(minTimeExcludedMeta).Inc()
			delete(metas, blockID)
		}
	}

	return nil
}