diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d97a4d173..fbc16da662 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ * [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485 * [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512 * [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515 +* [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573 * [BUGFIX] Ingester: Close TSDB when compaction fails during `createTSDB`, preventing resource leaks (file descriptors, mmap handles) that could lead to ingester instability. #7560 * [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534 * [BUGFIX] Ingester: Release the TSDB appender on every early-return path in `Push` (e.g. out-of-order label set) by deferring `Rollback`. Previously such requests leaked TSDB head series references, mmap'd chunks and pending state per request, causing the `cortex_ingester_tsdb_head_active_appenders` gauge to grow unbounded. 
#7528 diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index 9b6988ecab..1e57b30c7e 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -3,6 +3,7 @@ package querier import ( "context" "maps" + "os" "path" "path/filepath" "slices" @@ -267,6 +268,17 @@ pushJobsLoop: } d.userMx.Unlock() + // Reconcile the cached metadata fetchers (and their per-tenant Prometheus registries and + // on-disk meta caches) against the set of currently active tenants, so these resources stay + // bounded as tenants are deleted from storage. userIDs comes from a successful ScanUsers call + // (we return early above if it failed), so it is the authoritative active set regardless of + // any per-tenant scan errors collected in resErrs; we therefore reconcile even on the + // partial-error path, so the leak stays bounded under tenant churn. We only skip when the + // context has been cancelled (i.e. the service is shutting down). + if ctx.Err() == nil { + d.evictInactiveUserFetchers(userIDs) + } + return resErrs.Err() } @@ -417,6 +429,61 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat return f, userBucket, deletionMarkFilter, nil } +// metaSyncerCacheDirName is the sub-directory, under the per-tenant cache directory, where +// block.NewMetaFetcher stores its cached meta.json files (see createMetaFetcher). +const metaSyncerCacheDirName = "meta-syncer" + +// evictInactiveUserFetchers reconciles the per-tenant metadata fetchers against the set of +// currently active tenants. For every tenant that is no longer active it removes the cached +// fetcher, unregisters its per-tenant Prometheus registry, and deletes the fetcher's on-disk meta +// cache. Without this, d.fetchers, d.fetchersMetrics and the on-disk cache would grow unbounded +// for the lifetime of the process as tenants are deleted from storage. 
+func (d *BucketScanBlocksFinder) evictInactiveUserFetchers(activeUserIDs []string) { + active := make(map[string]struct{}, len(activeUserIDs)) + for _, userID := range activeUserIDs { + active[userID] = struct{}{} + } + + // Evict the in-memory fetchers and their per-tenant Prometheus registries. + var evicted []string + d.fetchersMx.Lock() + for userID := range d.fetchers { + if _, ok := active[userID]; ok { + continue + } + + d.fetchersMetrics.RemoveUserRegistry(userID) + delete(d.fetchers, userID) + evicted = append(evicted, userID) + } + d.fetchersMx.Unlock() + + if len(evicted) == 0 { + return + } + + level.Info(d.logger).Log("msg", "evicted metadata fetchers for inactive tenants", "count", len(evicted)) + + // Delete each evicted fetcher's on-disk meta cache, outside the lock to keep disk I/O off it. + // We remove only the fetcher's own "meta-syncer" sub-directory, not the whole CacheDir/<userID> + // tree: in single-binary mode CacheDir is the store-gateway's SyncDir, whose block data also + // lives under CacheDir/<userID>/ and must not be deleted here. We key this off the fetchers + // this process evicted rather than sweeping CacheDir, so we never reach into a co-located + // store-gateway's cache; stale directories left by a previous process are reaped by the + // store-gateway's own cleanup (single-binary) and are otherwise a negligible disk residual. + for _, userID := range evicted { + metaCacheDir := filepath.Join(d.cfg.CacheDir, userID, metaSyncerCacheDirName) + if err := os.RemoveAll(metaCacheDir); err != nil { + level.Warn(d.logger).Log("msg", "failed to delete cached metadata fetcher directory for inactive user", "user", userID, "dir", metaCacheDir, "err", err) + continue + } + + // Best-effort removal of the now-empty per-tenant directory. os.Remove only succeeds on an + // empty directory, so a co-located store-gateway's data under the same path is preserved. 
+ _ = os.Remove(filepath.Join(d.cfg.CacheDir, userID)) + } +} + func (d *BucketScanBlocksFinder) getBlockMeta(userID string, blockID ulid.ULID) *bucketindex.Block { d.userMx.RLock() defer d.userMx.RUnlock() diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index 929a0900ca..515e090bf0 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "strings" "testing" "time" @@ -416,6 +417,204 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t assert.Empty(t, deletionMarks) } +func TestBucketScanBlocksFinder_PeriodicScanEvictsDeletedUserFetcher(t *testing.T) { + t.Parallel() + ctx := context.Background() + s, bucket, _, reg := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) + + cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20) + cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30) + + require.NoError(t, services.StartAndAwaitRunning(ctx, s)) + + // The initial scan must have created and cached a per-tenant metadata fetcher, registered a + // per-tenant Prometheus registry, and created an on-disk meta cache for the active tenant. + s.fetchersMx.Lock() + require.Equal(t, 1, len(s.fetchers)) + s.fetchersMx.Unlock() + + userCacheDir := filepath.Join(s.cfg.CacheDir, "user-1") + metaSyncerDir := filepath.Join(userCacheDir, metaSyncerCacheDirName) + require.DirExists(t, metaSyncerDir) + + // The per-tenant registry contributes to the aggregated cortex_blocks_meta_synced gauge. + syncedBefore, err := testutil.GatherAndCount(reg, "cortex_blocks_meta_synced") + require.NoError(t, err) + require.Greater(t, syncedBefore, 0) + + // Delete the user from the bucket so it is no longer active. + require.NoError(t, bucket.Delete(ctx, "user-1")) + + // Trigger a periodic scan. 
+ require.NoError(t, s.scan(ctx)) + + // Once the user is no longer active, its cached fetcher, per-tenant Prometheus registry and + // on-disk meta cache must all be released, otherwise they leak for the lifetime of the process. + s.fetchersMx.Lock() + assert.Equal(t, 0, len(s.fetchers)) + s.fetchersMx.Unlock() + + // The fetcher's own meta-syncer cache must be removed; the now-empty parent dir is then + // removed on a best-effort basis. + assert.NoDirExists(t, metaSyncerDir) + assert.NoDirExists(t, userCacheDir) + + // The per-tenant registry was unregistered, so its gauge series no longer appear. + syncedAfter, err := testutil.GatherAndCount(reg, "cortex_blocks_meta_synced") + require.NoError(t, err) + assert.Equal(t, 0, syncedAfter) +} + +func TestBucketScanBlocksFinder_PeriodicScanEvictsOnlyInactiveUserFetchers(t *testing.T) { + t.Parallel() + ctx := context.Background() + s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) + + cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20) + cortex_testutil.MockStorageBlock(t, bucket, "user-2", 10, 20) + cortex_testutil.MockStorageBlock(t, bucket, "user-3", 10, 20) + + require.NoError(t, services.StartAndAwaitRunning(ctx, s)) + + s.fetchersMx.Lock() + require.Equal(t, 3, len(s.fetchers)) + s.fetchersMx.Unlock() + + // Delete only user-2. + require.NoError(t, bucket.Delete(ctx, "user-2")) + require.NoError(t, s.scan(ctx)) + + // Only the inactive tenant's fetcher is evicted; the active tenants' fetchers are retained. + s.fetchersMx.Lock() + _, has1 := s.fetchers["user-1"] + _, has2 := s.fetchers["user-2"] + _, has3 := s.fetchers["user-3"] + require.Equal(t, 2, len(s.fetchers)) + s.fetchersMx.Unlock() + assert.True(t, has1) + assert.False(t, has2) + assert.True(t, has3) + + // A returning tenant gets its fetcher re-created on the next scan. 
+ cortex_testutil.MockStorageBlock(t, bucket, "user-2", 20, 30) + require.NoError(t, s.scan(ctx)) + + s.fetchersMx.Lock() + _, has2 = s.fetchers["user-2"] + require.Equal(t, 3, len(s.fetchers)) + s.fetchersMx.Unlock() + assert.True(t, has2) +} + +// failUserBucket makes the per-tenant block listing fail for a single user, so that scanning that +// user returns an error (resErrs != 0) while the top-level user listing used by ScanUsers (Iter +// with an empty prefix) still succeeds. +type failUserBucket struct { + objstore.InstrumentedBucket + failUser string +} + +func (b *failUserBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + if strings.HasPrefix(dir, b.failUser) { + return errors.New("injected listing failure") + } + return b.InstrumentedBucket.Iter(ctx, dir, f, options...) +} + +func (b *failUserBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if strings.HasPrefix(dir, b.failUser) { + return errors.New("injected listing failure") + } + return b.InstrumentedBucket.IterWithAttributes(ctx, dir, f, options...) 
+} + +func (b *failUserBucket) WithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.Bucket { + return b +} + +func (b *failUserBucket) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return b +} + +func TestBucketScanBlocksFinder_PeriodicScanEvictsInactiveUserDespiteOtherTenantScanError(t *testing.T) { + t.Parallel() + ctx := context.Background() + + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + wrapped := &failUserBucket{InstrumentedBucket: bkt, failUser: "user-2"} + + cfg := prepareBucketScanBlocksFinderConfig() + cfg.CacheDir = t.TempDir() + usersScanner, err := users.NewScanner(users.UsersScannerConfig{ + Strategy: users.UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, wrapped, log.NewNopLogger(), nil) + require.NoError(t, err) + + s := NewBucketScanBlocksFinder(cfg, usersScanner, wrapped, nil, log.NewNopLogger(), nil) + t.Cleanup(func() { + s.StopAsync() + require.NoError(t, s.AwaitTerminated(context.Background())) + }) + + // Only user-1 is active at startup, so the initial scan succeeds and caches its fetcher. + cortex_testutil.MockStorageBlock(t, bkt, "user-1", 10, 20) + require.NoError(t, services.StartAndAwaitRunning(ctx, s)) + + s.fetchersMx.Lock() + require.Equal(t, 1, len(s.fetchers)) + s.fetchersMx.Unlock() + + // Introduce an active-but-erroring tenant and delete user-1. + cortex_testutil.MockStorageBlock(t, bkt, "user-2", 10, 20) + require.NoError(t, bkt.Delete(ctx, "user-1")) + + // This scan collects a per-tenant error for user-2 (resErrs != 0). The failing tenant is + // retried with the finder's hardcoded backoff, so this scan takes a few seconds; that cost is + // intrinsic and must not be "optimised" with a short-deadline context, which would cancel the + // context and make scanBucket skip eviction entirely (eviction is gated on ctx.Err() == nil). 
+ require.Error(t, s.scanBucket(ctx)) + + // ...but eviction is decoupled from per-tenant scan errors, so the now-inactive user-1 is still + // evicted while the erroring (but still active) user-2 is retained. + s.fetchersMx.Lock() + _, has1 := s.fetchers["user-1"] + _, has2 := s.fetchers["user-2"] + s.fetchersMx.Unlock() + assert.False(t, has1) + assert.True(t, has2) +} + +// TestBucketScanBlocksFinder_PeriodicScanPreservesNonMetaSyncerDataOnEviction guards the +// single-binary safety property: CacheDir is the store-gateway's SyncDir, whose block data lives +// under the same per-tenant directory, so evicting a tenant must remove only the fetcher's own +// meta-syncer cache and never the sibling block data. +func TestBucketScanBlocksFinder_PeriodicScanPreservesNonMetaSyncerDataOnEviction(t *testing.T) { + t.Parallel() + ctx := context.Background() + s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig()) + + cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20) + require.NoError(t, services.StartAndAwaitRunning(ctx, s)) + + metaSyncerDir := filepath.Join(s.cfg.CacheDir, "user-1", metaSyncerCacheDirName) + require.DirExists(t, metaSyncerDir) + + // Simulate a co-located store-gateway's block data under the same per-tenant directory. + sgBlockFile := filepath.Join(s.cfg.CacheDir, "user-1", "01DTVP434PA9VFXSW2JKB3392D", "index") + require.NoError(t, os.MkdirAll(filepath.Dir(sgBlockFile), 0o755)) + require.NoError(t, os.WriteFile(sgBlockFile, []byte("block-data"), 0o644)) + + require.NoError(t, bucket.Delete(ctx, "user-1")) + require.NoError(t, s.scan(ctx)) + + // The fetcher's own meta-syncer cache is deleted... + assert.NoDirExists(t, metaSyncerDir) + // ...but the co-located store-gateway block data under the same per-tenant dir must survive. + assert.FileExists(t, sgBlockFile) +} + func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) { //parallel testing causes data race ctx := context.Background()