From 8dd22aba4b6f33ef3768dad72807fa88b62cdf19 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 7 Nov 2025 21:55:14 +0900 Subject: [PATCH 1/2] Add parquet shard cache TTL Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 4 + docs/configuration/config-file-reference.md | 4 + pkg/querier/parquet_queryable.go | 93 +++++++++++++++-- pkg/querier/parquet_queryable_test.go | 109 ++++++++++++++++++++ pkg/querier/querier.go | 10 +- schemas/cortex-config-schema.json | 7 ++ 7 files changed, 214 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7e6a83d570..cf9d2674ec6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092 +* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098 * [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063 * [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074 * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index be429ba38d6..61a71e9b06b 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -302,6 +302,10 @@ querier: # queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + + # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-queryable-shard-cache-ttl + [parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 200d23476de..a0c67fa5570 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4790,6 +4790,10 @@ thanos_engine: # need to make sure Parquet files are created before it is queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + +# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. +# CLI flag: -querier.parquet-queryable-shard-cache-ttl +[parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `query_frontend_config` diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 502a635534b..663e985ac53 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -51,6 +51,8 @@ const ( parquetBlockStore blockStoreType = "parquet" ) +const defaultMaintenanceInterval = time.Minute + var ( validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} ) @@ -97,6 +99,7 @@ type parquetQueryableWithFallback struct { fallbackDisabled bool queryStoreAfter time.Duration parquetQueryable storage.Queryable + cache cacheInterface[parquet_storage.ParquetShard] blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -132,7 +135,7 @@ func NewParquetQueryable( return nil, err } - cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg)) + cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg)) if err != nil { return nil, err } @@ -248,6 +251,7 @@ func NewParquetQueryable( subservices: manager, blockStorageQueryable: blockStorageQueryable, parquetQueryable: parquetQueryable, + cache: cache, queryStoreAfter: config.QueryStoreAfter, subservicesWatcher: services.NewFailureWatcher(), finder: blockStorageQueryable.finder, @@ -283,6 +287,10 @@ func (p *parquetQueryableWithFallback) running(ctx context.Context) error { } func (p *parquetQueryableWithFallback) stopping(_ error) error { + if p.cache != nil { + p.cache.Close() + } + return services.StopManagerAndAwaitStopped(context.Background(), p.subservices) } @@ -613,6 +621,7 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint type cacheInterface[T any] interface { Get(path string) T Set(path string, reader T) + Close() } type cacheMetrics struct { @@ -643,17 +652,24 @@ func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { } } +type cacheEntry[T any] struct { + value T + expiresAt time.Time +} + type Cache[T any] struct { - cache *lru.Cache[string, T] + cache *lru.Cache[string, *cacheEntry[T]] name string metrics *cacheMetrics + ttl time.Duration + stopCh chan struct{} } -func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterface[T], error) { +func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) { if size <= 0 { return &noopCache[T]{}, nil } - cache, err := lru.NewWithEvict(size, func(key string, value T) { + cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) { metrics.evictions.WithLabelValues(name).Inc() metrics.size.WithLabelValues(name).Dec() }) @@ -661,17 +677,56 @@ func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterfa return nil, err } - return &Cache[T]{ + c := &Cache[T]{ cache: cache, name: name, metrics: metrics, - }, nil + ttl: ttl, + stopCh: make(chan struct{}), + } + + if ttl > 0 { + go c.maintenanceLoop(maintenanceInterval) + } + + return c, nil +} + +func (c *Cache[T]) maintenanceLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + keys := c.cache.Keys() + for _, key := range keys { + if entry, ok := c.cache.Peek(key); ok { + // we use a Peek() because the Get() change LRU order. + if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { + c.cache.Remove(key) + } + } + } + case <-c.stopCh: + return + } + } } func (c *Cache[T]) Get(path string) (r T) { - if reader, ok := c.cache.Get(path); ok { + if entry, ok := c.cache.Get(path); ok { + isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) + + if isExpired { + c.cache.Remove(path) + c.metrics.misses.WithLabelValues(c.name).Inc() + return + } + c.metrics.hits.WithLabelValues(c.name).Inc() - return reader + return entry.value } c.metrics.misses.WithLabelValues(c.name).Inc() return @@ -681,8 +736,22 @@ func (c *Cache[T]) Set(path string, reader T) { if !c.cache.Contains(path) { c.metrics.size.WithLabelValues(c.name).Inc() } - c.metrics.misses.WithLabelValues(c.name).Inc() - c.cache.Add(path, reader) + + var expiresAt time.Time + if c.ttl > 0 { + expiresAt = time.Now().Add(c.ttl) + } + + entry := &cacheEntry[T]{ + value: reader, + expiresAt: expiresAt, + } + + c.cache.Add(path, entry) +} + +func (c *Cache[T]) Close() { + close(c.stopCh) } type noopCache[T any] struct { @@ -696,6 +765,10 @@ func (n noopCache[T]) Set(_ string, _ T) { } +func (n noopCache[T]) Close() { + +} + var ( shardInfoCtxKey contextKey = 1 ) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 263bb361490..9626a9324e2 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -1,6 +1,7 @@ package querier import ( + "bytes" "context" "fmt" "math/rand" @@ -14,6 +15,7 @@ import ( "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -881,3 +883,110 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { }) }) } + +func Test_Cache_LRUEviction(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + cache, err := newCache[string]("test", 2, 0, time.Minute, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + _ = cache.Get("key1") // hit + // "key2" deleted by LRU eviction + cache.Set("key3", "value3") + + val1 := cache.Get("key1") // hit + require.Equal(t, "value1", val1) + val3 := cache.Get("key3") // hit + require.Equal(t, "value3", val3) + val2 := cache.Get("key2") // miss + require.Equal(t, "", val2) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 2 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 +`)))) +} + +func Test_Cache_TTLEvictionByGet(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + + cache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + val = cache.Get("key1") + require.Equal(t, "", val) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 +`)))) +} + +func Test_Cache_TTLEvictionByLoop(t *testing.T) { + reg := prometheus.NewRegistry() + metrics := newCacheMetrics(reg) + + cache, err := newCache[string]("test", 10, 100*time.Millisecond, 100*time.Millisecond, metrics) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + if c, ok := cache.(*Cache[string]); ok { + // should delete by maintenance loop + _, ok := c.cache.Peek("key1") + require.False(t, ok) + } + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 +`)))) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f84f07b9674..8818f1266a1 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -92,10 +92,11 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` + ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -147,6 +148,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") + f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 8b748f0c8d3..1e4a246cd27 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5801,6 +5801,13 @@ "type": "number", "x-cli-flag": "querier.parquet-queryable-shard-cache-size" }, + "parquet_queryable_shard_cache_ttl": { + "default": "24h0m0s", + "description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.", + "type": "string", + "x-cli-flag": "querier.parquet-queryable-shard-cache-ttl", + "x-format": "duration" + }, "per_step_stats_enabled": { "default": false, "description": "Enable returning samples stats per steps in query response.", From 0ce1c396bdedf5aef23a81686331e401b43a91d6 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sat, 8 Nov 2025 08:09:50 +0900 Subject: [PATCH 2/2] fix lint Signed-off-by: SungJin1212 --- pkg/querier/parquet_queryable_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 9626a9324e2..f8895eb550d 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -905,7 +905,7 @@ func Test_Cache_LRUEviction(t *testing.T) { val2 := cache.Get("key2") // miss require.Equal(t, "", val2) - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions # TYPE cortex_parquet_queryable_cache_evictions_total counter cortex_parquet_queryable_cache_evictions_total{name="test"} 1 @@ -918,7 +918,7 @@ func Test_Cache_LRUEviction(t *testing.T) { # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses # TYPE cortex_parquet_queryable_cache_misses_total counter cortex_parquet_queryable_cache_misses_total{name="test"} 1 -`)))) + `))) } func Test_Cache_TTLEvictionByGet(t *testing.T) { @@ -940,7 +940,7 @@ func Test_Cache_TTLEvictionByGet(t *testing.T) { val = cache.Get("key1") require.Equal(t, "", val) - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions # TYPE cortex_parquet_queryable_cache_evictions_total counter cortex_parquet_queryable_cache_evictions_total{name="test"} 1 @@ -953,7 +953,7 @@ func Test_Cache_TTLEvictionByGet(t *testing.T) { # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses # TYPE cortex_parquet_queryable_cache_misses_total counter cortex_parquet_queryable_cache_misses_total{name="test"} 1 -`)))) + `))) } func Test_Cache_TTLEvictionByLoop(t *testing.T) { @@ -978,7 +978,7 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { require.False(t, ok) } - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(` + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions # TYPE cortex_parquet_queryable_cache_evictions_total counter cortex_parquet_queryable_cache_evictions_total{name="test"} 1 @@ -988,5 +988,5 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items # TYPE cortex_parquet_queryable_cache_item_count gauge cortex_parquet_queryable_cache_item_count{name="test"} 0 -`)))) + `))) }