From dc23b0eb1382b263e26b62022ddf1471ed86213e Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Tue, 5 May 2026 19:09:56 +0530 Subject: [PATCH 1/6] parquetutil: Add parquet row ranges cache Implement a parquet-common RowRangesForConstraintsCache backed by Cortex cache backends. Encode row ranges into a compact binary format and hash cache keys so they are safe for memcached and other shared cache backends. Signed-off-by: Siddarth Gundu --- pkg/util/parquetutil/row_ranges_cache.go | 182 +++++++++++ pkg/util/parquetutil/row_ranges_cache_test.go | 301 ++++++++++++++++++ 2 files changed, 483 insertions(+) create mode 100644 pkg/util/parquetutil/row_ranges_cache.go create mode 100644 pkg/util/parquetutil/row_ranges_cache_test.go diff --git a/pkg/util/parquetutil/row_ranges_cache.go b/pkg/util/parquetutil/row_ranges_cache.go new file mode 100644 index 00000000000..5434667be79 --- /dev/null +++ b/pkg/util/parquetutil/row_ranges_cache.go @@ -0,0 +1,182 @@ +package parquetutil + +import ( + "context" + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "fmt" + "hash" + "strconv" + "time" + + "github.com/prometheus-community/parquet-common/search" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/cache" + + "github.com/cortexproject/cortex/pkg/util/users" +) + +const ( + // 4 bytes uint32 count. + rowRangesHeaderSize = 4 + + // int64 From + int64 Count. + rowRangeEntrySize = 16 +) + +type rowRangesCacheMetrics struct { + hits *prometheus.CounterVec + misses *prometheus.CounterVec + decodeErrors *prometheus.CounterVec +} + +func newRowRangesCacheMetrics(r prometheus.Registerer) *rowRangesCacheMetrics { + return &rowRangesCacheMetrics{ + hits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_row_ranges_cache_hits_total", + Help: "Total number of parquet row ranges cache hits.", + }, []string{"name"}), + misses: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_row_ranges_cache_misses_total", + Help: "Total number of parquet row ranges cache misses.", + }, []string{"name"}), + decodeErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_row_ranges_cache_decode_errors_total", + Help: "Total number of parquet row ranges cache decode errors.", + }, []string{"name"}), + } +} + +type RowRangesCache struct { + cache cache.Cache + name string + metrics *rowRangesCacheMetrics + ttl time.Duration +} + +func NewRowRangesCache(cache cache.Cache, name string, ttl time.Duration, reg prometheus.Registerer) *RowRangesCache { + if cache == nil { + return nil + } + + return &RowRangesCache{ + cache: cache, + name: name, + metrics: newRowRangesCacheMetrics(reg), + ttl: ttl, + } +} + +func (c *RowRangesCache) Get(ctx context.Context, shard parquet_storage.ParquetShard, rgIdx int, cs []search.Constraint) ([]search.RowRange, bool) { + key, ok := rowRangesCacheKey(c.name, ctx, shard, rgIdx, cs) + if !ok { + c.metrics.misses.WithLabelValues(c.name).Inc() + return nil, false + } + + hits := c.cache.Fetch(ctx, []string{key}) + buf, ok := hits[key] + if !ok { + c.metrics.misses.WithLabelValues(c.name).Inc() + return nil, false + } + + rowRanges, err := decodeRowRanges(buf) + if err != nil { + c.metrics.decodeErrors.WithLabelValues(c.name).Inc() + c.metrics.misses.WithLabelValues(c.name).Inc() + return nil, false + } + + c.metrics.hits.WithLabelValues(c.name).Inc() 
+	return rowRanges, true
+}
+
+func (c *RowRangesCache) Set(ctx context.Context, shard parquet_storage.ParquetShard, rgIdx int, cs []search.Constraint, rr []search.RowRange) error {
+	key, ok := rowRangesCacheKey(c.name, ctx, shard, rgIdx, cs)
+	if !ok {
+		return nil
+	}
+
+	c.cache.Store(map[string][]byte{key: encodeRowRanges(rr)}, c.ttl)
+	return nil
+}
+
+func (c *RowRangesCache) Delete(context.Context, parquet_storage.ParquetShard, int, []search.Constraint) error {
+	// thanos/pkg/cache.Cache doesn't expose deletion.
+	return nil
+}
+
+func (c *RowRangesCache) Close() error {
+	// thanos/pkg/cache.Cache doesn't expose Close.
+	return nil
+}
+
+func rowRangesCacheKey(name string, ctx context.Context, shard parquet_storage.ParquetShard, rgIdx int, cs []search.Constraint) (string, bool) {
+	userID, err := users.TenantID(ctx)
+	if err != nil {
+		return "", false
+	}
+
+	// gomemcache rejects keys containing whitespace or longer than 250 bytes, so hash the key parts.
+	h := sha256.New()
+	writeCacheKeyPart(h, userID)
+	writeCacheKeyPart(h, shard.Name())
+	writeCacheKeyPart(h, strconv.Itoa(shard.ShardIdx()))
+	writeCacheKeyPart(h, strconv.Itoa(rgIdx))
+	for _, c := range cs {
+		writeCacheKeyPart(h, c.String())
+	}
+
+	return name + ":" + hex.EncodeToString(h.Sum(nil)), true
+}
+
+func writeCacheKeyPart(h hash.Hash, part string) {
+	_, _ = fmt.Fprintf(h, "%d:", len(part))
+	_, _ = h.Write([]byte(part))
+}
+
+func encodeRowRanges(rowRanges []search.RowRange) []byte {
+	buf := make([]byte, rowRangesHeaderSize+len(rowRanges)*rowRangeEntrySize)
+	binary.LittleEndian.PutUint32(buf[:rowRangesHeaderSize], uint32(len(rowRanges)))
+
+	offset := rowRangesHeaderSize
+	for _, rowRange := range rowRanges {
+		binary.LittleEndian.PutUint64(buf[offset:offset+8], uint64(rowRange.From))
+		binary.LittleEndian.PutUint64(buf[offset+8:offset+rowRangeEntrySize], uint64(rowRange.Count))
+		offset += rowRangeEntrySize
+	}
+
+	return buf
+}
+
+func decodeRowRanges(buf []byte) ([]search.RowRange, error) {
+	if len(buf) < rowRangesHeaderSize {
+		return nil, fmt.Errorf("invalid row ranges cache entry: got %d bytes, expected at least %d", len(buf), rowRangesHeaderSize)
+	}
+
+	count := binary.LittleEndian.Uint32(buf[:rowRangesHeaderSize])
+	if count > uint32((len(buf)-rowRangesHeaderSize)/rowRangeEntrySize) {
+		return nil, fmt.Errorf("invalid row ranges cache entry: got %d bytes for %d row ranges", len(buf), count)
+	}
+
+	expectedSize := rowRangesHeaderSize + int(count)*rowRangeEntrySize
+	if len(buf) != expectedSize {
+		return nil, fmt.Errorf("invalid row ranges cache entry: got %d bytes, expected %d", len(buf), expectedSize)
+	}
+
+	rowRanges := make([]search.RowRange, int(count))
+	offset := rowRangesHeaderSize
+	for i := range rowRanges {
+		rowRanges[i] = search.RowRange{
+			From:  int64(binary.LittleEndian.Uint64(buf[offset : offset+8])),
+			Count: int64(binary.LittleEndian.Uint64(buf[offset+8 : offset+rowRangeEntrySize])),
+		}
+		offset += rowRangeEntrySize
+	}
+
+	return rowRanges, nil
+}
diff --git a/pkg/util/parquetutil/row_ranges_cache_test.go b/pkg/util/parquetutil/row_ranges_cache_test.go
new file mode 100644
index 00000000000..7d14a84fb21
--- /dev/null
+++ b/pkg/util/parquetutil/row_ranges_cache_test.go
@@ -0,0 +1,301 @@
+package parquetutil
+
+import (
+	"bytes"
+	"context"
+	"testing"
+	"time"
+
+	"github.com/parquet-go/parquet-go"
+	"github.com/prometheus-community/parquet-common/schema"
+	"github.com/prometheus-community/parquet-common/search"
+	parquet_storage "github.com/prometheus-community/parquet-common/storage"
+	
"github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" +) + +func TestRowRanges_EncodeDecode(t *testing.T) { + tests := []struct { + name string + rowRanges []search.RowRange + encoded []byte + }{ + { + name: "empty", + rowRanges: []search.RowRange{}, + encoded: []byte{ + 0, 0, 0, 0, + }, + }, + { + name: "multiple row ranges", + rowRanges: []search.RowRange{ + {From: 1, Count: 2}, + {From: 256, Count: 513}, + }, + encoded: []byte{ + 2, 0, 0, 0, + 1, 0, 0, 0, 0, 0, 0, 0, + 2, 0, 0, 0, 0, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, + 1, 2, 0, 0, 0, 0, 0, 0, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + encoded := encodeRowRanges(tt.rowRanges) + require.Equal(t, tt.encoded, encoded) + + decoded, err := decodeRowRanges(encoded) + require.NoError(t, err) + require.Equal(t, tt.rowRanges, decoded) + }) + } +} + +func TestRowRanges_InvalidDecode(t *testing.T) { + tests := []struct { + name string + buf []byte + errContains string + }{ + { + name: "short header", + buf: []byte{1, 0}, + errContains: "invalid row ranges cache entry: got 2 bytes, expected at least 4", + }, + { + name: "truncated row range", + buf: []byte{1, 0, 0, 0}, + errContains: "invalid row ranges cache entry: got 4 bytes for 1 row ranges", + }, + { + name: "trailing bytes", + buf: []byte{0, 0, 0, 0, 0}, + errContains: "invalid row ranges cache entry: got 5 bytes, expected 4", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + decoded, err := decodeRowRanges(tt.buf) + require.Nil(t, decoded) + require.ErrorContains(t, err, tt.errContains) + }) + } +} + +func TestRowRangesCacheKey(t *testing.T) { + cacheName := "parquet-row-ranges" + shard := fakeParquetShard{name: "block-01/labels.parquet", shardIdx: 7} + ctx := user.InjectOrgID(context.Background(), "tenant-a") + constraints := []search.Constraint{ + equalConstraint("labels.foo", "bar baz:qux"), + equalConstraint("labels.zone", "us-east"), + } + + key, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, constraints) + require.True(t, ok) + require.Regexp(t, `^parquet-row-ranges:[0-9a-f]{64}$`, key) + require.Len(t, key, len(cacheName)+1+64) + require.NotContains(t, key, " ") + require.NotContains(t, key, "bar baz:qux") + + sameKey, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, constraints) + require.True(t, ok) + require.Equal(t, key, sameKey) + + differentConstraintKey, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, []search.Constraint{ + equalConstraint("labels.foo", "different"), + equalConstraint("labels.zone", "us-east"), + }) + require.True(t, ok) + require.NotEqual(t, key, differentConstraintKey) + + noConstraintKey, ok := rowRangesCacheKey(cacheName, ctx, shard, 4, nil) + require.True(t, ok) + require.Regexp(t, `^parquet-row-ranges:[0-9a-f]{64}$`, noConstraintKey) + require.NotEqual(t, key, noConstraintKey) +} + +func TestRowRangesCacheKey_MissingTenant(t *testing.T) { + shard := fakeParquetShard{name: "block-01/labels.parquet", shardIdx: 7} + + key, ok := rowRangesCacheKey("parquet-row-ranges", context.Background(), shard, 3, nil) + require.False(t, ok) + require.Empty(t, key) +} + +func TestRowRangesCache_GetSet(t *testing.T) { + env := newRowRangesCacheTestEnv(t) + rowRanges := []search.RowRange{ + {From: 10, Count: 3}, + {From: 30, Count: 4}, + } + + got, ok := env.cache.Get(env.ctx, env.shard, 2, env.constraints) + require.False(t, ok) + require.Nil(t, got) + + 
require.NoError(t, env.cache.Set(env.ctx, env.shard, 2, env.constraints, rowRanges)) + require.Equal(t, time.Hour, env.backingCache.lastTTL) + require.Len(t, env.backingCache.entries, 1) + + got, ok = env.cache.Get(env.ctx, env.shard, 2, env.constraints) + require.True(t, ok) + require.Equal(t, rowRanges, got) + + require.NoError(t, testutil.GatherAndCompare(env.reg, bytes.NewBufferString(` + # HELP cortex_parquet_row_ranges_cache_hits_total Total number of parquet row ranges cache hits. + # TYPE cortex_parquet_row_ranges_cache_hits_total counter + cortex_parquet_row_ranges_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_row_ranges_cache_misses_total Total number of parquet row ranges cache misses. + # TYPE cortex_parquet_row_ranges_cache_misses_total counter + cortex_parquet_row_ranges_cache_misses_total{name="test"} 1 + `), + "cortex_parquet_row_ranges_cache_hits_total", + "cortex_parquet_row_ranges_cache_misses_total", + )) +} + +func TestRowRangesCache_GetCorruptEntry(t *testing.T) { + env := newRowRangesCacheTestEnv(t) + key, ok := rowRangesCacheKey("test", env.ctx, env.shard, 2, env.constraints) + require.True(t, ok) + env.backingCache.entries[key] = []byte{1, 0, 0, 0} + + got, ok := env.cache.Get(env.ctx, env.shard, 2, env.constraints) + require.False(t, ok) + require.Nil(t, got) + + require.NoError(t, testutil.GatherAndCompare(env.reg, bytes.NewBufferString(` + # HELP cortex_parquet_row_ranges_cache_decode_errors_total Total number of parquet row ranges cache decode errors. + # TYPE cortex_parquet_row_ranges_cache_decode_errors_total counter + cortex_parquet_row_ranges_cache_decode_errors_total{name="test"} 1 + # HELP cortex_parquet_row_ranges_cache_misses_total Total number of parquet row ranges cache misses. + # TYPE cortex_parquet_row_ranges_cache_misses_total counter + cortex_parquet_row_ranges_cache_misses_total{name="test"} 1 + `), + "cortex_parquet_row_ranges_cache_decode_errors_total", + "cortex_parquet_row_ranges_cache_misses_total", + )) +} + +func TestRowRangesCache_MissingTenant(t *testing.T) { + env := newRowRangesCacheTestEnv(t) + rowRanges := []search.RowRange{{From: 10, Count: 3}} + + require.NoError(t, env.cache.Set(context.Background(), env.shard, 2, env.constraints, rowRanges)) + require.Empty(t, env.backingCache.entries) + + got, ok := env.cache.Get(context.Background(), env.shard, 2, env.constraints) + require.False(t, ok) + require.Nil(t, got) + require.Equal(t, 0, env.backingCache.fetchCalls) + + require.NoError(t, testutil.GatherAndCompare(env.reg, bytes.NewBufferString(` + # HELP cortex_parquet_row_ranges_cache_misses_total Total number of parquet row ranges cache misses. 
+	# TYPE cortex_parquet_row_ranges_cache_misses_total counter
+	cortex_parquet_row_ranges_cache_misses_total{name="test"} 1
+	`),
+		"cortex_parquet_row_ranges_cache_misses_total",
+	))
+}
+
+func TestNewRowRangesCache_NilBackend(t *testing.T) {
+	require.Nil(t, NewRowRangesCache(nil, "test", time.Hour, prometheus.NewRegistry()))
+}
+
+func equalConstraint(path, value string) search.Constraint {
+	return search.Equal(path, parquet.ValueOf(value))
+}
+
+type rowRangesCacheTestEnv struct {
+	reg          *prometheus.Registry
+	backingCache *fakeThanosCache
+	cache        *RowRangesCache
+	ctx          context.Context
+	shard        fakeParquetShard
+	constraints  []search.Constraint
+}
+
+func newRowRangesCacheTestEnv(t *testing.T) rowRangesCacheTestEnv {
+	t.Helper()
+
+	reg := prometheus.NewRegistry()
+	backingCache := newFakeThanosCache()
+
+	return rowRangesCacheTestEnv{
+		reg:          reg,
+		backingCache: backingCache,
+		cache:        NewRowRangesCache(backingCache, "test", time.Hour, reg),
+		ctx:          user.InjectOrgID(context.Background(), "tenant-a"),
+		shard:        fakeParquetShard{name: "block-01/labels.parquet", shardIdx: 7},
+		constraints:  []search.Constraint{equalConstraint("labels.foo", "bar")},
+	}
+}
+
+type fakeThanosCache struct {
+	entries    map[string][]byte
+	lastTTL    time.Duration
+	fetchCalls int
+}
+
+func newFakeThanosCache() *fakeThanosCache {
+	return &fakeThanosCache{
+		entries: map[string][]byte{},
+	}
+}
+
+func (c *fakeThanosCache) Store(data map[string][]byte, ttl time.Duration) {
+	c.lastTTL = ttl
+	for key, value := range data {
+		c.entries[key] = value
+	}
+}
+
+func (c *fakeThanosCache) Fetch(_ context.Context, keys []string) map[string][]byte {
+	c.fetchCalls++
+
+	hits := map[string][]byte{}
+	for _, key := range keys {
+		value, ok := c.entries[key]
+		if ok {
+			hits[key] = value
+		}
+	}
+	return hits
+}
+
+func (c *fakeThanosCache) Name() string {
+	return "fake"
+}
+
+type fakeParquetShard struct {
+	name     string
+	shardIdx int
+}
+
+func (s fakeParquetShard) Name() string {
+	return s.name
+}
+
+func (s fakeParquetShard) ShardIdx() int {
+	return s.shardIdx
+}
+
+func (s fakeParquetShard) LabelsFile() parquet_storage.ParquetFileView {
+	return nil
+}
+
+func (s fakeParquetShard) ChunksFile() parquet_storage.ParquetFileView {
+	return nil
+}
+
+func (s fakeParquetShard) TSDBSchema() (*schema.TSDBSchema, error) {
+	return nil, nil
+}

From a7b819ced1c889bd7d98ce4a7daa638dc58c5b1b Mon Sep 17 00:00:00 2001
From: Siddarth Gundu
Date: Tue, 5 May 2026 19:20:46 +0530
Subject: [PATCH 2/6] querier: Wire parquet row ranges cache into querier

Add bucket-store config for the parquet row ranges cache and create
the backend cache for the parquet queryable.

Signed-off-by: Siddarth Gundu
---
 pkg/querier/parquet_queryable.go            | 15 ++++++-
 pkg/storage/tsdb/caching_bucket.go          | 31 +++++++++++++++
 pkg/storage/tsdb/config.go                  | 44 ++++++++++++---------
 pkg/storage/tsdb/multilevel_bucket_cache.go |  3 ++
 4 files changed, 73 insertions(+), 20 deletions(-)

diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go
index 559f1f1c533..33eba950616 100644
--- a/pkg/querier/parquet_queryable.go
+++ b/pkg/querier/parquet_queryable.go
@@ -142,6 +142,19 @@ func NewParquetQueryable(
 		return nil, err
 	}
 
+	rowRangesCacheBackend, err := cortex_tsdb.CreateParquetRowRangesCache(storageCfg.BucketStore.ParquetRowRangesCache, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg))
+	if err != nil {
+		return nil, err
+	}
+	rowRangesCache := parquetutil.NewRowRangesCache(rowRangesCacheBackend, "parquet-row-ranges",
storageCfg.BucketStore.ParquetRowRangesCache.TTL, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) + + var constraintCacheFunc queryable.ConstraintCacheFunction + if rowRangesCache != nil { + constraintCacheFunc = func(context.Context) (search.RowRangesForConstraintsCache, error) { + return rowRangesCache, nil + } + } + cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()) parquetQueryableOpts := []queryable.QueryableOpts{ @@ -272,7 +285,7 @@ func NewParquetQueryable( } return shards, errGroup.Wait() - }, nil, cDecoder, parquetQueryableOpts...) + }, constraintCacheFunc, cDecoder, parquetQueryableOpts...) p := &parquetQueryableWithFallback{ subservices: manager, diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index d998c6ff53e..a5c9b15b6ac 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -218,6 +218,37 @@ func (cfg *ParquetLabelsCacheConfig) Validate() error { return cfg.BucketCacheBackend.Validate() } +type ParquetRowRangesCacheConfig struct { + BucketCacheBackend `yaml:",inline"` + + TTL time.Duration `yaml:"ttl"` +} + +func (cfg *ParquetRowRangesCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The parquet row ranges cache backend type. Single or Multiple cache backend can be provided. "+ + "Supported values in single cache: %s, %s, %s, and '' (disable). "+ + "Supported values in multi level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedBucketCacheBackends, ", "))) + + cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") + cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.") + cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.", "parquet-row-ranges") + cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.") + + f.DurationVar(&cfg.TTL, prefix+"ttl", 10*time.Minute, "TTL for caching parquet row ranges.") + + // In the multi level parquet row ranges cache, backfill TTL follows the row ranges TTL. + cfg.MultiLevel.BackFillTTL = cfg.TTL +} + +func (cfg *ParquetRowRangesCacheConfig) Validate() error { + return cfg.BucketCacheBackend.Validate() +} + +func CreateParquetRowRangesCache(cfg ParquetRowRangesCacheConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { + cfg.MultiLevel.BackFillTTL = cfg.TTL + return createBucketCache("parquet-row-ranges-cache", &cfg.BucketCacheBackend, logger, reg) +} + func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, parquetLabelsConfig ParquetLabelsCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index fa6f7b1c938..c06ac643a9f 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -277,25 +277,26 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool { // BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway. 
type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - SyncInterval time.Duration `yaml:"sync_interval"` - MaxConcurrent int `yaml:"max_concurrent"` - MaxInflightRequests int `yaml:"max_inflight_requests"` - TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConsistencyDelay time.Duration `yaml:"consistency_delay"` - IndexCache IndexCacheConfig `yaml:"index_cache"` - ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` - MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` - ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache"` - MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` - IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` - IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` - IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` - BucketIndex BucketIndexConfig `yaml:"bucket_index"` - BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` - BucketStoreType string `yaml:"bucket_store_type"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval"` + MaxConcurrent int `yaml:"max_concurrent"` + MaxInflightRequests int `yaml:"max_inflight_requests"` + TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConsistencyDelay time.Duration `yaml:"consistency_delay"` + IndexCache IndexCacheConfig `yaml:"index_cache"` + ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` + MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` + ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache"` + ParquetRowRangesCache ParquetRowRangesCacheConfig `yaml:"parquet_row_ranges_cache"` + MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` + IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` + IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` + IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` + BucketIndex BucketIndexConfig `yaml:"bucket_index"` + BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` + BucketStoreType string `yaml:"bucket_store_type"` // Chunk pool. 
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` @@ -356,6 +357,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.") cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.") cfg.ParquetLabelsCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-labels-cache.") + cfg.ParquetRowRangesCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-row-ranges-cache.") cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.") f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.") @@ -417,6 +419,10 @@ func (cfg *BucketStoreConfig) Validate() error { if err != nil { return errors.Wrap(err, "parquet-labels-cache configuration") } + err = cfg.ParquetRowRangesCache.Validate() + if err != nil { + return errors.Wrap(err, "parquet-row-ranges-cache configuration") + } if !slices.Contains(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) { return ErrInvalidBucketIndexBlockDiscoveryStrategy } diff --git a/pkg/storage/tsdb/multilevel_bucket_cache.go b/pkg/storage/tsdb/multilevel_bucket_cache.go index f9e2b4fbfd1..66c6c432918 100644 --- a/pkg/storage/tsdb/multilevel_bucket_cache.go +++ b/pkg/storage/tsdb/multilevel_bucket_cache.go @@ -71,6 +71,9 @@ func newMultiLevelBucketCache(name string, cfg MultiLevelBucketCacheConfig, reg case "parquet-labels-cache": itemName = "parquet_labels_cache" metricHelpText = "parquet labels cache" + case "parquet-row-ranges-cache": + itemName = "parquet_row_ranges_cache" + metricHelpText = "parquet row ranges cache" default: itemName = name } From f59966acd83f3f35807cf2ad4c038e35abc89e37 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Tue, 5 May 2026 19:22:35 +0530 Subject: [PATCH 3/6] Update generated config docs Signed-off-by: Siddarth Gundu --- docs/blocks-storage/querier.md | 234 ++++++++++++++ docs/blocks-storage/store-gateway.md | 234 ++++++++++++++ docs/configuration/config-file-reference.md | 232 ++++++++++++++ schemas/cortex-config-schema.json | 320 ++++++++++++++++++++ 4 files changed, 1020 insertions(+) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 081b1e83a1c..9050ca418d4 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1695,6 +1695,240 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl [subrange_ttl: | default = 24h] + parquet_row_ranges_cache: + # The parquet row ranges cache backend type. Single or Multiple cache + # backend can be provided. Supported values in single cache: memcached, + # redis, inmemory, and '' (disable). Supported values in multi level + # cache: a comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-row-ranges cache used + # (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV + # query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup + # made after that). 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. + # If set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should + # run. If more keys are specified, internally keys are split into + # multiple batches and fetched concurrently, honoring the max + # concurrency. If set to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username + [username: | default = ""] + + # Redis password. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching + # is when data is stored in memory instead of fetching data each time. + # See https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # TTL for caching parquet row ranges. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.ttl + [ttl: | default = 10m] + # Maximum number of entries in the regex matchers cache. 0 to disable. 
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 7235af218c5..e218ff35f5d 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1753,6 +1753,240 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl [subrange_ttl: | default = 24h] + parquet_row_ranges_cache: + # The parquet row ranges cache backend type. Single or Multiple cache + # backend can be provided. Supported values in single cache: memcached, + # redis, inmemory, and '' (disable). Supported values in multi level + # cache: a comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-row-ranges cache used + # (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV + # query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup + # made after that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. + # If set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should + # run. If more keys are specified, internally keys are split into + # multiple batches and fetched concurrently, honoring the max + # concurrency. If set to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username + [username: | default = ""] + + # Redis password. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching + # is when data is stored in memory instead of fetching data each time. + # See https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # TTL for caching parquet row ranges. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.ttl + [ttl: | default = 10m] + # Maximum number of entries in the regex matchers cache. 0 to disable. # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f9c206ac537..7f14bf289f0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2376,6 +2376,238 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl [subrange_ttl: | default = 24h] + parquet_row_ranges_cache: + # The parquet row ranges cache backend type. Single or Multiple cache + # backend can be provided. Supported values in single cache: memcached, + # redis, inmemory, and '' (disable). Supported values in multi level cache: + # a comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-row-ranges cache used (shared + # between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. 
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections
+      [max_idle_connections: <int> | default = 16]
+
+      # The maximum number of concurrent asynchronous operations that can
+      # occur.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency
+      [max_async_concurrency: <int> | default = 3]
+
+      # The maximum number of enqueued asynchronous operations allowed.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
+      # The maximum number of concurrent connections running get operations.
+      # If set to 0, concurrency is unlimited.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency
+      [max_get_multi_concurrency: <int> | default = 100]
+
+      # The maximum number of keys a single underlying get operation should
+      # fetch. If more keys are specified, keys are internally split into
+      # multiple batches and fetched concurrently, honoring the max
+      # concurrency. If set to 0, the max batch size is unlimited.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size
+      [max_get_multi_batch_size: <int> | default = 0]
+
+      # The maximum size of an item stored in memcached. Bigger items are not
+      # stored. If set to 0, no maximum size is enforced.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size
+      [max_item_size: <int> | default = 1048576]
+
+      # Use the memcached auto-discovery mechanism provided by some cloud
+      # providers, like GCP and AWS.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery
+      [auto_discovery: <boolean> | default = false]
+
+      set_async_circuit_breaker_config:
+        # If true, enable the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled
+        [enabled: <boolean> | default = false]
+
+        # Maximum number of requests allowed to pass through when the circuit
+        # breaker is half-open. If set to 0, it defaults to allowing 1
+        # request.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests
+        [half_open_max_requests: <int> | default = 10]
+
+        # Period of the open state after which the circuit breaker becomes
+        # half-open. If set to 0, the open duration defaults to 60 seconds.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration
+        [open_duration: <duration> | default = 5s]
+
+        # Minimum number of requests to trigger the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests
+        [min_requests: <int> | default = 50]
+
+        # Consecutive failures required to open the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures
+        [consecutive_failures: <int> | default = 5]
+
+        # Failure percentage required to open the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent
+        [failure_percent: <float> | default = 0.05]
+
+    redis:
+      # Comma-separated list of redis addresses. Supported prefixes are: dns+
+      # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query),
+      # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made
+      # after that).
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses
+      [addresses: <string> | default = ""]
+
+      # Redis username.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username
+      [username: <string> | default = ""]
+
+      # Redis password.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password
+      [password: <string> | default = ""]
+
+      # Database to be selected after connecting to the server.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db
+      [db: <int> | default = 0]
+
+      # Specifies the master's name. Must not be empty for Redis Sentinel.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name
+      [master_name: <string> | default = ""]
+
+      # The maximum number of concurrent GetMulti() operations. If set to 0,
+      # concurrency is unlimited.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency
+      [max_get_multi_concurrency: <int> | default = 100]
+
+      # The maximum size per batch for mget.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size
+      [get_multi_batch_size: <int> | default = 100]
+
+      # The maximum number of concurrent SetMulti() operations. If set to 0,
+      # concurrency is unlimited.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency
+      [max_set_multi_concurrency: <int> | default = 100]
+
+      # The maximum size per batch for pipeline set.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size
+      [set_multi_batch_size: <int> | default = 100]
+
+      # The maximum number of concurrent asynchronous operations that can
+      # occur.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency
+      [max_async_concurrency: <int> | default = 3]
+
+      # The maximum number of enqueued asynchronous operations allowed.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
+      # Client dial timeout.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout
+      [dial_timeout: <duration> | default = 5s]
+
+      # Client read timeout.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout
+      [read_timeout: <duration> | default = 3s]
+
+      # Client write timeout.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout
+      [write_timeout: <duration> | default = 3s]
+
+      # Whether to enable TLS for the Redis connection.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled
+      [tls_enabled: <boolean> | default = false]
+
+      # Path to the client certificate file, which will be used for
+      # authenticating with the server. Also requires the key path to be
+      # configured.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path
+      [tls_cert_path: <string> | default = ""]
+
+      # Path to the key file for the client certificate. Also requires the
+      # client certificate to be configured.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path
+      [tls_key_path: <string> | default = ""]
+
+      # Path to the CA certificates file to validate the server certificate
+      # against. If not set, the host's root CA certificates are used.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path
+      [tls_ca_path: <string> | default = ""]
+
+      # Override the expected name on the server certificate.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name
+      [tls_server_name: <string> | default = ""]
+
+      # Skip validating the server certificate.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify
+      [tls_insecure_skip_verify: <boolean> | default = false]
+
+      # If not zero, client-side caching is enabled. With client-side
+      # caching, data is stored in local memory instead of being fetched from
+      # Redis on every access. See
+      # https://redis.io/docs/manual/client-side-caching/ for more info.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size
+      [cache_size: <int> | default = 0]
+
+      set_async_circuit_breaker_config:
+        # If true, enable the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled
+        [enabled: <boolean> | default = false]
+
+        # Maximum number of requests allowed to pass through when the circuit
+        # breaker is half-open. If set to 0, it defaults to allowing 1
+        # request.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests
+        [half_open_max_requests: <int> | default = 10]
+
+        # Period of the open state after which the circuit breaker becomes
+        # half-open. If set to 0, the open duration defaults to 60 seconds.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration
+        [open_duration: <duration> | default = 5s]
+
+        # Minimum number of requests to trigger the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests
+        [min_requests: <int> | default = 50]
+
+        # Consecutive failures required to open the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures
+        [consecutive_failures: <int> | default = 5]
+
+        # Failure percentage required to open the circuit breaker.
+        # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent
+        [failure_percent: <float> | default = 0.05]
+
+    multilevel:
+      # The maximum number of concurrent asynchronous operations that can
+      # occur when backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency
+      [max_async_concurrency: <int> | default = 3]
+
+      # The maximum number of enqueued asynchronous operations allowed when
+      # backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
+      # The maximum number of items to backfill per asynchronous operation.
+      # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items
+      [max_backfill_items: <int> | default = 10000]
+
+    # TTL for caching parquet row ranges.
+    # CLI flag: -blocks-storage.bucket-store.parquet-row-ranges-cache.ttl
+    [ttl: <duration> | default = 10m]
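As a concrete illustration of the options documented above, a minimal configuration enabling the new cache might look as follows (illustrative only; the memcached address is a placeholder and not part of this patch):

    blocks_storage:
      bucket_store:
        parquet_row_ranges_cache:
          backend: memcached
          memcached:
            addresses: dns+memcached.example.svc.cluster.local:11211
          ttl: 10m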

   # Maximum number of entries in the regex matchers cache. 0 to disable.
   # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items
   [matchers_cache_max_items: <int> | default = 0]

diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json
index 50b9a88bb71..654dfdb739b 100644
--- a/schemas/cortex-config-schema.json
+++ b/schemas/cortex-config-schema.json
@@ -2606,6 +2606,326 @@
         },
         "type": "object"
       },
+      "parquet_row_ranges_cache": {
+        "properties": {
+          "backend": {
+            "description": "The parquet row ranges cache backend type. A single cache backend or multiple cache backends can be provided. Supported values for a single cache: memcached, redis, inmemory, and '' (disable). Supported values for a multi-level cache: a comma-separated list of (inmemory, memcached, redis)",
+            "type": "string",
+            "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.backend"
+          },
+          "inmemory": {
+            "properties": {
+              "max_size_bytes": {
+                "default": 1073741824,
+                "description": "Maximum size in bytes of the in-memory parquet row ranges cache (shared between all tenants).",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.inmemory.max-size-bytes"
+              }
+            },
+            "type": "object"
+          },
+          "memcached": {
+            "properties": {
+              "addresses": {
+                "description": "Comma-separated list of memcached addresses. Supported prefixes are: dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query), dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after that).",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.addresses"
+              },
+              "auto_discovery": {
+                "default": false,
+                "description": "Use the memcached auto-discovery mechanism provided by some cloud providers, like GCP and AWS.",
+                "type": "boolean",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.auto-discovery"
+              },
+              "max_async_buffer_size": {
+                "default": 10000,
+                "description": "The maximum number of enqueued asynchronous operations allowed.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-buffer-size"
+              },
+              "max_async_concurrency": {
+                "default": 3,
+                "description": "The maximum number of concurrent asynchronous operations that can occur.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-async-concurrency"
+              },
+              "max_get_multi_batch_size": {
+                "default": 0,
+                "description": "The maximum number of keys a single underlying get operation should fetch. If more keys are specified, keys are internally split into multiple batches and fetched concurrently, honoring the max concurrency. If set to 0, the max batch size is unlimited.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-batch-size"
+              },
+              "max_get_multi_concurrency": {
+                "default": 100,
+                "description": "The maximum number of concurrent connections running get operations. If set to 0, concurrency is unlimited.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-get-multi-concurrency"
+              },
+              "max_idle_connections": {
+                "default": 16,
+                "description": "The maximum number of idle connections that will be maintained per address.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-idle-connections"
+              },
+              "max_item_size": {
+                "default": 1048576,
+                "description": "The maximum size of an item stored in memcached. Bigger items are not stored. If set to 0, no maximum size is enforced.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.max-item-size"
+              },
+              "set_async_circuit_breaker_config": {
+                "properties": {
+                  "consecutive_failures": {
+                    "default": 5,
+                    "description": "Consecutive failures required to open the circuit breaker.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.consecutive-failures"
+                  },
+                  "enabled": {
+                    "default": false,
+                    "description": "If true, enable the circuit breaker.",
+                    "type": "boolean",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.enabled"
+                  },
+                  "failure_percent": {
+                    "default": 0.05,
+                    "description": "Failure percentage required to open the circuit breaker.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.failure-percent"
+                  },
+                  "half_open_max_requests": {
+                    "default": 10,
+                    "description": "Maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, it defaults to allowing 1 request.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.half-open-max-requests"
+                  },
+                  "min_requests": {
+                    "default": 50,
+                    "description": "Minimum number of requests to trigger the circuit breaker.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.min-requests"
+                  },
+                  "open_duration": {
+                    "default": "5s",
+                    "description": "Period of the open state after which the circuit breaker becomes half-open. If set to 0, the open duration defaults to 60 seconds.",
+                    "type": "string",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.set-async.circuit-breaker.open-duration",
+                    "x-format": "duration"
+                  }
+                },
+                "type": "object"
+              },
+              "timeout": {
+                "default": "100ms",
+                "description": "The socket read/write timeout.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.memcached.timeout",
+                "x-format": "duration"
+              }
+            },
+            "type": "object"
+          },
+          "multilevel": {
+            "properties": {
+              "max_async_buffer_size": {
+                "default": 10000,
+                "description": "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-buffer-size"
+              },
+              "max_async_concurrency": {
+                "default": 3,
+                "description": "The maximum number of concurrent asynchronous operations that can occur when backfilling cache items.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-async-concurrency"
+              },
+              "max_backfill_items": {
+                "default": 10000,
+                "description": "The maximum number of items to backfill per asynchronous operation.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.multilevel.max-backfill-items"
+              }
+            },
+            "type": "object"
+          },
+          "redis": {
+            "properties": {
+              "addresses": {
+                "description": "Comma-separated list of redis addresses. Supported prefixes are: dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query), dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after that).",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.addresses"
+              },
+              "cache_size": {
+                "default": 0,
+                "description": "If not zero, client-side caching is enabled. With client-side caching, data is stored in local memory instead of being fetched from Redis on every access. See https://redis.io/docs/manual/client-side-caching/ for more info.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.cache-size"
+              },
+              "db": {
+                "default": 0,
+                "description": "Database to be selected after connecting to the server.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.db"
+              },
+              "dial_timeout": {
+                "default": "5s",
+                "description": "Client dial timeout.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.dial-timeout",
+                "x-format": "duration"
+              },
+              "get_multi_batch_size": {
+                "default": 100,
+                "description": "The maximum size per batch for mget.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.get-multi-batch-size"
+              },
+              "master_name": {
+                "description": "Specifies the master's name. Must not be empty for Redis Sentinel.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.master-name"
+              },
+              "max_async_buffer_size": {
+                "default": 10000,
+                "description": "The maximum number of enqueued asynchronous operations allowed.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-buffer-size"
+              },
+              "max_async_concurrency": {
+                "default": 3,
+                "description": "The maximum number of concurrent asynchronous operations that can occur.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-async-concurrency"
+              },
+              "max_get_multi_concurrency": {
+                "default": 100,
+                "description": "The maximum number of concurrent GetMulti() operations. If set to 0, concurrency is unlimited.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-get-multi-concurrency"
+              },
+              "max_set_multi_concurrency": {
+                "default": 100,
+                "description": "The maximum number of concurrent SetMulti() operations. If set to 0, concurrency is unlimited.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.max-set-multi-concurrency"
+              },
+              "password": {
+                "description": "Redis password.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.password"
+              },
+              "read_timeout": {
+                "default": "3s",
+                "description": "Client read timeout.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.read-timeout",
+                "x-format": "duration"
+              },
+              "set_async_circuit_breaker_config": {
+                "properties": {
+                  "consecutive_failures": {
+                    "default": 5,
+                    "description": "Consecutive failures required to open the circuit breaker.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.consecutive-failures"
+                  },
+                  "enabled": {
+                    "default": false,
+                    "description": "If true, enable the circuit breaker.",
+                    "type": "boolean",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.enabled"
+                  },
+                  "failure_percent": {
+                    "default": 0.05,
+                    "description": "Failure percentage required to open the circuit breaker.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.failure-percent"
+                  },
+                  "half_open_max_requests": {
+                    "default": 10,
+                    "description": "Maximum number of requests allowed to pass through when the circuit breaker is half-open. If set to 0, it defaults to allowing 1 request.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.half-open-max-requests"
+                  },
+                  "min_requests": {
+                    "default": 50,
+                    "description": "Minimum number of requests to trigger the circuit breaker.",
+                    "type": "number",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.min-requests"
+                  },
+                  "open_duration": {
+                    "default": "5s",
+                    "description": "Period of the open state after which the circuit breaker becomes half-open. If set to 0, the open duration defaults to 60 seconds.",
+                    "type": "string",
+                    "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-async.circuit-breaker.open-duration",
+                    "x-format": "duration"
+                  }
+                },
+                "type": "object"
+              },
+              "set_multi_batch_size": {
+                "default": 100,
+                "description": "The maximum size per batch for pipeline set.",
+                "type": "number",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.set-multi-batch-size"
+              },
+              "tls_ca_path": {
+                "description": "Path to the CA certificates file to validate the server certificate against. If not set, the host's root CA certificates are used.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-ca-path"
+              },
+              "tls_cert_path": {
+                "description": "Path to the client certificate file, which will be used for authenticating with the server. Also requires the key path to be configured.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-cert-path"
+              },
+              "tls_enabled": {
+                "default": false,
+                "description": "Whether to enable TLS for the Redis connection.",
+                "type": "boolean",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-enabled"
+              },
+              "tls_insecure_skip_verify": {
+                "default": false,
+                "description": "Skip validating the server certificate.",
+                "type": "boolean",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-insecure-skip-verify"
+              },
+              "tls_key_path": {
+                "description": "Path to the key file for the client certificate. Also requires the client certificate to be configured.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-key-path"
+              },
+              "tls_server_name": {
+                "description": "Override the expected name on the server certificate.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.tls-server-name"
+              },
+              "username": {
+                "description": "Redis username.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.username"
+              },
+              "write_timeout": {
+                "default": "3s",
+                "description": "Client write timeout.",
+                "type": "string",
+                "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.redis.write-timeout",
+                "x-format": "duration"
+              }
+            },
+            "type": "object"
+          },
+          "ttl": {
+            "default": "10m0s",
+            "description": "TTL for caching parquet row ranges.",
+            "type": "string",
+            "x-cli-flag": "blocks-storage.bucket-store.parquet-row-ranges-cache.ttl",
+            "x-format": "duration"
+          }
+        },
+        "type": "object"
+      },
       "parquet_shard_cache_size": {
         "default": 512,
         "description": "[Experimental] Maximum size of the Parquet shard cache.
0 to disable.", From 032209dbb079cb7d7f0564e9018c0a7a23359763 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Tue, 5 May 2026 21:29:24 +0530 Subject: [PATCH 4/6] parquetutil: modernize row ranges cache test Signed-off-by: Siddarth Gundu --- pkg/util/parquetutil/row_ranges_cache_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/util/parquetutil/row_ranges_cache_test.go b/pkg/util/parquetutil/row_ranges_cache_test.go index 7d14a84fb21..3e18b0f326c 100644 --- a/pkg/util/parquetutil/row_ranges_cache_test.go +++ b/pkg/util/parquetutil/row_ranges_cache_test.go @@ -3,6 +3,7 @@ package parquetutil import ( "bytes" "context" + "maps" "testing" "time" @@ -253,9 +254,7 @@ func newFakeThanosCache() *fakeThanosCache { func (c *fakeThanosCache) Store(data map[string][]byte, ttl time.Duration) { c.lastTTL = ttl - for key, value := range data { - c.entries[key] = value - } + maps.Copy(c.entries, data) } func (c *fakeThanosCache) Fetch(_ context.Context, keys []string) map[string][]byte { From d4c2f52e8b03c86d556c84828b7ceb04ed14803e Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Thu, 7 May 2026 19:11:24 +0530 Subject: [PATCH 5/6] remove redundant backfill ttl assignment Signed-off-by: Siddarth Gundu --- pkg/storage/tsdb/caching_bucket.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index a5c9b15b6ac..d18634763c9 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -235,9 +235,6 @@ func (cfg *ParquetRowRangesCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.") f.DurationVar(&cfg.TTL, prefix+"ttl", 10*time.Minute, "TTL for caching parquet row ranges.") - - // In the multi level parquet row ranges cache, backfill TTL follows the row ranges TTL. - cfg.MultiLevel.BackFillTTL = cfg.TTL } func (cfg *ParquetRowRangesCacheConfig) Validate() error { From 51522cf88e0d61efc433d2e07a0f8edcfcd852f9 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Thu, 7 May 2026 19:17:24 +0530 Subject: [PATCH 6/6] storegateway: wire parquet row ranges cache Signed-off-by: Siddarth Gundu --- CHANGELOG.md | 1 + pkg/storegateway/parquet_bucket_store.go | 3 +- pkg/storegateway/parquet_bucket_stores.go | 52 +++++++++++++------ .../parquet_bucket_stores_test.go | 11 ++++ 4 files changed, 49 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a7640e63d8..21b7373cbb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 * [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 +* [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. 
#7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index e6bded1e2f6..8713d2c4dfb 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -40,6 +40,7 @@ type parquetBucketStore struct { matcherCache storecache.MatchersCache parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard] + rowRangesCache search.RowRangesForConstraintsCache } func (p *parquetBucketStore) Close() error { @@ -66,7 +67,7 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx)) for _, blockID := range blockIDs { // TODO: support shard ID > 0 later. - block, err := p.newParquetBlock(ctx, blockID, 0, bucketOpener, bucketOpener, p.chunksDecoder, noopQuota, noopQuota, noopQuota) + block, err := p.newParquetBlock(ctx, blockID, 0, bucketOpener, bucketOpener, p.chunksDecoder, p.rowRangesCache, noopQuota, noopQuota, noopQuota) if err != nil { return nil, err } diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index b51bf758ae5..ecb1e993673 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -23,6 +23,7 @@ import ( storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -56,6 +57,7 @@ type ParquetBucketStores struct { matcherCache storecache.MatchersCache parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard] + rowRangesCache search.RowRangesForConstraintsCache inflightRequests *cortex_util.InflightRequestTracker } @@ -73,6 +75,15 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore. return nil, err } + rowRangesCacheBackend, err := tsdb.CreateParquetRowRangesCache(cfg.BucketStore.ParquetRowRangesCache, logger, reg) + if err != nil { + return nil, err + } + var rowRangesCache search.RowRangesForConstraintsCache + if rowRangesCacheBackend != nil { + rowRangesCache = parquetutil.NewRowRangesCache(rowRangesCacheBackend, "parquet-row-ranges", cfg.BucketStore.ParquetRowRangesCache.TTL, reg) + } + u := &ParquetBucketStores{ logger: logger, cfg: cfg, @@ -83,6 +94,7 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore. 
chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()), inflightRequests: cortex_util.NewInflightRequestTracker(), parquetShardCache: parquetShardCache, + rowRangesCache: rowRangesCache, } if cfg.BucketStore.MatchersCacheMaxItems > 0 { @@ -109,6 +121,7 @@ func (u *ParquetBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Sto return fmt.Errorf("no userID") } + spanCtx = user.InjectOrgID(spanCtx, userID) err := u.getStoreError(userID) userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) if err != nil { @@ -149,6 +162,7 @@ func (u *ParquetBucketStores) LabelNames(ctx context.Context, req *storepb.Label if userID == "" { return nil, fmt.Errorf("no userID") } + spanCtx = user.InjectOrgID(spanCtx, userID) err := u.getStoreError(userID) userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) @@ -165,7 +179,7 @@ func (u *ParquetBucketStores) LabelNames(ctx context.Context, req *storepb.Label return nil, status.Error(codes.Internal, err.Error()) } - return store.LabelNames(ctx, req) + return store.LabelNames(spanCtx, req) } // LabelValues implements BucketStores @@ -177,6 +191,7 @@ func (u *ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.Labe if userID == "" { return nil, fmt.Errorf("no userID") } + spanCtx = user.InjectOrgID(spanCtx, userID) err := u.getStoreError(userID) userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) @@ -193,7 +208,7 @@ func (u *ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.Labe return nil, status.Error(codes.Internal, err.Error()) } - return store.LabelValues(ctx, req) + return store.LabelValues(spanCtx, req) } // SyncBlocks implements BucketStores @@ -262,19 +277,21 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger chunksDecoder: u.chunksDecoder, matcherCache: u.matcherCache, parquetShardCache: u.parquetShardCache, + rowRangesCache: u.rowRangesCache, } return store, nil } type parquetBlock struct { - name string - shard parquet_storage.ParquetShard - m *search.Materializer - concurrency int + name string + shard parquet_storage.ParquetShard + m *search.Materializer + concurrency int + rowRangesCache search.RowRangesForConstraintsCache } -func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, shardID int, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) { +func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, shardID int, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowRangesCache search.RowRangesForConstraintsCache, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) { userID, err := users.TenantID(ctx) if err != nil { return nil, err @@ -316,10 +333,11 @@ func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, s } return &parquetBlock{ - shard: shard, - m: m, - concurrency: p.concurrency, - name: name, + shard: shard, + m: m, + concurrency: p.concurrency, + name: name, + rowRangesCache: rowRangesCache, }, nil } @@ -385,8 +403,8 @@ func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks b if err != nil { return err } - // TODO: Add cache. - rr, err := search.Filter(ctx, b.shard, rgi, nil, cs...) 
+ + rr, err := search.Filter(ctx, b.shard, rgi, b.rowRangesCache, cs...) if err != nil { return err } @@ -446,8 +464,8 @@ func (b *parquetBlock) LabelNames(ctx context.Context, limit int64, matchers []* if err != nil { return err } - // TODO: Add cache. - rr, err := search.Filter(ctx, b.shard, rgi, nil, cs...) + + rr, err := search.Filter(ctx, b.shard, rgi, b.rowRangesCache, cs...) if err != nil { return err } @@ -487,8 +505,8 @@ func (b *parquetBlock) LabelValues(ctx context.Context, name string, limit int64 if err != nil { return err } - // TODO: Add cache. - rr, err := search.Filter(ctx, b.shard, rgi, nil, cs...) + + rr, err := search.Filter(ctx, b.shard, rgi, b.rowRangesCache, cs...) if err != nil { return err } diff --git a/pkg/storegateway/parquet_bucket_stores_test.go b/pkg/storegateway/parquet_bucket_stores_test.go index abbc68e805e..59ba34ef209 100644 --- a/pkg/storegateway/parquet_bucket_stores_test.go +++ b/pkg/storegateway/parquet_bucket_stores_test.go @@ -221,6 +221,11 @@ func TestParquetBucketStoresWithCaching(t *testing.T) { Backend: "inmemory", }, }, + ParquetRowRangesCache: cortex_tsdb.ParquetRowRangesCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, }, } @@ -235,6 +240,12 @@ func TestParquetBucketStoresWithCaching(t *testing.T) { parquetStores, err := newParquetBucketStores(storageCfg, bucketClient, limits, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) require.NotNil(t, parquetStores) + require.NotNil(t, parquetStores.rowRangesCache) + + store, err := parquetStores.getOrCreateStore("user-1") + require.NoError(t, err) + require.NotNil(t, store.rowRangesCache) + require.Same(t, parquetStores.rowRangesCache, store.rowRangesCache) // Verify that the bucket is a caching bucket (it should be wrapped) // The caching bucket should be different from the original bucket client
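Since the backend option documented above also accepts a comma-separated list for a multi-level setup, an in-memory cache can sit in front of a shared memcached and be backfilled from it. A sketch of that configuration, with placeholder values throughout:

    blocks_storage:
      bucket_store:
        parquet_row_ranges_cache:
          backend: inmemory,memcached
          inmemory:
            max_size_bytes: 1073741824
          memcached:
            addresses: dns+memcached.example.svc.cluster.local:11211
          multilevel:
            max_backfill_items: 10000
          ttl: 10m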