/
blocks_finder_bucket_index.go
120 lines (97 loc) · 3.74 KB
/
blocks_finder_bucket_index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package querier
import (
"context"
"time"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/services"
)
var (
errBucketIndexBlocksFinderNotRunning = errors.New("bucket index blocks finder is not running")
errBucketIndexTooOld = errors.New("bucket index is too old and the last time it was updated exceeds the allowed max staleness")
)
type BucketIndexBlocksFinderConfig struct {
IndexLoader bucketindex.LoaderConfig
MaxStalePeriod time.Duration
IgnoreDeletionMarksDelay time.Duration
IgnoreBlocksWithin time.Duration
}
// BucketIndexBlocksFinder implements BlocksFinder interface and find blocks in the bucket
// looking up the bucket index.
type BucketIndexBlocksFinder struct {
services.Service
cfg BucketIndexBlocksFinderConfig
loader *bucketindex.Loader
}
func NewBucketIndexBlocksFinder(cfg BucketIndexBlocksFinderConfig, bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) *BucketIndexBlocksFinder {
loader := bucketindex.NewLoader(cfg.IndexLoader, bkt, cfgProvider, logger, reg)
return &BucketIndexBlocksFinder{
cfg: cfg,
loader: loader,
Service: loader,
}
}
// GetBlocks implements BlocksFinder.
func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) {
if f.State() != services.Running {
return nil, nil, errBucketIndexBlocksFinderNotRunning
}
if maxT < minT {
return nil, nil, errInvalidBlocksRange
}
// Get the bucket index for this user.
idx, ss, err := f.loader.GetIndex(ctx, userID)
if errors.Is(err, bucketindex.ErrIndexNotFound) {
// This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet
// so the bucket index hasn't been created yet.
return nil, nil, nil
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
return nil, nil, validation.AccessDeniedError(err.Error())
}
// Short circuit when bucket failed to be updated due CMK errors recently
if time.Since(ss.GetNonQueryableUntil()) < 0 && ss.NonQueryableReason == bucketindex.CustomerManagedKeyError {
return nil, nil, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error())
}
if err != nil {
return nil, nil, err
}
// Ensure the bucket index is not too old.
if time.Since(idx.GetUpdatedAt()) > f.cfg.MaxStalePeriod {
return nil, nil, errBucketIndexTooOld
}
var (
matchingBlocks = map[ulid.ULID]*bucketindex.Block{}
matchingDeletionMarks = map[ulid.ULID]*bucketindex.BlockDeletionMark{}
)
// Filter blocks containing samples within the range.
for _, block := range idx.Blocks {
if !block.Within(minT, maxT) {
continue
}
matchingBlocks[block.ID] = block
}
for _, mark := range idx.BlockDeletionMarks {
// Filter deletion marks by matching blocks only.
if _, ok := matchingBlocks[mark.ID]; !ok {
continue
}
// Exclude blocks marked for deletion. This is the same logic as Thanos IgnoreDeletionMarkFilter.
if time.Since(time.Unix(mark.DeletionTime, 0)).Seconds() > f.cfg.IgnoreDeletionMarksDelay.Seconds() {
delete(matchingBlocks, mark.ID)
continue
}
matchingDeletionMarks[mark.ID] = mark
}
// Convert matching blocks into a list.
blocks := make(bucketindex.Blocks, 0, len(matchingBlocks))
for _, b := range matchingBlocks {
blocks = append(blocks, b)
}
return blocks, matchingDeletionMarks, nil
}