caching_bucket.go

package tsdb

import (
	"flag"
	"fmt"
	"path/filepath"
	"regexp"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/golang/snappy"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/cache"
	"github.com/thanos-io/thanos/pkg/cacheutil"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

// Supported cache backend values.
const (
	CacheBackendMemcached = "memcached"
	CacheBackendRedis     = "redis"
)

// CacheBackend selects and configures the cache backend shared by the chunks
// and metadata cache configs. An empty Backend disables caching.
type CacheBackend struct {
	Backend   string                `yaml:"backend"`
	Memcached MemcachedClientConfig `yaml:"memcached"`
	Redis     RedisClientConfig     `yaml:"redis"`
}

// Validate the config.
func (cfg *CacheBackend) Validate() error {
	switch cfg.Backend {
	case CacheBackendMemcached:
		return cfg.Memcached.Validate()
	case CacheBackendRedis:
		return cfg.Redis.Validate()
	case "":
		// No caching configured; nothing to validate.
	default:
		return fmt.Errorf("unsupported cache backend: %s", cfg.Backend)
	}
	return nil
}
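
// Validation sketch for the three accepted Backend values:
//
//	_ = (&CacheBackend{}).Validate()                    // nil: empty backend disables caching
//	_ = (&CacheBackend{Backend: "inmemory"}).Validate() // "unsupported cache backend: inmemory"
//	// CacheBackendMemcached and CacheBackendRedis delegate to the sub-config's own Validate.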

// ChunksCacheConfig configures the cache for subranges of TSDB chunk files.
type ChunksCacheConfig struct {
	CacheBackend `yaml:",inline"`

	SubrangeSize        int64         `yaml:"subrange_size"`
	MaxGetRangeRequests int           `yaml:"max_get_range_requests"`
	AttributesTTL       time.Duration `yaml:"attributes_ttl"`
	SubrangeTTL         time.Duration `yaml:"subrange_ttl"`
}

func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s, %s.", CacheBackendMemcached, CacheBackendRedis))

	cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
	cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")

	f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
	f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
	f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for chunks.")
	f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual chunks subranges.")
}

func (cfg *ChunksCacheConfig) Validate() error {
	return cfg.CacheBackend.Validate()
}
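
// Registration sketch: a caller wires these flags under a prefix of its
// choosing (the prefix below is a hypothetical example):
//
//	var cfg ChunksCacheConfig
//	fs := flag.NewFlagSet("example", flag.ContinueOnError)
//	cfg.RegisterFlagsWithPrefix(fs, "chunks-cache.")
//	_ = fs.Parse([]string{"-chunks-cache.backend=memcached", "-chunks-cache.subrange-size=16000"})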

// MetadataCacheConfig configures the cache for bucket metadata: tenant and
// block listings, metafiles, block index attributes and the bucket index.
type MetadataCacheConfig struct {
	CacheBackend `yaml:",inline"`

	TenantsListTTL          time.Duration `yaml:"tenants_list_ttl"`
	TenantBlocksListTTL     time.Duration `yaml:"tenant_blocks_list_ttl"`
	ChunksListTTL           time.Duration `yaml:"chunks_list_ttl"`
	MetafileExistsTTL       time.Duration `yaml:"metafile_exists_ttl"`
	MetafileDoesntExistTTL  time.Duration `yaml:"metafile_doesnt_exist_ttl"`
	MetafileContentTTL      time.Duration `yaml:"metafile_content_ttl"`
	MetafileMaxSize         int           `yaml:"metafile_max_size_bytes"`
	MetafileAttributesTTL   time.Duration `yaml:"metafile_attributes_ttl"`
	BlockIndexAttributesTTL time.Duration `yaml:"block_index_attributes_ttl"`
	BucketIndexContentTTL   time.Duration `yaml:"bucket_index_content_ttl"`
	BucketIndexMaxSize      int           `yaml:"bucket_index_max_size_bytes"`
}

func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for metadata cache, if not empty. Supported values: %s, %s.", CacheBackendMemcached, CacheBackendRedis))

	cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
	cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")

	f.DurationVar(&cfg.TenantsListTTL, prefix+"tenants-list-ttl", 15*time.Minute, "How long to cache list of tenants in the bucket.")
	f.DurationVar(&cfg.TenantBlocksListTTL, prefix+"tenant-blocks-list-ttl", 5*time.Minute, "How long to cache list of blocks for each tenant.")
	f.DurationVar(&cfg.ChunksListTTL, prefix+"chunks-list-ttl", 24*time.Hour, "How long to cache list of chunks for a block.")
	f.DurationVar(&cfg.MetafileExistsTTL, prefix+"metafile-exists-ttl", 2*time.Hour, "How long to cache information that block metafile exists. Also used for user deletion mark file.")
	f.DurationVar(&cfg.MetafileDoesntExistTTL, prefix+"metafile-doesnt-exist-ttl", 5*time.Minute, "How long to cache information that block metafile doesn't exist. Also used for user deletion mark file.")
	f.DurationVar(&cfg.MetafileContentTTL, prefix+"metafile-content-ttl", 24*time.Hour, "How long to cache content of the metafile.")
	f.IntVar(&cfg.MetafileMaxSize, prefix+"metafile-max-size-bytes", 1*1024*1024, "Maximum size of metafile content to cache in bytes. Caching will be skipped if the content exceeds this size. This is useful to avoid network round trip for large content if the configured caching backend has a hard limit on cached items size (in this case, you should set this limit to the same limit in the caching backend).")
	f.DurationVar(&cfg.MetafileAttributesTTL, prefix+"metafile-attributes-ttl", 168*time.Hour, "How long to cache attributes of the block metafile.")
	f.DurationVar(&cfg.BlockIndexAttributesTTL, prefix+"block-index-attributes-ttl", 168*time.Hour, "How long to cache attributes of the block index.")
	f.DurationVar(&cfg.BucketIndexContentTTL, prefix+"bucket-index-content-ttl", 5*time.Minute, "How long to cache content of the bucket index.")
	f.IntVar(&cfg.BucketIndexMaxSize, prefix+"bucket-index-max-size-bytes", 1*1024*1024, "Maximum size of bucket index content to cache in bytes. Caching will be skipped if the content exceeds this size. This is useful to avoid network round trip for large content if the configured caching backend has a hard limit on cached items size (in this case, you should set this limit to the same limit in the caching backend).")
}

func (cfg *MetadataCacheConfig) Validate() error {
	return cfg.CacheBackend.Validate()
}
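
// Registration sketch, mirroring the chunks-cache example above; the prefix
// is again a hypothetical example, not defined in this file:
//
//	var cfg MetadataCacheConfig
//	fs := flag.NewFlagSet("example", flag.ContinueOnError)
//	cfg.RegisterFlagsWithPrefix(fs, "metadata-cache.")
//	_ = fs.Parse([]string{"-metadata-cache.backend=redis"})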

// CreateCachingBucket wraps the given bucket with caching layers for chunks
// and metadata, based on which cache backends are configured. If neither is
// configured, the original bucket is returned unchanged.
func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
	cfg := cache.NewCachingBucketConfig()
	cachingConfigured := false

	chunksCache, err := createCache("chunks-cache", &chunksConfig.CacheBackend, logger, reg)
	if err != nil {
		return nil, errors.Wrap(err, "chunks-cache")
	}
	if chunksCache != nil {
		cachingConfigured = true
		chunksCache = cache.NewTracingCache(chunksCache)
		cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
	}

	metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg)
	if err != nil {
		return nil, errors.Wrap(err, "metadata-cache")
	}
	if metadataCache != nil {
		cachingConfigured = true
		metadataCache = cache.NewTracingCache(metadataCache)

		cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
		cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
		cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL)
		cfg.CacheAttributes("block-index", metadataCache, matchers.GetBlockIndexMatcher(), metadataConfig.BlockIndexAttributesTTL)
		cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)

		codec := snappyIterCodec{storecache.JSONIterCodec{}}
		cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec, "")
		cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec, "")
		cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec, "")
	}

	if !cachingConfigured {
		// No caching is configured.
		return bkt, nil
	}

	return storecache.NewCachingBucket(bkt, cfg, logger, reg)
}
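
// Wiring sketch (bkt, logger and reg are placeholders for the caller's own
// values; any objstore.InstrumentedBucket works):
//
//	cachingBkt, err := CreateCachingBucket(chunksCfg, metaCfg, NewMatchers(), bkt, logger, reg)
//	if err != nil {
//		return err
//	}
//	// cachingBkt == bkt when neither cache backend is configured.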

// createCache builds a cache client for the configured backend. It returns
// (nil, nil) when no backend is configured.
func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
	switch cacheBackend.Backend {
	case "":
		// No caching.
		return nil, nil

	case CacheBackendMemcached:
		var client cacheutil.MemcachedClient
		client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create memcached client")
		}
		return cache.NewMemcachedCache(cacheName, logger, client, reg), nil

	case CacheBackendRedis:
		redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create redis client")
		}
		return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil

	default:
		return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend)
	}
}
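
// Caller-side sketch: an empty backend yields (nil, nil) rather than an
// error, so a nil check is what actually gates caching:
//
//	c, err := createCache("chunks-cache", &cfg.CacheBackend, logger, reg)
//	if err != nil {
//		return nil, err
//	}
//	if c == nil {
//		// caching disabled for this cache
//	}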

// Matchers maps cache names to predicates that decide which object paths a
// given caching rule applies to.
type Matchers struct {
	matcherMap map[string]func(string) bool
}

// NewMatchers returns the default set of matchers used by CreateCachingBucket.
func NewMatchers() Matchers {
	matcherMap := make(map[string]func(string) bool)
	matcherMap["chunks"] = isTSDBChunkFile
	matcherMap["metafile"] = isMetaFile
	matcherMap["block-index"] = isBlockIndexFile
	matcherMap["bucket-index"] = isBucketIndexFiles
	matcherMap["tenants-iter"] = isTenantsDir
	matcherMap["tenant-blocks-iter"] = isTenantBlocksDir
	matcherMap["chunks-iter"] = isChunksDir
	return Matchers{
		matcherMap: matcherMap,
	}
}

func (m *Matchers) SetMetaFileMatcher(f func(string) bool) {
	m.matcherMap["metafile"] = f
}

func (m *Matchers) SetChunksMatcher(f func(string) bool) {
	m.matcherMap["chunks"] = f
}

func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) {
	m.matcherMap["block-index"] = f
}

func (m *Matchers) SetBucketIndexMatcher(f func(string) bool) {
	m.matcherMap["bucket-index"] = f
}

func (m *Matchers) SetTenantsIterMatcher(f func(string) bool) {
	m.matcherMap["tenants-iter"] = f
}

func (m *Matchers) SetTenantBlocksIterMatcher(f func(string) bool) {
	m.matcherMap["tenant-blocks-iter"] = f
}

func (m *Matchers) SetChunksIterMatcher(f func(string) bool) {
	m.matcherMap["chunks-iter"] = f
}

func (m *Matchers) GetChunksMatcher() func(string) bool {
	return m.matcherMap["chunks"]
}

func (m *Matchers) GetMetafileMatcher() func(string) bool {
	return m.matcherMap["metafile"]
}

func (m *Matchers) GetBlockIndexMatcher() func(string) bool {
	return m.matcherMap["block-index"]
}

func (m *Matchers) GetBucketIndexMatcher() func(string) bool {
	return m.matcherMap["bucket-index"]
}

func (m *Matchers) GetTenantsIterMatcher() func(string) bool {
	return m.matcherMap["tenants-iter"]
}

func (m *Matchers) GetTenantBlocksIterMatcher() func(string) bool {
	return m.matcherMap["tenant-blocks-iter"]
}

func (m *Matchers) GetChunksIterMatcher() func(string) bool {
	return m.matcherMap["chunks-iter"]
}
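
// Override sketch: callers may replace any default predicate before building
// the caching bucket (isCustomMarker is hypothetical):
//
//	m := NewMatchers()
//	m.SetMetaFileMatcher(func(name string) bool {
//		return isMetaFile(name) || isCustomMarker(name)
//	})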

var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`)

func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) }

func isMetaFile(name string) bool {
	return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile)
}

func isBlockIndexFile(name string) bool {
	// Ensure the path ends with "<block id>/<index filename>".
	if !strings.HasSuffix(name, "/"+block.IndexFilename) {
		return false
	}

	_, err := ulid.Parse(filepath.Base(filepath.Dir(name)))
	return err == nil
}

func isBucketIndexFiles(name string) bool {
	// TODO can't reference bucketindex because of a circular dependency. To be fixed.
	return strings.HasSuffix(name, "/bucket-index.json.gz") || strings.HasSuffix(name, "/bucket-index-sync-status.json")
}

func isTenantsDir(name string) bool {
	return name == ""
}

var tenantDirMatcher = regexp.MustCompile("^[^/]+/?$")

func isTenantBlocksDir(name string) bool {
	return tenantDirMatcher.MatchString(name)
}

func isChunksDir(name string) bool {
	return strings.HasSuffix(name, "/chunks")
}
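
// Path-matching sketch with illustrative object names (<block ULID> stands
// in for a real ULID; the paths are for illustration only):
//
//	isTSDBChunkFile("user-1/<block ULID>/chunks/000001") // true
//	isMetaFile("user-1/<block ULID>/meta.json")          // true
//	isChunksDir("user-1/<block ULID>/chunks")            // true
//	isTenantsDir("")                                     // true: bucket root lists tenants
//	isTenantBlocksDir("user-1/")                         // true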

// snappyIterCodec wraps an IterCodec and snappy-compresses its encoded output
// before it is written to the cache.
type snappyIterCodec struct {
	cache.IterCodec
}

func (i snappyIterCodec) Encode(files []string) ([]byte, error) {
	b, err := i.IterCodec.Encode(files)
	if err != nil {
		return nil, err
	}
	return snappy.Encode(nil, b), nil
}

func (i snappyIterCodec) Decode(cachedData []byte) ([]string, error) {
	b, err := snappy.Decode(nil, cachedData)
	if err != nil {
		return nil, errors.Wrap(err, "snappyIterCodec")
	}
	return i.IterCodec.Decode(b)
}
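
// Round-trip sketch: the codec compresses whatever the wrapped IterCodec
// produces; CreateCachingBucket pairs it with storecache.JSONIterCodec:
//
//	codec := snappyIterCodec{storecache.JSONIterCodec{}}
//	b, _ := codec.Encode([]string{"user-1/", "user-2/"})
//	files, _ := codec.Decode(b) // ["user-1/", "user-2/"]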