-
Notifications
You must be signed in to change notification settings - Fork 784
/
bucket_index_metadata_fetcher.go
133 lines (113 loc) · 4.15 KB
/
bucket_index_metadata_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package storegateway
import (
"context"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)
const (
corruptedBucketIndex = "corrupted-bucket-index"
noBucketIndex = "no-bucket-index"
)
// BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging on the Cortex bucket index.
type BucketIndexMetadataFetcher struct {
userID string
bkt objstore.Bucket
strategy ShardingStrategy
cfgProvider bucket.TenantConfigProvider
logger log.Logger
filters []block.MetadataFilter
metrics *block.FetcherMetrics
}
func NewBucketIndexMetadataFetcher(
userID string,
bkt objstore.Bucket,
strategy ShardingStrategy,
cfgProvider bucket.TenantConfigProvider,
logger log.Logger,
reg prometheus.Registerer,
filters []block.MetadataFilter,
) *BucketIndexMetadataFetcher {
return &BucketIndexMetadataFetcher{
userID: userID,
bkt: bkt,
strategy: strategy,
cfgProvider: cfgProvider,
logger: logger,
filters: filters,
metrics: block.NewFetcherMetrics(reg, [][]string{{corruptedBucketIndex}, {noBucketIndex}}, nil),
}
}
// Fetch implements block.MetadataFetcher. Not goroutine-safe.
func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) {
f.metrics.ResetTx()
// Check whether the user belongs to the shard.
if len(f.strategy.FilterUsers(ctx, []string{f.userID})) != 1 {
f.metrics.Submit()
return nil, nil, nil
}
// Track duration and sync counters only if wasn't filtered out by the sharding strategy.
start := time.Now()
defer func() {
f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
if err != nil {
f.metrics.SyncFailures.Inc()
}
}()
f.metrics.Syncs.Inc()
// Fetch the bucket index.
idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
if errors.Is(err, bucketindex.ErrIndexNotFound) {
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
// and their bucket index has not been created yet.
f.metrics.Synced.WithLabelValues(noBucketIndex).Set(1)
f.metrics.Submit()
return nil, nil, nil
}
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
// In case a single tenant bucket index is corrupted, we don't want the store-gateway to fail at startup
// because unable to fetch blocks metadata. We'll act as if the tenant has no bucket index, but the query
// will fail anyway in the querier (the querier fails in the querier if bucket index is corrupted).
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
f.metrics.Synced.WithLabelValues(corruptedBucketIndex).Set(1)
f.metrics.Submit()
return nil, nil, nil
}
if err != nil {
f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1)
f.metrics.Submit()
return nil, nil, errors.Wrapf(err, "read bucket index")
}
// Build block metas out of the index.
metas = make(map[ulid.ULID]*metadata.Meta, len(idx.Blocks))
for _, b := range idx.Blocks {
metas[b.ID] = b.ThanosMeta(f.userID)
}
for _, filter := range f.filters {
var err error
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
if customFilter, ok := filter.(MetadataFilterWithBucketIndex); ok {
err = customFilter.FilterWithBucketIndex(ctx, metas, idx, f.metrics.Synced)
} else {
err = filter.Filter(ctx, metas, f.metrics.Synced, f.metrics.Modified)
}
if err != nil {
return nil, nil, errors.Wrap(err, "filter metas")
}
}
f.metrics.Synced.WithLabelValues(block.LoadedMeta).Set(float64(len(metas)))
f.metrics.Submit()
return metas, nil, nil
}
func (f *BucketIndexMetadataFetcher) UpdateOnChange(callback func([]metadata.Meta, error)) {
// Unused by the store-gateway.
callback(nil, errors.New("UpdateOnChange is unsupported"))
}