1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
* [ENHANCEMENT] Ingester: Handle runtime errors in query path. #6769
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
15 changes: 15 additions & 0 deletions docs/blocks-storage/querier.md
@@ -1599,4 +1599,19 @@ blocks_storage:
# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -blocks-storage.users-scanner.strategy
[strategy: <string> | default = "list"]

# Maximum period of time to consider the user index as stale. Fall back to
# the base scanner if stale. Only valid when strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# TTL of the cached users. 0 disables caching and relies on caching at
# bucket client level.
# CLI flag: -blocks-storage.users-scanner.cache-ttl
[cache_ttl: <duration> | default = 0s]
```
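For readers wiring this up, a minimal sketch of the new section, assuming you want the index-based strategy (the `5m` TTL below is an illustrative value, not a recommendation from this PR):

```yaml
blocks_storage:
  users_scanner:
    # Use the user index file instead of listing the whole bucket.
    strategy: user_index
    # Fall back to a full bucket listing if the index is older than 1h.
    max_stale_period: 1h
    # Cache scan results in-process; 0s defers to bucket-client caching.
    cache_ttl: 5m
```

The same settings are available as CLI flags, e.g. `-blocks-storage.users-scanner.strategy=user_index`.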
15 changes: 15 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -1712,4 +1712,19 @@ blocks_storage:
# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -blocks-storage.users-scanner.strategy
[strategy: <string> | default = "list"]

# Maximum period of time to consider the user index as stale. Fall back to
# the base scanner if stale. Only valid when strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# TTL of the cached users. 0 disables caching and relies on caching at
# bucket client level.
# CLI flag: -blocks-storage.users-scanner.cache-ttl
[cache_ttl: <duration> | default = 0s]
```
15 changes: 15 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2163,6 +2163,21 @@ tsdb:
# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]

users_scanner:
# Strategy to use to scan users. Supported values are: list, user_index.
# CLI flag: -blocks-storage.users-scanner.strategy
[strategy: <string> | default = "list"]

# Maximum period of time to consider the user index as stale. Fall back to the
# base scanner if stale. Only valid when strategy is user_index.
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
[max_stale_period: <duration> | default = 1h]

# TTL of the cached users. 0 disables caching and relies on caching at bucket
# client level.
# CLI flag: -blocks-storage.users-scanner.cache-ttl
[cache_ttl: <duration> | default = 0s]
```

### `compactor_config`
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -64,6 +64,7 @@ Currently experimental features are:
- Blocks storage bucket index
- The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporary and will be removed in future versions
- Blocks storage user index
- Querier: tenant federation
- The thanosconvert tool for converting Thanos block metadata to Cortex
- HA Tracker: cleanup of old replicas from KV Store.
1 change: 1 addition & 0 deletions docs/guides/limitations.md
@@ -38,6 +38,7 @@ The following tenant IDs are considered invalid in Cortex.
- Current directory (`.`)
- Parent directory (`..`)
- Markers directory (`__markers__`)
- User index file (`user-index.json.gz`)

### Length

20 changes: 12 additions & 8 deletions pkg/compactor/blocks_cleaner.go
@@ -21,6 +21,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/concurrency"
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -51,7 +52,7 @@ type BlocksCleaner struct {
cfgProvider ConfigProvider
logger log.Logger
bucketClient objstore.InstrumentedBucket
usersScanner *cortex_tsdb.UsersScanner
usersScanner users.Scanner

ringLifecyclerID string

@@ -85,7 +86,7 @@
func NewBlocksCleaner(
cfg BlocksCleanerConfig,
bucketClient objstore.InstrumentedBucket,
usersScanner *cortex_tsdb.UsersScanner,
usersScanner users.Scanner,
compactionVisitMarkerTimeout time.Duration,
cfgProvider ConfigProvider,
logger log.Logger,
@@ -336,19 +337,22 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) error {
}

func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, error) {
users, deleted, err := c.usersScanner.ScanUsers(ctx)
active, deleting, deleted, err := c.usersScanner.ScanUsers(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to discover users from bucket")
}

isActive := util.StringsMap(users)
isDeleted := util.StringsMap(deleted)
allUsers := append(users, deleted...)
isActive := util.StringsMap(active)
markedForDeletion := make([]string, 0, len(deleting)+len(deleted))
markedForDeletion = append(markedForDeletion, deleting...)
markedForDeletion = append(markedForDeletion, deleted...)
isMarkedForDeletion := util.StringsMap(markedForDeletion)
allUsers := append(active, markedForDeletion...)
// Delete per-tenant metrics for all tenants not belonging anymore to this shard.
// Such tenants have been moved to a different shard, so their updated metrics will
// be exported by the new shard.
for _, userID := range c.lastOwnedUsers {
if !isActive[userID] && !isDeleted[userID] {
if !isActive[userID] && !isMarkedForDeletion[userID] {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
@@ -365,7 +369,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, error) {
}
c.lastOwnedUsers = allUsers

return users, deleted, nil
return active, markedForDeletion, nil
}

func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
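For orientation, here is a sketch of the `users.Scanner` contract and the sharded wrapper as implied by the call sites in this diff (`ScanUsers` returning active, deleting, and deleted tenants; `NewShardedScanner` filtering through an ownership callback). The actual `pkg/storage/tsdb/users` package may differ in details; the filtering behavior below is an assumption.

```go
package users

import (
	"context"

	"github.com/go-kit/log"
)

// Scanner discovers tenants in the blocks bucket, split into three states:
// active, marked for deletion but not yet cleaned up ("deleting"), and
// already cleaned up ("deleted").
type Scanner interface {
	ScanUsers(ctx context.Context) (active, deleting, deleted []string, err error)
}

// NewShardedScanner wraps a base Scanner and keeps only the tenants owned by
// this instance, matching the override done in the cleaner tests below.
func NewShardedScanner(base Scanner, isOwned func(userID string) (bool, error), logger log.Logger) Scanner {
	return &shardedScanner{base: base, isOwned: isOwned, logger: logger}
}

type shardedScanner struct {
	base    Scanner
	isOwned func(userID string) (bool, error)
	logger  log.Logger
}

func (s *shardedScanner) ScanUsers(ctx context.Context) ([]string, []string, []string, error) {
	active, deleting, deleted, err := s.base.ScanUsers(ctx)
	if err != nil {
		return nil, nil, nil, err
	}
	return s.keepOwned(active), s.keepOwned(deleting), s.keepOwned(deleted), nil
}

// keepOwned filters a tenant list through the ownership callback; in this
// sketch, tenants whose ownership check errors are skipped.
func (s *shardedScanner) keepOwned(in []string) []string {
	out := make([]string, 0, len(in))
	for _, userID := range in {
		if owned, err := s.isOwned(userID); err == nil && owned {
			out = append(out, userID)
		}
	}
	return out
}
```

Seen this way, the cleaner change above is mechanical: the old two-slice result (`users`, `deleted`) becomes three slices, and `deleting` and `deleted` are merged back into a single `markedForDeletion` list for metric cleanup.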
60 changes: 46 additions & 14 deletions pkg/compactor/blocks_cleaner_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -83,21 +84,25 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
reg := prometheus.NewRegistry()
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, mbucket, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

// Clean User with no error
cleaner.bucketClient = bkt
userLogger := util_log.WithUserID(userID, cleaner.logger)
userBucket := bucket.NewUserBucketClient(userID, cleaner.bucketClient, cleaner.cfgProvider)
err := cleaner.cleanUser(ctx, userLogger, userBucket, userID, false)
err = cleaner.cleanUser(ctx, userLogger, userBucket, userID, false)
require.NoError(t, err)
s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
@@ -196,7 +201,10 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {

reg := prometheus.NewPedanticRegistry()
logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
@@ -364,7 +372,11 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
reg := prometheus.NewRegistry()
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
@@ -429,7 +441,11 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
reg := prometheus.NewRegistry()
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
@@ -487,8 +503,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {

ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
reg := prometheus.NewRegistry()
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
@@ -522,7 +541,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {
))

// Override the users scanner to reconfigure it to only return a subset of users.
cleaner.usersScanner = tsdb.NewUsersScanner(bucketClient, func(userID string) (bool, error) { return userID == "user-1", nil }, logger)
cleaner.usersScanner, err = users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cleaner.usersScanner = users.NewShardedScanner(cleaner.usersScanner, func(userID string) (bool, error) { return userID == "user-1", nil }, logger)

// Create new blocks, to double check expected metrics have changed.
createTSDBBlock(t, bucketClient, "user-1", 40, 50, nil)
@@ -630,7 +653,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
@@ -859,7 +885,10 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
@@ -885,7 +914,7 @@
CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
Version: PartitionedGroupInfoVersion1,
}
_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
require.NoError(t, err)

visitMarker := &partitionVisitMarker{
@@ -931,7 +960,10 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
scanner, err := users.NewScanner(tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
}, bucketClient, logger, reg)
require.NoError(t, err)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
@@ -960,7 +992,7 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
Version: PartitionedGroupInfoVersion1,
}
_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
_, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
require.NoError(t, err)
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupInfo.PartitionedGroupID)
