Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [BUGFIX] Compactor: Fix stale `cortex_bucket_index_last_successful_update_timestamp_seconds` metric not being cleaned up when tenant ownership changes due to ring rebalancing. This caused false alarms on bucket index update rate when a tenant moved between compactors. #7485
* [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512
* [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515
* [BUGFIX] Querier: Fix unbounded resource leak in the bucket-scan blocks finder (used when the bucket index is disabled). Per-tenant metadata fetchers, their Prometheus registries, and on-disk meta caches are now evicted once a tenant is no longer active, instead of being retained for the lifetime of the process. #7573
* [BUGFIX] Ingester: Close TSDB when compaction fails during `createTSDB`, preventing resource leaks (file descriptors, mmap handles) that could lead to ingester instability. #7560
* [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534
* [BUGFIX] Ingester: Release the TSDB appender on every early-return path in `Push` (e.g. out-of-order label set) by deferring `Rollback`. Previously such requests leaked TSDB head series references, mmap'd chunks and pending state per request, causing the `cortex_ingester_tsdb_head_active_appenders` gauge to grow unbounded. #7528
Expand Down
67 changes: 67 additions & 0 deletions pkg/querier/blocks_finder_bucket_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"maps"
"os"
"path"
"path/filepath"
"slices"
Expand Down Expand Up @@ -267,6 +268,17 @@ pushJobsLoop:
}
d.userMx.Unlock()

// Reconcile the cached metadata fetchers (and their per-tenant Prometheus registries and
// on-disk meta caches) against the set of currently active tenants, so these resources stay
// bounded as tenants are deleted from storage. userIDs comes from a successful ScanUsers call
// (we return early above if it failed), so it is the authoritative active set regardless of
// any per-tenant scan errors collected in resErrs; we therefore reconcile even on the
// partial-error path, so the leak stays bounded under tenant churn. We only skip when the
// context has been cancelled (i.e. the service is shutting down).
if ctx.Err() == nil {
d.evictInactiveUserFetchers(userIDs)
}

return resErrs.Err()
}

Expand Down Expand Up @@ -417,6 +429,61 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat
return f, userBucket, deletionMarkFilter, nil
}

// metaSyncerCacheDirName is the sub-directory, under the per-tenant cache directory, where
// block.NewMetaFetcher stores its cached meta.json files (see createMetaFetcher).
// evictInactiveUserFetchers joins it onto CacheDir/<userID> to locate the directory it deletes
// when a tenant's fetcher is evicted.
const metaSyncerCacheDirName = "meta-syncer"

// evictInactiveUserFetchers drops every cached per-tenant metadata fetcher whose tenant is not
// listed in activeUserIDs. For each such tenant it unregisters the tenant's registry from
// d.fetchersMetrics, removes the entry from d.fetchers, and then deletes the fetcher's on-disk
// "meta-syncer" cache directory. This keeps d.fetchers, d.fetchersMetrics and the on-disk cache
// bounded as tenants are deleted from storage, instead of growing for the lifetime of the
// process.
func (d *BucketScanBlocksFinder) evictInactiveUserFetchers(activeUserIDs []string) {
	// Index the active tenants so membership checks below are constant time.
	activeSet := make(map[string]struct{}, len(activeUserIDs))
	for i := range activeUserIDs {
		activeSet[activeUserIDs[i]] = struct{}{}
	}

	// Phase 1: under the lock, drop the in-memory fetchers and their per-tenant Prometheus
	// registries, remembering which tenants were dropped.
	var inactive []string
	d.fetchersMx.Lock()
	for tenant := range d.fetchers {
		if _, stillActive := activeSet[tenant]; !stillActive {
			d.fetchersMetrics.RemoveUserRegistry(tenant)
			delete(d.fetchers, tenant)
			inactive = append(inactive, tenant)
		}
	}
	d.fetchersMx.Unlock()

	if len(inactive) == 0 {
		return
	}

	level.Info(d.logger).Log("msg", "evicted metadata fetchers for inactive tenants", "count", len(inactive))

	// Phase 2: with the lock released (so disk I/O never runs under it), delete each evicted
	// fetcher's on-disk meta cache. Only the fetcher's own "meta-syncer" sub-directory is removed,
	// never the whole CacheDir/<userID> tree: in single-binary mode CacheDir is the
	// store-gateway's SyncDir, whose block data also lives under CacheDir/<userID>/ and must not
	// be deleted here. The work is driven by the tenants this process just evicted rather than by
	// sweeping CacheDir, so a co-located store-gateway's cache is never touched; stale
	// directories left by a previous process are reaped by the store-gateway's own cleanup
	// (single-binary) and are otherwise a negligible disk residual.
	for _, tenant := range inactive {
		metaCacheDir := filepath.Join(d.cfg.CacheDir, tenant, metaSyncerCacheDirName)

		err := os.RemoveAll(metaCacheDir)
		if err == nil {
			// Best-effort removal of the now-empty per-tenant directory. os.Remove only succeeds
			// on an empty directory, so a co-located store-gateway's data under the same path is
			// preserved; the error is deliberately ignored for that reason.
			_ = os.Remove(filepath.Join(d.cfg.CacheDir, tenant))
			continue
		}

		level.Warn(d.logger).Log("msg", "failed to delete cached metadata fetcher directory for inactive user", "user", tenant, "dir", metaCacheDir, "err", err)
	}
}

func (d *BucketScanBlocksFinder) getBlockMeta(userID string, blockID ulid.ULID) *bucketindex.Block {
d.userMx.RLock()
defer d.userMx.RUnlock()
Expand Down
199 changes: 199 additions & 0 deletions pkg/querier/blocks_finder_bucket_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -416,6 +417,204 @@ func TestBucketScanBlocksFinder_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t
assert.Empty(t, deletionMarks)
}

// TestBucketScanBlocksFinder_PeriodicScanEvictsDeletedUserFetcher verifies that once a tenant is
// deleted from the bucket, the next scan releases everything the finder held for it: the cached
// metadata fetcher, the per-tenant Prometheus registry and the on-disk meta cache.
func TestBucketScanBlocksFinder_PeriodicScanEvictsDeletedUserFetcher(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	s, bucket, _, reg := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())

	cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
	cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)

	require.NoError(t, services.StartAndAwaitRunning(ctx, s))

	// The initial scan must have created and cached a per-tenant metadata fetcher, registered a
	// per-tenant Prometheus registry, and created an on-disk meta cache for the active tenant.
	//
	// The count is snapshotted under the lock and asserted after unlocking: a failing require
	// calls t.FailNow, which exits the test goroutine without running the explicit Unlock, and a
	// mutex left locked could block anything that needs it during teardown.
	s.fetchersMx.Lock()
	fetchersBefore := len(s.fetchers)
	s.fetchersMx.Unlock()
	require.Equal(t, 1, fetchersBefore)

	userCacheDir := filepath.Join(s.cfg.CacheDir, "user-1")
	metaSyncerDir := filepath.Join(userCacheDir, metaSyncerCacheDirName)
	require.DirExists(t, metaSyncerDir)

	// The per-tenant registry contributes to the aggregated cortex_blocks_meta_synced gauge.
	syncedBefore, err := testutil.GatherAndCount(reg, "cortex_blocks_meta_synced")
	require.NoError(t, err)
	require.Greater(t, syncedBefore, 0)

	// Delete the user from the bucket so it is no longer active.
	require.NoError(t, bucket.Delete(ctx, "user-1"))

	// Trigger a periodic scan.
	require.NoError(t, s.scan(ctx))

	// Once the user is no longer active, its cached fetcher, per-tenant Prometheus registry and
	// on-disk meta cache must all be released, otherwise they leak for the lifetime of the process.
	s.fetchersMx.Lock()
	fetchersAfter := len(s.fetchers)
	s.fetchersMx.Unlock()
	assert.Equal(t, 0, fetchersAfter)

	// The fetcher's own meta-syncer cache must be removed; the now-empty parent dir is then
	// removed on a best-effort basis.
	assert.NoDirExists(t, metaSyncerDir)
	assert.NoDirExists(t, userCacheDir)

	// The per-tenant registry was unregistered, so its gauge series no longer appear.
	syncedAfter, err := testutil.GatherAndCount(reg, "cortex_blocks_meta_synced")
	require.NoError(t, err)
	assert.Equal(t, 0, syncedAfter)
}

// TestBucketScanBlocksFinder_PeriodicScanEvictsOnlyInactiveUserFetchers verifies that a scan
// evicts the fetcher of a deleted tenant only, keeps the fetchers of tenants that are still in
// the bucket, and re-creates the fetcher when the deleted tenant reappears.
func TestBucketScanBlocksFinder_PeriodicScanEvictsOnlyInactiveUserFetchers(t *testing.T) {
	t.Parallel()
	ctx := context.Background()
	s, bucket, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())

	cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
	cortex_testutil.MockStorageBlock(t, bucket, "user-2", 10, 20)
	cortex_testutil.MockStorageBlock(t, bucket, "user-3", 10, 20)

	require.NoError(t, services.StartAndAwaitRunning(ctx, s))

	// Throughout this test the fetcher map is read under the lock and asserted on only after
	// unlocking: a failing require calls t.FailNow, which exits the test goroutine without
	// running the explicit Unlock, and a mutex left locked could block anything that needs it
	// during teardown.
	s.fetchersMx.Lock()
	numFetchers := len(s.fetchers)
	s.fetchersMx.Unlock()
	require.Equal(t, 3, numFetchers)

	// Delete only user-2.
	require.NoError(t, bucket.Delete(ctx, "user-2"))
	require.NoError(t, s.scan(ctx))

	// Only the inactive tenant's fetcher is evicted; the active tenants' fetchers are retained.
	s.fetchersMx.Lock()
	_, has1 := s.fetchers["user-1"]
	_, has2 := s.fetchers["user-2"]
	_, has3 := s.fetchers["user-3"]
	numFetchers = len(s.fetchers)
	s.fetchersMx.Unlock()
	require.Equal(t, 2, numFetchers)
	assert.True(t, has1)
	assert.False(t, has2)
	assert.True(t, has3)

	// A returning tenant gets its fetcher re-created on the next scan.
	cortex_testutil.MockStorageBlock(t, bucket, "user-2", 20, 30)
	require.NoError(t, s.scan(ctx))

	s.fetchersMx.Lock()
	_, has2 = s.fetchers["user-2"]
	numFetchers = len(s.fetchers)
	s.fetchersMx.Unlock()
	require.Equal(t, 3, numFetchers)
	assert.True(t, has2)
}

// failUserBucket wraps an objstore.InstrumentedBucket and injects a listing failure for one
// tenant: any Iter or IterWithAttributes call whose dir starts with failUser returns an error,
// while every other call is forwarded to the wrapped bucket. Scanning that tenant therefore
// fails (resErrs != 0), whereas the top-level user listing used by ScanUsers (Iter with an empty
// prefix) still succeeds.
type failUserBucket struct {
	objstore.InstrumentedBucket

	// failUser is the directory prefix for which listings are made to fail.
	failUser string
}

// injectedFailure returns the injected listing error when dir starts with failUser, and nil
// otherwise.
func (fb *failUserBucket) injectedFailure(dir string) error {
	if !strings.HasPrefix(dir, fb.failUser) {
		return nil
	}
	return errors.New("injected listing failure")
}

// Iter fails for directories under failUser and otherwise delegates to the wrapped bucket.
func (fb *failUserBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
	if err := fb.injectedFailure(dir); err != nil {
		return err
	}
	return fb.InstrumentedBucket.Iter(ctx, dir, f, options...)
}

// IterWithAttributes fails for directories under failUser and otherwise delegates to the wrapped
// bucket.
func (fb *failUserBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
	if err := fb.injectedFailure(dir); err != nil {
		return err
	}
	return fb.InstrumentedBucket.IterWithAttributes(ctx, dir, f, options...)
}

// WithExpectedErrs ignores its argument and returns the wrapper itself.
// NOTE(review): presumably this keeps callers that ask for an expected-errors view going through
// the failure-injecting Iter methods rather than the embedded bucket — confirm.
func (fb *failUserBucket) WithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.Bucket {
	return fb
}

// ReaderWithExpectedErrs ignores its argument and returns the wrapper itself, mirroring
// WithExpectedErrs.
func (fb *failUserBucket) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
	return fb
}

// TestBucketScanBlocksFinder_PeriodicScanEvictsInactiveUserDespiteOtherTenantScanError verifies
// that eviction of an inactive tenant's fetcher still happens when the same scan returns an error
// for another tenant that is active but whose block listing fails.
func TestBucketScanBlocksFinder_PeriodicScanEvictsInactiveUserDespiteOtherTenantScanError(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
	wrapped := &failUserBucket{InstrumentedBucket: bkt, failUser: "user-2"}

	cfg := prepareBucketScanBlocksFinderConfig()
	cfg.CacheDir = t.TempDir()
	usersScanner, err := users.NewScanner(users.UsersScannerConfig{
		Strategy:       users.UserScanStrategyList,
		MaxStalePeriod: time.Hour,
		CacheTTL:       0,
	}, wrapped, log.NewNopLogger(), nil)
	require.NoError(t, err)

	s := NewBucketScanBlocksFinder(cfg, usersScanner, wrapped, nil, log.NewNopLogger(), nil)
	t.Cleanup(func() {
		s.StopAsync()
		require.NoError(t, s.AwaitTerminated(context.Background()))
	})

	// Only user-1 is active at startup, so the initial scan succeeds and caches its fetcher.
	cortex_testutil.MockStorageBlock(t, bkt, "user-1", 10, 20)
	require.NoError(t, services.StartAndAwaitRunning(ctx, s))

	// Snapshot under the lock, assert after unlocking: a failing require calls t.FailNow, which
	// exits the test goroutine without running the explicit Unlock, and a mutex left locked could
	// block the finder while the cleanup above waits for it to terminate.
	s.fetchersMx.Lock()
	numFetchers := len(s.fetchers)
	s.fetchersMx.Unlock()
	require.Equal(t, 1, numFetchers)

	// Introduce an active-but-erroring tenant and delete user-1.
	cortex_testutil.MockStorageBlock(t, bkt, "user-2", 10, 20)
	require.NoError(t, bkt.Delete(ctx, "user-1"))

	// This scan collects a per-tenant error for user-2 (resErrs != 0). The failing tenant is
	// retried with the finder's hardcoded backoff, so this scan takes a few seconds; that cost is
	// intrinsic and must not be "optimised" with a short-deadline context, which would cancel the
	// context and make scanBucket skip eviction entirely (eviction is gated on ctx.Err() == nil).
	require.Error(t, s.scanBucket(ctx))

	// ...but eviction is decoupled from per-tenant scan errors, so the now-inactive user-1 is still
	// evicted while the erroring (but still active) user-2 is retained.
	s.fetchersMx.Lock()
	_, has1 := s.fetchers["user-1"]
	_, has2 := s.fetchers["user-2"]
	s.fetchersMx.Unlock()
	assert.False(t, has1)
	assert.True(t, has2)
}

// TestBucketScanBlocksFinder_PeriodicScanPreservesNonMetaSyncerDataOnEviction protects the
// single-binary safety property. CacheDir doubles as the store-gateway's SyncDir, and the
// store-gateway keeps its block data under the same per-tenant directory, so evicting a tenant
// may delete the fetcher's own meta-syncer cache but must leave sibling block data untouched.
func TestBucketScanBlocksFinder_PeriodicScanPreservesNonMetaSyncerDataOnEviction(t *testing.T) {
	t.Parallel()

	const tenant = "user-1"

	ctx := context.Background()
	finder, bkt, _, _ := prepareBucketScanBlocksFinder(t, prepareBucketScanBlocksFinderConfig())

	cortex_testutil.MockStorageBlock(t, bkt, tenant, 10, 20)
	require.NoError(t, services.StartAndAwaitRunning(ctx, finder))

	tenantDir := filepath.Join(finder.cfg.CacheDir, tenant)

	// The initial scan creates the fetcher's meta cache on disk.
	metaSyncerDir := filepath.Join(tenantDir, metaSyncerCacheDirName)
	require.DirExists(t, metaSyncerDir)

	// Plant a file that stands in for a co-located store-gateway's block data under the same
	// per-tenant directory.
	blockDir := filepath.Join(tenantDir, "01DTVP434PA9VFXSW2JKB3392D")
	sgBlockFile := filepath.Join(blockDir, "index")
	require.NoError(t, os.MkdirAll(blockDir, 0o755))
	require.NoError(t, os.WriteFile(sgBlockFile, []byte("block-data"), 0o644))

	// Make the tenant inactive and run a scan so its fetcher is evicted.
	require.NoError(t, bkt.Delete(ctx, tenant))
	require.NoError(t, finder.scan(ctx))

	// The meta-syncer cache owned by the fetcher is gone...
	assert.NoDirExists(t, metaSyncerDir)
	// ...while the store-gateway block data next to it is still there.
	assert.FileExists(t, sgBlockFile)
}

func TestBucketScanBlocksFinder_GetBlocks(t *testing.T) {
//parallel testing causes data race
ctx := context.Background()
Expand Down
Loading