diff --git a/CHANGELOG.md b/CHANGELOG.md index bcd47a5fe58..f78f1b445c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ * [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766 * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769 * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778 +* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 893ab67282a..b7d622c82c6 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1599,4 +1599,19 @@ blocks_storage: # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] + + users_scanner: + # Strategy to use to scan users. Supported values are: list, user_index. + # CLI flag: -blocks-storage.users-scanner.strategy + [strategy: | default = "list"] + + # Maximum period of time to consider the user index as stale. Fall back to + # the base scanner if stale. Only valid when strategy is user_index. + # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period + [max_stale_period: | default = 1h] + + # TTL of the cached users. 0 disables caching and relies on caching at + # bucket client level. + # CLI flag: -blocks-storage.users-scanner.cache-ttl + [cache_ttl: | default = 0s] ``` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 5053ea86ee7..525dee49b2c 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1712,4 +1712,19 @@ blocks_storage: # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] + + users_scanner: + # Strategy to use to scan users. Supported values are: list, user_index. + # CLI flag: -blocks-storage.users-scanner.strategy + [strategy: | default = "list"] + + # Maximum period of time to consider the user index as stale. Fall back to + # the base scanner if stale. Only valid when strategy is user_index. + # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period + [max_stale_period: | default = 1h] + + # TTL of the cached users. 0 disables caching and relies on caching at + # bucket client level. + # CLI flag: -blocks-storage.users-scanner.cache-ttl + [cache_ttl: | default = 0s] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 6882ca45c45..10f1a4ab8fa 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2163,6 +2163,21 @@ tsdb: # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] + +users_scanner: + # Strategy to use to scan users. Supported values are: list, user_index. + # CLI flag: -blocks-storage.users-scanner.strategy + [strategy: | default = "list"] + + # Maximum period of time to consider the user index as stale. Fall back to the + # base scanner if stale. Only valid when strategy is user_index. 
+ # CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period + [max_stale_period: | default = 1h] + + # TTL of the cached users. 0 disables caching and relies on caching at bucket + # client level. + # CLI flag: -blocks-storage.users-scanner.cache-ttl + [cache_ttl: | default = 0s] ``` ### `compactor_config` diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 77a66e4d293..fcad7e9ee49 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -64,6 +64,7 @@ Currently experimental features are: - Blocks storage bucket index - The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental - The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions +- Blocks storage user index - Querier: tenant federation - The thanosconvert tool for converting Thanos block metadata to Cortex - HA Tracker: cleanup of old replicas from KV Store. diff --git a/docs/guides/limitations.md b/docs/guides/limitations.md index d8059b0b7c1..6f9be53da84 100644 --- a/docs/guides/limitations.md +++ b/docs/guides/limitations.md @@ -38,6 +38,7 @@ The following tenant IDs are considered invalid in Cortex. - Current directory (`.`) - Parent directory (`..`) - Markers directory (`__markers__`) +- User Index File (`user-index.json.gz`) ### Length diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 03e7564b0a6..c52b326cb03 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -51,7 +52,7 @@ type BlocksCleaner struct { cfgProvider ConfigProvider logger log.Logger bucketClient objstore.InstrumentedBucket - usersScanner *cortex_tsdb.UsersScanner + usersScanner users.Scanner ringLifecyclerID string @@ -85,7 +86,7 @@ type BlocksCleaner struct { func NewBlocksCleaner( cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, - usersScanner *cortex_tsdb.UsersScanner, + usersScanner users.Scanner, compactionVisitMarkerTimeout time.Duration, cfgProvider ConfigProvider, logger log.Logger, @@ -336,19 +337,22 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e } func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, error) { - users, deleted, err := c.usersScanner.ScanUsers(ctx) + active, deleting, deleted, err := c.usersScanner.ScanUsers(ctx) if err != nil { return nil, nil, errors.Wrap(err, "failed to discover users from bucket") } - isActive := util.StringsMap(users) - isDeleted := util.StringsMap(deleted) - allUsers := append(users, deleted...) + isActive := util.StringsMap(active) + markedForDeletion := make([]string, 0, len(deleting)+len(deleted)) + markedForDeletion = append(markedForDeletion, deleting...) + markedForDeletion = append(markedForDeletion, deleted...) + isMarkedForDeletion := util.StringsMap(markedForDeletion) + allUsers := append(active, markedForDeletion...) 
// Delete per-tenant metrics for all tenants not belonging anymore to this shard. // Such tenants have been moved to a different shard, so their updated metrics will // be exported by the new shard. for _, userID := range c.lastOwnedUsers { - if !isActive[userID] && !isDeleted[userID] { + if !isActive[userID] && !isMarkedForDeletion[userID] { c.tenantBlocks.DeleteLabelValues(userID) c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID) c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) @@ -365,7 +369,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro } c.lastOwnedUsers = allUsers - return users, deleted, nil + return active, markedForDeletion, nil } func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 17583f7d80f..befa6f9787a 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -25,6 +25,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" @@ -83,7 +84,11 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { } logger := log.NewNopLogger() - scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger) + reg := prometheus.NewRegistry() + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, mbucket, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -91,13 +96,13 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { }, append(commonLabels, reasonLabelName)) dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"}) - cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) + cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec) // Clean User with no error cleaner.bucketClient = bkt userLogger := util_log.WithUserID(userID, cleaner.logger) userBucket := bucket.NewUserBucketClient(userID, cleaner.bucketClient, cleaner.cfgProvider) - err := cleaner.cleanUser(ctx, userLogger, userBucket, userID, false) + err = cleaner.cleanUser(ctx, userLogger, userBucket, userID, false) require.NoError(t, err) s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger) require.NoError(t, err) @@ -196,7 +201,10 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions reg := prometheus.NewPedanticRegistry() logger := log.NewNopLogger() - scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := 
promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -364,7 +372,11 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { } logger := log.NewNopLogger() - scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + reg := prometheus.NewRegistry() + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -429,7 +441,11 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { } logger := log.NewNopLogger() - scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + reg := prometheus.NewRegistry() + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -487,8 +503,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar ctx := context.Background() logger := log.NewNopLogger() - reg := prometheus.NewPedanticRegistry() - scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + reg := prometheus.NewRegistry() + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -522,7 +541,11 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar )) // Override the users scanner to reconfigure it to only return a subset of users. - cleaner.usersScanner = tsdb.NewUsersScanner(bucketClient, func(userID string) (bool, error) { return userID == "user-1", nil }, logger) + cleaner.usersScanner, err = users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) + cleaner.usersScanner = users.NewShardedScanner(cleaner.usersScanner, func(userID string) (bool, error) { return userID == "user-1", nil }, logger) // Create new blocks, to double check expected metrics have changed. 
createTSDBBlock(t, bucketClient, "user-1", 40, 50, nil) @@ -630,7 +653,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { ctx := context.Background() logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -859,7 +885,10 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { ctx := context.Background() logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -885,7 +914,7 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) { CreationTime: time.Now().Add(-5 * time.Minute).Unix(), Version: PartitionedGroupInfoVersion1, } - _, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo) + _, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo) require.NoError(t, err) visitMarker := &partitionVisitMarker{ @@ -931,7 +960,10 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) { ctx := context.Background() logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() - scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) + scanner, err := users.NewScanner(tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, bucketClient, logger, reg) + require.NoError(t, err) cfgProvider := newMockConfigProvider() blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, @@ -960,7 +992,7 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) { CreationTime: time.Now().Add(-5 * time.Minute).Unix(), Version: PartitionedGroupInfoVersion1, } - _, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo) + _, err = UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo) require.NoError(t, err) partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupInfo.PartitionedGroupID) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a2be271953e..950c7be9e55 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -33,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/backoff" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -413,7 +414,9 @@ type Compactor struct { blocksCompactorFactory BlocksCompactorFactory // Users scanner, used to discover users from the bucket. 
-	usersScanner *cortex_tsdb.UsersScanner
+	usersScanner users.Scanner
+
+	userIndexUpdater *users.UserIndexUpdater
 
 	// Blocks cleaner is responsible to hard delete blocks marked for deletion.
 	blocksCleaner *BlocksCleaner
@@ -653,7 +656,6 @@ func (c *Compactor) starting(ctx context.Context) error {
 	c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)
 	cleanerBucketClient := c.bucketClient
-
 	if c.compactorCfg.CleanerCachingBucketEnabled {
 		cleanerBucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, true, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "cleaner"}, c.registerer))
 		if err != nil {
@@ -661,8 +663,18 @@
 		}
 	}
+	if c.compactorCfg.CachingBucketEnabled {
+		c.bucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, false, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
+		if err != nil {
+			return errors.Wrap(err, "create caching bucket for compactor")
+		}
+	}
+
 	// Create the users scanner.
-	c.usersScanner = cortex_tsdb.NewUsersScanner(cleanerBucketClient, c.ownUserForCleanUp, c.parentLogger)
+	c.usersScanner, err = users.NewScanner(c.storageCfg.UsersScanner, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
+	if err != nil {
+		return errors.Wrap(err, "failed to create users scanner")
+	}
 
 	var cleanerRingLifecyclerID = "default-cleaner"
 	// Initialize the compactors ring if sharding is enabled.
@@ -729,6 +741,8 @@
 		}
 	}
 
+	// Cleaner needs a users scanner that is sharded.
+	cleanerUsersScanner := users.NewShardedScanner(c.usersScanner, c.ownUserForCleanUp, c.logger)
 	// Create the blocks cleaner (service).
 	c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
 		DeletionDelay: c.compactorCfg.DeletionDelay,
@@ -738,15 +752,19 @@
 		TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
 		ShardingStrategy: c.compactorCfg.ShardingStrategy,
 		CompactionStrategy: c.compactorCfg.CompactionStrategy,
-	}, cleanerBucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
+	}, cleanerBucketClient, cleanerUsersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
 		c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)
 
-	if c.compactorCfg.CachingBucketEnabled {
-		c.bucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, false, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
-		if err != nil {
-			return errors.Wrap(err, "create caching bucket for compactor")
-		}
+	// If sharding is disabled, there is no need for every compactor to run the user index updater,
+	// as it would be the same as falling back to the list strategy.
+	if c.compactorCfg.ShardingEnabled && c.storageCfg.UsersScanner.Strategy == cortex_tsdb.UserScanStrategyUserIndex {
+		// We hardcode the strategy to list, so the error can be ignored.
+		baseScanner, _ := users.NewScanner(cortex_tsdb.UsersScannerConfig{
+			Strategy: cortex_tsdb.UserScanStrategyList,
+		}, c.bucketClient, c.logger, c.registerer)
+		c.userIndexUpdater = users.NewUserIndexUpdater(c.bucketClient, baseScanner, c.registerer)
 	}
+
 	return nil
 }
@@ -772,6 +790,10 @@ func (c *Compactor) running(ctx context.Context) error {
 		return errors.Wrap(err, "failed to start the blocks cleaner")
 	}
 
+	if c.userIndexUpdater != nil {
+		go c.userIndexUpdateLoop(ctx)
+	}
+
 	// Run an initial compaction before starting the interval.
 	// Insert jitter right before compaction starts to avoid multiple starting compactor to be in sync
 	select {
@@ -1110,15 +1132,16 @@
 	return nil, lastErr
 }
 
+// discoverUsers returns all active users and users being deleted. Fully deleted users are not included.
 func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
-	var users []string
-
-	err := c.bucketClient.Iter(ctx, "", func(entry string) error {
-		users = append(users, strings.TrimSuffix(entry, "/"))
-		return nil
-	})
-
-	return users, err
+	activeUsers, deletingUsers, _, err := c.usersScanner.ScanUsers(ctx)
+	if err != nil {
+		return nil, err
+	}
+	users := make([]string, 0, len(activeUsers)+len(deletingUsers))
+	users = append(users, activeUsers...)
+	users = append(users, deletingUsers...)
+	return users, nil
 }
 
 func (c *Compactor) ownUserForCompaction(userID string) (bool, error) {
@@ -1175,6 +1198,37 @@
 	return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil
 }
 
+func (c *Compactor) userIndexUpdateLoop(ctx context.Context) {
+	// Hardcode the ID used to check which compactor owns updating the user index.
+	userID := users.UserIndexCompressedFilename
+	// Align with the cleanup interval.
+	ticker := time.NewTicker(util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1))
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			level.Error(c.logger).Log("msg", "context timeout, exit user index update loop", "err", ctx.Err())
+			return
+		case <-ticker.C:
+			owned, err := c.ownUser(userID, true)
+			if err != nil {
+				level.Error(c.logger).Log("msg", "failed to check if compactor owns updating user index", "err", err)
+				// Wait for the next interval. Worst case, the user index scanner falls back to the list strategy.
+				continue
+			}
+			if !owned {
+				continue
+			}
+			if err := c.userIndexUpdater.UpdateUserIndex(ctx); err != nil {
+				level.Error(c.logger).Log("msg", "failed to update user index", "err", err)
+				// Wait for the next interval. Worst case, the user index scanner falls back to the list strategy.
+				continue
+			}
+		}
+	}
+}
+
 const compactorMetaPrefix = "compactor-meta-"
 
 // metaSyncDirForUser returns directory to store cached meta files.
diff --git a/pkg/compactor/compactor_paritioning_test.go b/pkg/compactor/compactor_paritioning_test.go index dd3e46a5663..c355abf0bdf 100644 --- a/pkg/compactor/compactor_paritioning_test.go +++ b/pkg/compactor/compactor_paritioning_test.go @@ -34,6 +34,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_storage_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -191,6 +192,7 @@ func TestPartitionCompactor_SkipCompactionWhenCmkError(t *testing.T) { // No user blocks stored in the bucket. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{userID}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) @@ -225,6 +227,7 @@ func TestPartitionCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { // No user blocks stored in the bucket. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{}, nil) bucketClient.MockIter("__markers__", []string{}, nil) cfg := prepareConfigForPartitioning() @@ -300,6 +303,7 @@ func TestPartitionCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersF // Fail to iterate over the bucket while discovering users. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", nil, errors.New("failed to iterate the bucket")) bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) @@ -373,6 +377,7 @@ func TestPartitionCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASing userID := "test-user" partitionedGroupID := getPartitionedGroupID(userID) bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{userID}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX", userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) @@ -433,6 +438,7 @@ func TestPartitionCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASing func TestPartitionCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { partitionedGroupID1 := getPartitionedGroupID("user-1") bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -488,6 +494,7 @@ func TestPartitionCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) // Mock the bucket to contain two users, each one with one block. 
bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -638,6 +645,7 @@ func TestPartitionCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) @@ -762,6 +770,7 @@ func TestPartitionCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testin partitionedGroupID2 := getPartitionedGroupID("user-2") // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -854,6 +863,7 @@ func TestPartitionCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *t partitionedGroupID1 := getPartitionedGroupID("user-1") // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("__markers__", []string{"__markers__/user-1/"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) @@ -1023,6 +1033,7 @@ func TestPartitionCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInst partitionedGroupID2 := getPartitionedGroupID("user-2") // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -1138,6 +1149,7 @@ func TestPartitionCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEn // Mock the bucket to contain all users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", userIDs, nil) bucketClient.MockIter("__markers__", []string{}, nil) for _, userID := range userIDs { @@ -1251,6 +1263,7 @@ func TestPartitionCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingE // Mock the bucket to contain all users, each one with five blocks, 2 sets of overlapping blocks and 1 separate block. 
bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", userIDs, nil) bucketClient.MockIter("__markers__", []string{}, nil) @@ -1404,6 +1417,7 @@ func prepareForPartitioning(t *testing.T, compactorCfg Config, bucketClient objs storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery) + storageCfg.UsersScanner.Strategy = cortex_tsdb.UserScanStrategyUserIndex // Create a temporary directory for compactor data. compactorCfg.DataDir = t.TempDir() @@ -1611,6 +1625,7 @@ func TestPartitionCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { // Mock the bucket bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{}, nil) bucketClient.MockIter("__markers__", []string{}, nil) @@ -1651,6 +1666,7 @@ func TestPartitionCompactor_ShouldNotHangIfPlannerReturnNothing(t *testing.T) { partitionedGroupID := getPartitionedGroupID("user-1") bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) @@ -1710,6 +1726,7 @@ func TestPartitionCompactor_ShouldNotFailCompactionIfAccessDeniedErrDuringMetaSy partitionedGroupID := getPartitionedGroupID("user-1") bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) @@ -1763,6 +1780,7 @@ func TestPartitionCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFrom partitionedGroupID := getPartitionedGroupID("user-1") bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index c482e6c7329..a34dd7b2b50 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -41,6 +41,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_storage_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -171,6 +172,7 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) { // No user blocks stored in the bucket. 
bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{userID}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) @@ -204,6 +206,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { // No user blocks stored in the bucket. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{}, nil) bucketClient.MockIter("__markers__", []string{}, nil) cfg := prepareConfig() @@ -280,6 +283,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket // Fail to iterate over the bucket while discovering users. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", nil, errors.New("failed to iterate the bucket")) bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) @@ -352,6 +356,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( userID := "test-user" bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{userID}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX", userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) @@ -408,6 +413,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -457,6 +463,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -601,6 +608,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) @@ -722,6 +730,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { // Mock the bucket to contain two users, each one with one block. 
bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -807,6 +816,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("__markers__", []string{"__markers__/user-1/"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) @@ -970,6 +980,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) @@ -1080,6 +1091,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM // Mock the bucket to contain all users, each one with one block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", userIDs, nil) bucketClient.MockIter("__markers__", []string{}, nil) for _, userID := range userIDs { @@ -1190,6 +1202,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit // Mock the bucket to contain all users, each one with five blocks, 2 sets of overlapping blocks and 1 separate block. bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", userIDs, nil) bucketClient.MockIter("__markers__", []string{}, nil) @@ -1479,6 +1492,8 @@ func removeIgnoredLogs(input []string) []string { `level=info component=compactor msg="changing instance state from" old_state=ACTIVE new_state=LEAVING ring=compactor`: {}, `level=error component=compactor msg="failed to set state to LEAVING" ring=compactor err="Changing instance state from LEAVING -> LEAVING is disallowed"`: {}, `level=error component=compactor msg="failed to set state to LEAVING" ring=compactor err="Changing instance state from JOINING -> LEAVING is disallowed"`: {}, + `level=info component=compactor msg="user index not found, fallback to base scanner"`: {}, + `level=error component=compactor msg="context timeout, exit user index update loop" err="context canceled"`: {}, `level=debug component=compactor msg="unregistering instance from ring" ring=compactor`: {}, `level=info component=compactor msg="instance removed from the KV store" ring=compactor`: {}, `level=info component=compactor msg="observing tokens before going ACTIVE" ring=compactor`: {}, @@ -1552,6 +1567,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Instrument storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery) + storageCfg.UsersScanner.Strategy = cortex_tsdb.UserScanStrategyUserIndex // Create a temporary directory for compactor data. 
compactorCfg.DataDir = t.TempDir() @@ -1839,6 +1855,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { // Mock the bucket bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("", []string{}, nil) bucketClient.MockIter("__markers__", []string{}, nil) @@ -1939,6 +1956,7 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrDuringMetaSync(t *tes require.NoError(t, err) bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) @@ -1989,6 +2007,7 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t require.NoError(t, err) bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) @@ -2039,6 +2058,7 @@ func TestCompactor_FailedWithRetriableError(t *testing.T) { require.NoError(t, err) bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) @@ -2092,6 +2112,7 @@ func TestCompactor_FailedWithHaltError(t *testing.T) { require.NoError(t, err) bucketClient := &bucket.ClientMock{} + bucketClient.MockGet(users.UserIndexCompressedFilename, "", nil) bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) @@ -2388,3 +2409,44 @@ func (l *mockTenantLimits) setLimits(userID string, limits *validation.Limits) { defer l.m.Unlock() l.limits[userID] = limits } + +func TestCompactor_UserIndexUpdateLoop(t *testing.T) { + // Prepare test dependencies + bucketClient, _ := cortex_storage_testutil.PrepareFilesystemBucket(t) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) + + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + cfg := prepareConfig() + cfg.ShardingEnabled = true + cfg.ShardingRing.InstanceID = "compactor-1" + cfg.ShardingRing.InstanceAddr = "1.2.3.4" + cfg.ShardingRing.KVStore.Mock = ringStore + cfg.CleanupInterval = 100 * time.Millisecond // Short interval for testing + + compactor, _, _, _, _ := prepare(t, cfg, bucketClient, &validation.Limits{}) + + // Start the compactor service + require.NoError(t, services.StartAndAwaitRunning(context.Background(), compactor)) + + // Wait for the user index file to be created + 
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Poll for the user index file + for { + exists, err := bucketClient.Exists(ctx, "user-index.json.gz") + require.NoError(t, err) + if exists { + break + } + + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for user index file to be created") + case <-time.After(100 * time.Millisecond): + // Continue polling + } + } +} diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 74bf0750a33..2a7998281d2 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -86,6 +86,9 @@ func TestCortex(t *testing.T) { Backend: tsdb.IndexCacheBackendInMemory, }, }, + UsersScanner: tsdb.UsersScannerConfig{ + Strategy: tsdb.UserScanStrategyList, + }, }, RulerStorage: rulestore.Config{ Config: bucket.Config{ diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 1de6df37909..80c44b81b0b 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -24,6 +24,7 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/logutil" "github.com/cortexproject/cortex/pkg/ring" @@ -31,6 +32,7 @@ import ( cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" @@ -73,6 +75,8 @@ type Converter struct { ringSubservices *services.Manager ringSubservicesWatcher *services.FailureWatcher + usersScanner users.Scanner + bkt objstore.Bucket // chunk pool @@ -98,18 +102,26 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) { bkt, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "parquet-converter", logger, registerer) + if err != nil { + return nil, err + } + bkt = bucketindex.BucketWithGlobalMarkers(bkt) + usersScanner, err := users.NewScanner(storageCfg.UsersScanner, bkt, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "parquet-converter"}, registerer)) + if err != nil { + return nil, errors.Wrap(err, "unable to initialize users scanner") + } - return newConverter(cfg, bkt, storageCfg, blockRanges, logger, registerer, limits), err + return newConverter(cfg, bkt, storageCfg, blockRanges, logger, registerer, limits, usersScanner), err } -func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) *Converter { - bkt = bucketindex.BucketWithGlobalMarkers(bkt) +func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter { c := &Converter{ cfg: cfg, reg: registerer, storageCfg: storageCfg, logger: logger, limits: limits, + usersScanner: usersScanner, pool: chunkenc.NewPool(), 
blockRanges: blockRanges, fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil), @@ -257,14 +269,9 @@ func (c *Converter) stopping(_ error) error { } func (c *Converter) discoverUsers(ctx context.Context) ([]string, error) { - var users []string - - err := c.bkt.Iter(ctx, "", func(entry string) error { - users = append(users, strings.TrimSuffix(entry, "/")) - return nil - }) - - return users, err + // Only active users are considered. + active, _, _, err := c.usersScanner.ScanUsers(ctx) + return active, err } func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring ring.ReadRing, userID string) error { diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index 3e6c9705122..c1795592ecd 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -25,6 +25,8 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -133,6 +135,7 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour} flagext.DefaultValues(&storageCfg) storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) // Create a temporary directory for compactor data. cfg.DataDir = t.TempDir() @@ -149,6 +152,10 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - c := newConverter(cfg, bucketClient, storageCfg, blockRanges.ToMilliseconds(), logger, registry, overrides) + scanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{ + Strategy: cortex_tsdb.UserScanStrategyList, + }, bucketClient, logger, registry) + require.NoError(t, err) + c := newConverter(cfg, bucketClient, storageCfg, blockRanges.ToMilliseconds(), logger, registry, overrides, scanner) return c, logger, registry } diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index 15e53dd66d4..c6a7eff87bd 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -23,6 +23,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/backoff" @@ -56,7 +57,7 @@ type BucketScanBlocksFinder struct { logger log.Logger bucketClient objstore.Bucket fetchersMetrics *storegateway.MetadataFetcherMetrics - usersScanner *cortex_tsdb.UsersScanner + usersScanner users.Scanner // We reuse the metadata fetcher instance for a given tenant both because of performance // reasons (the fetcher keeps a in-memory cache) and being able to collect and group metrics. 
@@ -73,18 +74,18 @@ type BucketScanBlocksFinder struct { scanLastSuccess prometheus.Gauge } -func NewBucketScanBlocksFinder(cfg BucketScanBlocksFinderConfig, bucketClient objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) *BucketScanBlocksFinder { +func NewBucketScanBlocksFinder(cfg BucketScanBlocksFinderConfig, usersScanner users.Scanner, bucketClient objstore.InstrumentedBucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) *BucketScanBlocksFinder { d := &BucketScanBlocksFinder{ cfg: cfg, cfgProvider: cfgProvider, logger: logger, bucketClient: bucketClient, fetchers: make(map[string]userFetcher), - usersScanner: cortex_tsdb.NewUsersScanner(bucketClient, cortex_tsdb.AllUsers, logger), userMetas: make(map[string]bucketindex.Blocks), userMetasLookup: make(map[string]map[ulid.ULID]*bucketindex.Block), userDeletionMarks: map[string]map[ulid.ULID]*bucketindex.BlockDeletionMark{}, fetchersMetrics: storegateway.NewMetadataFetcherMetrics(), + usersScanner: usersScanner, scanDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_querier_blocks_scan_duration_seconds", Help: "The total time it takes to run a full blocks scan across the storage.", @@ -183,7 +184,8 @@ func (d *BucketScanBlocksFinder) scanBucket(ctx context.Context) (returnErr erro }(time.Now()) // Discover all users first. This helps cacheability of the object store call. - userIDs, _, err := d.usersScanner.ScanUsers(ctx) + // Only active users are considered. + userIDs, _, _, err := d.usersScanner.ScanUsers(ctx) if err != nil { return err } diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index 6dab2ca8660..1b977be5f5e 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -23,6 +23,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -86,7 +87,13 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) { cfg := prepareBucketScanBlocksFinderConfig() cfg.CacheDir = t.TempDir() - s := NewBucketScanBlocksFinder(cfg, bucket, nil, log.NewNopLogger(), reg) + usersScanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{ + Strategy: cortex_tsdb.UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, bucket, log.NewNopLogger(), reg) + require.NoError(t, err) + s := NewBucketScanBlocksFinder(cfg, usersScanner, bucket, nil, log.NewNopLogger(), reg) defer func() { s.StopAsync() s.AwaitTerminated(context.Background()) //nolint: errcheck @@ -154,7 +161,14 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *t cfg.MetasConcurrency = 1 cfg.TenantsConcurrency = 1 - s := NewBucketScanBlocksFinder(cfg, bucket, nil, log.NewLogfmtLogger(os.Stdout), nil) + reg := prometheus.NewRegistry() + usersScanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{ + Strategy: cortex_tsdb.UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, bucket, log.NewNopLogger(), reg) + require.NoError(t, err) + s := NewBucketScanBlocksFinder(cfg, usersScanner, bucket, nil, log.NewLogfmtLogger(os.Stdout), reg) // Start the scanner, let it run for 1s and then issue a stop. 
require.NoError(t, s.StartAsync(context.Background())) @@ -191,7 +205,14 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *te cfg.MetasConcurrency = 1 cfg.TenantsConcurrency = 1 - s := NewBucketScanBlocksFinder(cfg, bucket, nil, log.NewLogfmtLogger(os.Stdout), nil) + reg := prometheus.NewRegistry() + usersScanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{ + Strategy: cortex_tsdb.UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, bucket, log.NewNopLogger(), reg) + require.NoError(t, err) + s := NewBucketScanBlocksFinder(cfg, usersScanner, bucket, nil, log.NewLogfmtLogger(os.Stdout), reg) // Start the scanner, let it run for 1s and then issue a stop. require.NoError(t, s.StartAsync(context.Background())) @@ -504,7 +525,16 @@ func prepareBucketScanBlocksFinder(t *testing.T, cfg BucketScanBlocksFinderConfi reg := prometheus.NewPedanticRegistry() cfg.CacheDir = t.TempDir() - s := NewBucketScanBlocksFinder(cfg, bkt, nil, log.NewNopLogger(), reg) + + // Create a user scanner with list strategy + usersScanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{ + Strategy: cortex_tsdb.UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, bkt, log.NewNopLogger(), reg) + require.NoError(t, err) + + s := NewBucketScanBlocksFinder(cfg, usersScanner, bkt, nil, log.NewNopLogger(), reg) t.Cleanup(func() { s.StopAsync() diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 83de51b28fc..bd40b0a1c69 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -44,6 +44,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/tenant" @@ -213,6 +214,10 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin, }, bucketClient, limits, logger, reg) } else { + usersScanner, err := users.NewScanner(storageCfg.UsersScanner, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) + if err != nil { + return nil, errors.Wrap(err, "failed to create users scanner for bucket scan blocks finder") + } finder = NewBucketScanBlocksFinder(BucketScanBlocksFinderConfig{ ScanInterval: storageCfg.BucketStore.SyncInterval, TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency, @@ -221,7 +226,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin, BlockDiscoveryStrategy: storageCfg.BucketStore.BlockDiscoveryStrategy, - }, bucketClient, limits, logger, reg) + }, usersScanner, bucketClient, limits, logger, reg) } if gatewayCfg.ShardingEnabled { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 13daaff3f4c..5e62165a105 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -67,9 +67,10 @@ var ( // //nolint:revive type BlocksStorageConfig struct { - Bucket bucket.Config `yaml:",inline"` - BucketStore BucketStoreConfig `yaml:"bucket_store" doc:"description=This configures how the 
querier and store-gateway discover and synchronize blocks stored in the bucket."` - TSDB TSDBConfig `yaml:"tsdb"` + Bucket bucket.Config `yaml:",inline"` + BucketStore BucketStoreConfig `yaml:"bucket_store" doc:"description=This configures how the querier and store-gateway discover and synchronize blocks stored in the bucket."` + TSDB TSDBConfig `yaml:"tsdb"` + UsersScanner UsersScannerConfig `yaml:"users_scanner"` } // DurationList is the block ranges for a tsdb @@ -109,11 +110,12 @@ func (d *DurationList) ToMilliseconds() []int64 { return values } -// RegisterFlags registers the TSDB flags +// RegisterFlags registers the block storage flags func (cfg *BlocksStorageConfig) RegisterFlags(f *flag.FlagSet) { cfg.Bucket.RegisterFlagsWithPrefix("blocks-storage.", f) cfg.BucketStore.RegisterFlags(f) cfg.TSDB.RegisterFlags(f) + cfg.UsersScanner.RegisterFlagsWithPrefix("blocks-storage", f) } // Validate the config. @@ -126,6 +128,10 @@ func (cfg *BlocksStorageConfig) Validate() error { return err } + if err := cfg.UsersScanner.Validate(); err != nil { + return err + } + return cfg.BucketStore.Validate() } diff --git a/pkg/storage/tsdb/users/cache.go b/pkg/storage/tsdb/users/cache.go new file mode 100644 index 00000000000..09cbc8738eb --- /dev/null +++ b/pkg/storage/tsdb/users/cache.go @@ -0,0 +1,71 @@ +package users + +import ( + "context" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +// cachedScanner is a scanner that caches the result of the underlying scanner. +type cachedScanner struct { + scanner Scanner + + mtx sync.RWMutex + lastUpdatedAt time.Time + ttl time.Duration + + active, deleting, deleted []string + + requests prometheus.Counter + hits prometheus.Counter +} + +func newCachedScanner(scanner Scanner, cfg tsdb.UsersScannerConfig, reg prometheus.Registerer) *cachedScanner { + return &cachedScanner{ + scanner: scanner, + ttl: cfg.CacheTTL, + requests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_cached_users_scanner_requests_total", + Help: "Total number of scans made to the cache scanner", + }), + hits: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_cached_users_scanner_hits_total", + Help: "Total number of hits of scanner cache", + }), + } +} + +func (s *cachedScanner) ScanUsers(ctx context.Context) ([]string, []string, []string, error) { + s.requests.Inc() + s.mtx.RLock() + // Check if we have a valid cached result + if !s.lastUpdatedAt.Before(time.Now().Add(-s.ttl)) { + active := s.active + deleting := s.deleting + deleted := s.deleted + s.mtx.RUnlock() + s.hits.Inc() + return active, deleting, deleted, nil + } + s.mtx.RUnlock() + + // TODO: move to promise based. 
+ active, deleting, deleted, err := s.scanner.ScanUsers(ctx) + if err != nil { + return nil, nil, nil, err + } + + s.mtx.Lock() + s.active = active + s.deleting = deleting + s.deleted = deleted + s.lastUpdatedAt = time.Now() + s.mtx.Unlock() + + return active, deleting, deleted, nil +} diff --git a/pkg/storage/tsdb/users/cache_test.go b/pkg/storage/tsdb/users/cache_test.go new file mode 100644 index 00000000000..9b1f0d7d426 --- /dev/null +++ b/pkg/storage/tsdb/users/cache_test.go @@ -0,0 +1,154 @@ +package users + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +func TestCachedScanner_ScanUsers(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + tests := map[string]struct { + scanner *mockScanner + ttl time.Duration + waitTime time.Duration + expectedCalls int + expectErr bool + }{ + "cache hit within TTL": { + scanner: &mockScanner{ + active: []string{"user-1"}, + deleting: []string{"user-2"}, + deleted: []string{"user-3"}, + }, + ttl: 1 * time.Hour, + waitTime: 0, + expectedCalls: 1, + expectErr: false, + }, + "cache miss after TTL": { + scanner: &mockScanner{ + active: []string{"user-1"}, + deleting: []string{"user-2"}, + deleted: []string{"user-3"}, + }, + ttl: 100 * time.Millisecond, + waitTime: 500 * time.Millisecond, + expectedCalls: 2, + expectErr: false, + }, + "scanner error": { + scanner: &mockScanner{ + err: assert.AnError, + }, + ttl: 1 * time.Hour, + waitTime: 0, + expectedCalls: 1, + expectErr: true, + }, + "empty results": { + scanner: &mockScanner{ + active: []string{}, + deleting: []string{}, + deleted: []string{}, + }, + ttl: 1 * time.Hour, + waitTime: 0, + expectedCalls: 1, + expectErr: false, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + reg := prometheus.NewRegistry() + cachedScanner := newCachedScanner(testData.scanner, tsdb.UsersScannerConfig{ + CacheTTL: testData.ttl, + }, reg) + + // First call + active, deleting, deleted, err := cachedScanner.ScanUsers(ctx) + if testData.expectErr { + require.Error(t, err) + assert.Equal(t, assert.AnError, err) + return + } + require.NoError(t, err) + assert.Equal(t, testData.scanner.active, active) + assert.Equal(t, testData.scanner.deleting, deleting) + assert.Equal(t, testData.scanner.deleted, deleted) + + // Wait if needed + if testData.waitTime > 0 { + time.Sleep(testData.waitTime) + } + + // Second call + active, deleting, deleted, err = cachedScanner.ScanUsers(ctx) + if testData.expectErr { + require.Error(t, err) + assert.Equal(t, assert.AnError, err) + return + } + require.NoError(t, err) + assert.Equal(t, testData.scanner.active, active) + assert.Equal(t, testData.scanner.deleting, deleting) + assert.Equal(t, testData.scanner.deleted, deleted) + + // Verify number of calls to underlying scanner + assert.Equal(t, testData.expectedCalls, testData.scanner.calls) + }) + } +} + +func TestCachedScanner_ConcurrentAccess(t *testing.T) { + t.Parallel() + + ctx := context.Background() + reg := prometheus.NewRegistry() + scanner := &mockScanner{ + active: []string{"user-1"}, + deleting: []string{"user-2"}, + deleted: []string{"user-3"}, + } + + cachedScanner := newCachedScanner(scanner, tsdb.UsersScannerConfig{ + CacheTTL: 1 * time.Hour, + }, reg) + + // Run multiple concurrent scans + const goroutines = 10 + done := make(chan struct{}) + + for i 
:= 0; i < goroutines; i++ { + go func() { + defer func() { done <- struct{}{} }() + + active, deleting, deleted, err := cachedScanner.ScanUsers(ctx) + require.NoError(t, err) + assert.Equal(t, scanner.active, active) + assert.Equal(t, scanner.deleting, deleting) + assert.Equal(t, scanner.deleted, deleted) + }() + } + + // Wait for all goroutines to complete + for i := 0; i < goroutines; i++ { + <-done + } + + // Verify that the underlying scanner was called only once + assert.Equal(t, 1, scanner.calls) +} diff --git a/pkg/storage/tsdb/users/index.go b/pkg/storage/tsdb/users/index.go new file mode 100644 index 00000000000..ae273d203a5 --- /dev/null +++ b/pkg/storage/tsdb/users/index.go @@ -0,0 +1,96 @@ +package users + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "time" + + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/objstore" + + "github.com/cortexproject/cortex/pkg/util/runutil" +) + +const ( + userIndexVersion = 1 + UserIndexFilename = "user-index.json" + UserIndexCompressedFilename = UserIndexFilename + ".gz" +) + +var ( + ErrIndexNotFound = errors.New("user index not found") + ErrIndexCorrupted = errors.New("user index corrupted") +) + +type UserIndex struct { + Version int `json:"version"` + UpdatedAt int64 `json:"updated_at"` + + ActiveUsers []string `json:"active"` + DeletingUsers []string `json:"deleting"` + DeletedUsers []string `json:"deleted"` +} + +func (idx *UserIndex) GetUpdatedAt() time.Time { + return time.Unix(idx.UpdatedAt, 0) +} + +// WriteUserIndex uploads the provided index to the storage. +func WriteUserIndex(ctx context.Context, bkt objstore.Bucket, idx *UserIndex) error { + // Marshal the index. + content, err := json.Marshal(idx) + if err != nil { + return errors.Wrap(err, "marshal user index") + } + + // Compress it. + var gzipContent bytes.Buffer + gzip := gzip.NewWriter(&gzipContent) + gzip.Name = UserIndexFilename + + if _, err := gzip.Write(content); err != nil { + return errors.Wrap(err, "gzip user index") + } + if err := gzip.Close(); err != nil { + return errors.Wrap(err, "close gzip user index") + } + + // Upload the index to the storage. + if err := bkt.Upload(ctx, UserIndexCompressedFilename, bytes.NewReader(gzipContent.Bytes())); err != nil { + return errors.Wrap(err, "upload user index") + } + + return nil +} + +func ReadUserIndex(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger) (*UserIndex, error) { + // Get the user index. + reader, err := bkt.WithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, UserIndexCompressedFilename) + if err != nil { + if bkt.IsObjNotFoundErr(err) { + return nil, ErrIndexNotFound + } + + return nil, errors.Wrap(err, "read user index") + } + defer runutil.CloseWithLogOnErr(logger, reader, "close user index reader") + + // Read all the content. + gzipReader, err := gzip.NewReader(reader) + if err != nil { + return nil, ErrIndexCorrupted + } + defer runutil.CloseWithLogOnErr(logger, gzipReader, "close user index gzip reader") + + // Deserialize it. 
+ index := &UserIndex{} + d := json.NewDecoder(gzipReader) + if err := d.Decode(index); err != nil { + return nil, ErrIndexCorrupted + } + + return index, nil +} diff --git a/pkg/storage/tsdb/users/index_test.go b/pkg/storage/tsdb/users/index_test.go new file mode 100644 index 00000000000..7fcf235466b --- /dev/null +++ b/pkg/storage/tsdb/users/index_test.go @@ -0,0 +1,106 @@ +package users + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" +) + +func TestWriteAndReadUserIndex(t *testing.T) { + t.Parallel() + + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Create a test index + now := time.Now() + idx := &UserIndex{ + Version: userIndexVersion, + UpdatedAt: now.Unix(), + ActiveUsers: []string{"user-1", "user-2"}, + DeletingUsers: []string{"user-3"}, + DeletedUsers: []string{"user-4"}, + } + + // Test writing the index + err := WriteUserIndex(ctx, bkt, idx) + require.NoError(t, err) + + // Verify the index was written with the correct filename + _, err = bkt.Get(ctx, UserIndexFilename) + require.Error(t, err) + require.True(t, bkt.IsObjNotFoundErr(err)) + + // Test reading the index + readIdx, err := ReadUserIndex(ctx, bkt, log.NewNopLogger()) + require.NoError(t, err) + require.NotNil(t, readIdx) + + // Verify the read index matches the written one + assert.Equal(t, idx.Version, readIdx.Version) + assert.Equal(t, idx.UpdatedAt, readIdx.UpdatedAt) + assert.Equal(t, idx.ActiveUsers, readIdx.ActiveUsers) + assert.Equal(t, idx.DeletingUsers, readIdx.DeletingUsers) + assert.Equal(t, idx.DeletedUsers, readIdx.DeletedUsers) +} + +func TestReadUserIndex_NotFound(t *testing.T) { + t.Parallel() + + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Test reading non-existent index + _, err := ReadUserIndex(ctx, bkt, log.NewNopLogger()) + require.Error(t, err) + assert.Equal(t, ErrIndexNotFound, err) +} + +func TestReadUserIndex_Corrupted(t *testing.T) { + t.Parallel() + + ctx := context.Background() + bkt := &bucket.ClientMock{} + + // Mock a corrupted index file + bkt.MockGet(UserIndexCompressedFilename, "invalid gzip content", nil) + + // Test reading corrupted index + _, err := ReadUserIndex(ctx, bkt, log.NewNopLogger()) + require.Error(t, err) + assert.Equal(t, ErrIndexCorrupted, err) +} + +func TestUserIndex_GetUpdatedAt(t *testing.T) { + t.Parallel() + + now := time.Now() + idx := &UserIndex{ + UpdatedAt: now.Unix(), + } + + // Test GetUpdatedAt returns the correct time + assert.Equal(t, now.Unix(), idx.GetUpdatedAt().Unix()) +} + +func TestWriteUserIndex_Error(t *testing.T) { + t.Parallel() + + ctx := context.Background() + bkt := &bucket.ClientMock{} + + // Mock upload error + bkt.MockUpload(UserIndexCompressedFilename, assert.AnError) + + // Test writing index with error + err := WriteUserIndex(ctx, bkt, &UserIndex{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "upload user index") +} diff --git a/pkg/storage/tsdb/users/scanner.go b/pkg/storage/tsdb/users/scanner.go new file mode 100644 index 00000000000..f77de5b4f6f --- /dev/null +++ b/pkg/storage/tsdb/users/scanner.go @@ -0,0 +1,231 @@ +package users + +import ( + "context" + "errors" + "slices" + "sort" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + 
"github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/objstore" + + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/tenant" +) + +var ( + userIDsToSkip = []string{tenant.GlobalMarkersDir, UserIndexCompressedFilename} +) + +type Scanner interface { + // ScanUsers returns the list of active, deleting and deleted users. + // Both deleting and deleted users are marked for deletion. The difference is that + // deleting users might still have data in the bucket, while deleted users don't. + ScanUsers(ctx context.Context) (active, deleting, deleted []string, err error) +} + +func NewScanner(cfg tsdb.UsersScannerConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (Scanner, error) { + var scanner Scanner + switch cfg.Strategy { + case tsdb.UserScanStrategyList: + scanner = &listScanner{bkt: bkt} + case tsdb.UserScanStrategyUserIndex: + scanner = newUserIndexScanner(&listScanner{bkt: bkt}, cfg, bkt, logger, reg) + default: + return nil, tsdb.ErrInvalidUserScannerStrategy + } + + if cfg.CacheTTL > 0 { + scanner = newCachedScanner(scanner, cfg, reg) + } + + return scanner, nil +} + +func NewShardedScanner(scanner Scanner, isOwned func(userID string) (bool, error), logger log.Logger) Scanner { + return &shardedScanner{ + scanner: scanner, + isOwned: isOwned, + logger: logger, + } +} + +type listScanner struct { + bkt objstore.InstrumentedBucket +} + +func (s *listScanner) ScanUsers(ctx context.Context) (active, deleting, deleted []string, err error) { + scannedActiveUsers := make(map[string]struct{}) + scannedMarkedForDeletionUsers := make(map[string]struct{}) + deletingUsers := make(map[string]struct{}) + + // Scan users in the bucket. + err = s.bkt.Iter(ctx, "", func(entry string) error { + userID := strings.TrimSuffix(entry, "/") + if slices.Contains(userIDsToSkip, userID) { + return nil + } + scannedActiveUsers[userID] = struct{}{} + return nil + }) + if err != nil { + return nil, nil, nil, err + } + + // Scan users from the __markers__ directory. + err = s.bkt.Iter(ctx, tenant.GlobalMarkersDir, func(entry string) error { + // entry will be of the form __markers__// + parts := strings.Split(entry, objstore.DirDelim) + userID := parts[1] + scannedMarkedForDeletionUsers[userID] = struct{}{} + return nil + }) + if err != nil { + return nil, nil, nil, err + } + + for userID := range scannedActiveUsers { + // Tenant deletion mark could exist in local path for legacy code. + // If tenant deletion mark exists but user ID prefix exists in the bucket, mark it as deleting. + if deletionMarkExists, err := tsdb.TenantDeletionMarkExists(ctx, s.bkt, userID); err == nil && deletionMarkExists { + deletingUsers[userID] = struct{}{} + continue + } + + active = append(active, userID) + } + + for userID := range scannedMarkedForDeletionUsers { + // User marked for deletion but no user ID prefix in the bucket, mark it as deleted. + if _, ok := deletingUsers[userID]; !ok { + deleted = append(deleted, userID) + } + } + + for userID := range deletingUsers { + deleting = append(deleting, userID) + } + + // Sort for deterministic results in testing. There is no contract for list of users to be sorted. 
+ sort.Strings(active) + sort.Strings(deleting) + sort.Strings(deleted) + return active, deleting, deleted, nil +} + +type userIndexScanner struct { + bkt objstore.InstrumentedBucket + logger log.Logger + baseScanner Scanner + + // Maximum period of time to consider the user index as stale. + maxStalePeriod time.Duration + + fallbackScans *prometheus.CounterVec + succeededScans prometheus.Counter + userIndexUpdateDelay prometheus.Gauge +} + +func newUserIndexScanner(baseScanner Scanner, cfg tsdb.UsersScannerConfig, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) *userIndexScanner { + return &userIndexScanner{ + bkt: bkt, + logger: logger, + baseScanner: baseScanner, + maxStalePeriod: cfg.MaxStalePeriod, + fallbackScans: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_user_index_scan_fallbacks_total", + Help: "Total number of fallbacks to base scanner", + }, []string{"reason"}), + succeededScans: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_user_index_scan_succeeded_total", + Help: "Total number of successful scans using user index", + }), + userIndexUpdateDelay: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_user_index_update_delay_seconds", + Help: "Time offset in seconds between now and user index file updated time", + }), + } +} + +func (s *userIndexScanner) ScanUsers(ctx context.Context) ([]string, []string, []string, error) { + userIndex, err := ReadUserIndex(ctx, s.bkt, s.logger) + if err != nil { + if errors.Is(err, ErrIndexNotFound) { + level.Info(s.logger).Log("msg", "user index not found, fallback to base scanner") + s.fallbackScans.WithLabelValues("not-found").Inc() + } else { + // Always fallback to the list scanner if failed to read the user index. + level.Error(s.logger).Log("msg", "failed to read user index, fallback to base scanner", "error", err) + s.fallbackScans.WithLabelValues("corrupted").Inc() + } + return s.baseScanner.ScanUsers(ctx) + } + + now := time.Now() + updatedAt := userIndex.GetUpdatedAt() + s.userIndexUpdateDelay.Set(time.Since(updatedAt).Seconds()) + if updatedAt.Before(now.Add(-s.maxStalePeriod)) { + level.Warn(s.logger).Log("msg", "user index is stale, fallback to base scanner", "updated_at", userIndex.GetUpdatedAt(), "max_stale_period", s.maxStalePeriod) + s.fallbackScans.WithLabelValues("too_old").Inc() + return s.baseScanner.ScanUsers(ctx) + } + + s.succeededScans.Inc() + return userIndex.ActiveUsers, userIndex.DeletingUsers, userIndex.DeletedUsers, nil +} + +// shardedScanner is a user scanner but applies a filter to the users to check ownership. +type shardedScanner struct { + scanner Scanner + isOwned func(userID string) (bool, error) + logger log.Logger +} + +func (s *shardedScanner) ScanUsers(ctx context.Context) ([]string, []string, []string, error) { + baseActiveUsers, baseDeletingUsers, baseDeletedUsers, err := s.scanner.ScanUsers(ctx) + if err != nil { + return nil, nil, nil, err + } + + activeUsers := make([]string, 0, len(baseActiveUsers)) + deletingUsers := make([]string, 0, len(baseDeletingUsers)) + deletedUsers := make([]string, 0, len(baseDeletedUsers)) + + for _, userID := range baseActiveUsers { + // Filter out users not owned by this instance. 
+ if owned, err := s.isOwned(userID); err != nil { + level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) + continue + } else if !owned { + continue + } + activeUsers = append(activeUsers, userID) + } + for _, userID := range baseDeletingUsers { + // Filter out users not owned by this instance. + if owned, err := s.isOwned(userID); err != nil { + level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) + continue + } else if !owned { + continue + } + deletingUsers = append(deletingUsers, userID) + } + for _, userID := range baseDeletedUsers { + // Filter out users not owned by this instance. + if owned, err := s.isOwned(userID); err != nil { + level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) + continue + } else if !owned { + continue + } + deletedUsers = append(deletedUsers, userID) + } + + return activeUsers, deletingUsers, deletedUsers, nil +} diff --git a/pkg/storage/tsdb/users/scanner_test.go b/pkg/storage/tsdb/users/scanner_test.go new file mode 100644 index 00000000000..433f85d3aec --- /dev/null +++ b/pkg/storage/tsdb/users/scanner_test.go @@ -0,0 +1,294 @@ +package users + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +func TestListScanner_ScanUsers(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + tests := map[string]struct { + bucketSetup func(*bucket.ClientMock) + expectedActive []string + expectedDeleting []string + expectedDeleted []string + expectErr bool + }{ + "successful scan with all user types": { + bucketSetup: func(b *bucket.ClientMock) { + // Active users + b.MockIter("", []string{"user-1/", "user-2/", "user-3/"}, nil) + // Marked for deletion users + b.MockIter("__markers__", []string{"__markers__/user-1/", "__markers__/user-4/", "__markers__/user-5/"}, nil) + // Deletion marks + b.MockExists(tsdb.GetGlobalDeletionMarkPath("user-1"), true, nil) + b.MockExists(tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + b.MockExists(tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) + b.MockExists(tsdb.GetLocalDeletionMarkPath("user-2"), false, nil) + b.MockExists(tsdb.GetGlobalDeletionMarkPath("user-3"), false, nil) + b.MockExists(tsdb.GetLocalDeletionMarkPath("user-3"), false, nil) + }, + expectedActive: []string{"user-2", "user-3"}, + expectedDeleting: []string{"user-1"}, + expectedDeleted: []string{"user-4", "user-5"}, + expectErr: false, + }, + "bucket iteration error": { + bucketSetup: func(b *bucket.ClientMock) { + b.MockIter("", nil, errors.New("failed to iterate bucket")) + }, + expectErr: true, + }, + "markers iteration error": { + bucketSetup: func(b *bucket.ClientMock) { + b.MockIter("", []string{"user-1/"}, nil) + b.MockIter("__markers__", nil, errors.New("failed to iterate markers")) + }, + expectErr: true, + }, + "empty bucket": { + bucketSetup: func(b *bucket.ClientMock) { + b.MockIter("", []string{}, nil) + b.MockIter("__markers__", []string{}, nil) + }, + expectedActive: []string{}, + expectedDeleting: []string{}, + expectedDeleted: []string{}, + expectErr: false, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + 
bucketClient := &bucket.ClientMock{} + testData.bucketSetup(bucketClient) + + scanner := &listScanner{bkt: bucketClient} + active, deleting, deleted, err := scanner.ScanUsers(ctx) + + if testData.expectErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedActive, active) + assert.ElementsMatch(t, testData.expectedDeleting, deleting) + assert.ElementsMatch(t, testData.expectedDeleted, deleted) + }) + } +} + +func TestUserIndexScanner_ScanUsers(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := log.NewNopLogger() + + tests := map[string]struct { + bucketSetup func(*bucket.ClientMock) + maxStalePeriod time.Duration + expectedActive []string + expectedDeleting []string + expectedDeleted []string + expectErr bool + }{ + "successful scan from index": { + bucketSetup: func(b *bucket.ClientMock) { + index := &UserIndex{ + Version: userIndexVersion, + ActiveUsers: []string{"user-1", "user-2"}, + DeletingUsers: []string{"user-3"}, + DeletedUsers: []string{"user-4"}, + UpdatedAt: time.Now().Unix(), + } + indexBytes, err := json.Marshal(index) + require.NoError(t, err) + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err = gw.Write(indexBytes) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + b.MockGet(UserIndexCompressedFilename, buf.String(), nil) + }, + maxStalePeriod: 1 * time.Hour, + expectedActive: []string{"user-1", "user-2"}, + expectedDeleting: []string{"user-3"}, + expectedDeleted: []string{"user-4"}, + expectErr: false, + }, + "stale index falls back to base scanner": { + bucketSetup: func(b *bucket.ClientMock) { + // Return stale index + index := &UserIndex{ + Version: userIndexVersion, + ActiveUsers: []string{"user-1"}, + DeletingUsers: []string{}, + DeletedUsers: []string{}, + UpdatedAt: time.Now().Add(-2 * time.Hour).Unix(), + } + indexBytes, err := json.Marshal(index) + require.NoError(t, err) + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err = gw.Write(indexBytes) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + b.MockGet(UserIndexCompressedFilename, buf.String(), nil) + + // Base scanner results + b.MockIter("", []string{"user-2/"}, nil) + b.MockIter("__markers__", []string{}, nil) + b.MockExists(tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) + b.MockExists(tsdb.GetLocalDeletionMarkPath("user-2"), false, nil) + }, + maxStalePeriod: 1 * time.Hour, + expectedActive: []string{"user-2"}, + expectedDeleting: []string{}, + expectedDeleted: []string{}, + expectErr: false, + }, + "index read error falls back to base scanner": { + bucketSetup: func(b *bucket.ClientMock) { + b.MockGet(UserIndexCompressedFilename, "", errors.New("failed to read index")) + b.MockIter("", []string{"user-1/"}, nil) + b.MockIter("__markers__", []string{}, nil) + b.MockExists(tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + b.MockExists(tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + }, + maxStalePeriod: 1 * time.Hour, + expectedActive: []string{"user-1"}, + expectedDeleting: []string{}, + expectedDeleted: []string{}, + expectErr: false, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + bucketClient := &bucket.ClientMock{} + testData.bucketSetup(bucketClient) + + baseScanner := &listScanner{bkt: bucketClient} + scanner := newUserIndexScanner(baseScanner, tsdb.UsersScannerConfig{ + MaxStalePeriod: testData.maxStalePeriod, + }, bucketClient, logger, nil) + + active, deleting, 
deleted, err := scanner.ScanUsers(ctx) + + if testData.expectErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedActive, active) + assert.ElementsMatch(t, testData.expectedDeleting, deleting) + assert.ElementsMatch(t, testData.expectedDeleted, deleted) + }) + } +} + +func TestShardedScanner_ScanUsers(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := log.NewNopLogger() + + tests := map[string]struct { + baseScanner Scanner + isOwned func(string) (bool, error) + expectedActive []string + expectedDeleting []string + expectedDeleted []string + expectErr bool + }{ + "successful scan with ownership check": { + baseScanner: &mockScanner{ + active: []string{"user-1", "user-2", "user-3"}, + deleting: []string{"user-4", "user-5"}, + deleted: []string{"user-6", "user-7"}, + }, + isOwned: func(userID string) (bool, error) { + return userID == "user-1" || userID == "user-4" || userID == "user-6", nil + }, + expectedActive: []string{"user-1"}, + expectedDeleting: []string{"user-4"}, + expectedDeleted: []string{"user-6"}, + expectErr: false, + }, + "ownership check error": { + baseScanner: &mockScanner{ + active: []string{"user-1"}, + deleting: []string{}, + deleted: []string{}, + }, + isOwned: func(userID string) (bool, error) { + return false, errors.New("failed to check ownership") + }, + expectedActive: []string{}, + expectedDeleting: []string{}, + expectedDeleted: []string{}, + expectErr: false, + }, + "base scanner error": { + baseScanner: &mockScanner{ + err: errors.New("base scanner error"), + }, + isOwned: func(userID string) (bool, error) { + return true, nil + }, + expectErr: true, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + scanner := &shardedScanner{ + scanner: testData.baseScanner, + isOwned: testData.isOwned, + logger: logger, + } + + active, deleting, deleted, err := scanner.ScanUsers(ctx) + + if testData.expectErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + assert.ElementsMatch(t, testData.expectedActive, active) + assert.ElementsMatch(t, testData.expectedDeleting, deleting) + assert.ElementsMatch(t, testData.expectedDeleted, deleted) + }) + } +} diff --git a/pkg/storage/tsdb/users/updater.go b/pkg/storage/tsdb/users/updater.go new file mode 100644 index 00000000000..b6f185348b2 --- /dev/null +++ b/pkg/storage/tsdb/users/updater.go @@ -0,0 +1,48 @@ +package users + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/objstore" +) + +type UserIndexUpdater struct { + bkt objstore.InstrumentedBucket + scanner Scanner + + userIndexLastUpdated prometheus.Gauge +} + +func NewUserIndexUpdater(bkt objstore.InstrumentedBucket, scanner Scanner, reg prometheus.Registerer) *UserIndexUpdater { + return &UserIndexUpdater{ + bkt: bkt, + scanner: scanner, + userIndexLastUpdated: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_user_index_last_successful_update_timestamp_seconds", + Help: "Timestamp of the last successful update of user index.", + }), + } +} + +func (u *UserIndexUpdater) UpdateUserIndex(ctx context.Context) error { + active, deleting, deleted, err := u.scanner.ScanUsers(ctx) + if err != nil { + return err + } + + userIndex := &UserIndex{ + Version: userIndexVersion, + ActiveUsers: active, + DeletingUsers: deleting, + DeletedUsers: deleted, + UpdatedAt: 
time.Now().Unix(), + } + if err := WriteUserIndex(ctx, u.bkt, userIndex); err != nil { + return err + } + u.userIndexLastUpdated.SetToCurrentTime() + return nil +} diff --git a/pkg/storage/tsdb/users/updater_test.go b/pkg/storage/tsdb/users/updater_test.go new file mode 100644 index 00000000000..1828597b05c --- /dev/null +++ b/pkg/storage/tsdb/users/updater_test.go @@ -0,0 +1,126 @@ +package users + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" +) + +type mockScanner struct { + active []string + deleting []string + deleted []string + err error + calls int +} + +func (m *mockScanner) ScanUsers(ctx context.Context) (active, deleting, deleted []string, err error) { + m.calls++ + return m.active, m.deleting, m.deleted, m.err +} + +func TestUserIndexUpdater_UpdateUserIndex(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + tests := map[string]struct { + scanner *mockScanner + expectedIdx *UserIndex + expectErr bool + }{ + "successful update": { + scanner: &mockScanner{ + active: []string{"user-1", "user-2"}, + deleting: []string{"user-3"}, + deleted: []string{"user-4"}, + }, + expectedIdx: &UserIndex{ + Version: userIndexVersion, + ActiveUsers: []string{"user-1", "user-2"}, + DeletingUsers: []string{"user-3"}, + DeletedUsers: []string{"user-4"}, + }, + expectErr: false, + }, + "scanner error": { + scanner: &mockScanner{ + err: assert.AnError, + }, + expectErr: true, + }, + "empty lists": { + scanner: &mockScanner{ + active: []string{}, + deleting: []string{}, + deleted: []string{}, + }, + expectedIdx: &UserIndex{ + Version: userIndexVersion, + ActiveUsers: []string{}, + DeletingUsers: []string{}, + DeletedUsers: []string{}, + }, + expectErr: false, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + updater := NewUserIndexUpdater(bkt, testData.scanner, nil) + err := updater.UpdateUserIndex(ctx) + + if testData.expectErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + + // Read back the index and verify its contents + readIdx, err := ReadUserIndex(ctx, bkt, log.NewNopLogger()) + require.NoError(t, err) + require.NotNil(t, readIdx) + + // Verify the index contents + assert.Equal(t, testData.expectedIdx.Version, readIdx.Version) + assert.Equal(t, testData.expectedIdx.ActiveUsers, readIdx.ActiveUsers) + assert.Equal(t, testData.expectedIdx.DeletingUsers, readIdx.DeletingUsers) + assert.Equal(t, testData.expectedIdx.DeletedUsers, readIdx.DeletedUsers) + assert.WithinDuration(t, time.Now(), readIdx.GetUpdatedAt(), 5*time.Second) + }) + } +} + +func TestUserIndexUpdater_UpdateUserIndex_WriteError(t *testing.T) { + t.Parallel() + + ctx := context.Background() + bkt := &bucket.ClientMock{} + + // Mock the scanner to return valid data + scanner := &mockScanner{ + active: []string{"user-1"}, + deleting: []string{}, + deleted: []string{}, + } + + // Mock the bucket to return an error on upload + bkt.MockUpload(UserIndexCompressedFilename, assert.AnError) + + updater := NewUserIndexUpdater(bkt, scanner, nil) + err := updater.UpdateUserIndex(ctx) + require.Error(t, err) + assert.Contains(t, err.Error(), "upload user index") +} diff --git a/pkg/storage/tsdb/users_scanner.go 
b/pkg/storage/tsdb/users_scanner.go deleted file mode 100644 index 0e4f7acbe0b..00000000000 --- a/pkg/storage/tsdb/users_scanner.go +++ /dev/null @@ -1,87 +0,0 @@ -package tsdb - -import ( - "context" - "strings" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/thanos-io/objstore" - - "github.com/cortexproject/cortex/pkg/tenant" -) - -// AllUsers returns true to each call and should be used whenever the UsersScanner should not filter out -// any user due to sharding. -func AllUsers(user string) (bool, error) { - if user == tenant.GlobalMarkersDir { - return false, nil - } - return true, nil -} - -type UsersScanner struct { - bucketClient objstore.Bucket - logger log.Logger - isOwned func(userID string) (bool, error) -} - -func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) (bool, error), logger log.Logger) *UsersScanner { - return &UsersScanner{ - bucketClient: bucketClient, - logger: logger, - isOwned: isOwned, - } -} - -// ScanUsers returns a fresh list of users found in the storage, that are not marked for deletion, -// and list of users marked for deletion. -// -// If sharding is enabled, returned lists contains only the users owned by this instance. -func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) { - scannedUsers := make(map[string]struct{}) - - // Scan users in the bucket. - err = s.bucketClient.Iter(ctx, "", func(entry string) error { - userID := strings.TrimSuffix(entry, "/") - scannedUsers[userID] = struct{}{} - return nil - }) - if err != nil { - return nil, nil, err - } - - // Scan users from the __markers__ directory. - err = s.bucketClient.Iter(ctx, tenant.GlobalMarkersDir, func(entry string) error { - // entry will be of the form __markers__// - parts := strings.Split(entry, objstore.DirDelim) - userID := parts[1] - scannedUsers[userID] = struct{}{} - return nil - }) - if err != nil { - return nil, nil, err - } - - for userID := range scannedUsers { - // Filter out users not owned by this instance. - if owned, err := s.isOwned(userID); err != nil { - level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) - } else if !owned { - continue - } - - // Filter users marked for deletion - if deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID); err != nil { - level.Warn(s.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err) - } else if deletionMarkExists { - markedForDeletion = append(markedForDeletion, userID) - continue - } - - // The remaining are the active users owned by this instance. 
- users = append(users, userID) - } - - return users, markedForDeletion, nil -} diff --git a/pkg/storage/tsdb/users_scanner_config.go b/pkg/storage/tsdb/users_scanner_config.go new file mode 100644 index 00000000000..5b556d9e4e4 --- /dev/null +++ b/pkg/storage/tsdb/users_scanner_config.go @@ -0,0 +1,47 @@ +package tsdb + +import ( + "flag" + "fmt" + "strings" + "time" + + "github.com/pkg/errors" +) + +type UsersScannerConfig struct { + Strategy string `yaml:"strategy"` + MaxStalePeriod time.Duration `yaml:"max_stale_period"` + CacheTTL time.Duration `yaml:"cache_ttl"` +} + +const ( + UserScanStrategyList = "list" + UserScanStrategyUserIndex = "user_index" +) + +var ( + ErrInvalidUserScannerStrategy = errors.New("invalid user scanner strategy") + ErrInvalidMaxStalePeriod = errors.New("max stale period must be positive") + ErrInvalidCacheTTL = errors.New("cache TTL must be >= 0") + supportedStrategies = []string{UserScanStrategyList, UserScanStrategyUserIndex} +) + +func (c *UsersScannerConfig) Validate() error { + if c.Strategy != UserScanStrategyList && c.Strategy != UserScanStrategyUserIndex { + return ErrInvalidUserScannerStrategy + } + if c.Strategy == UserScanStrategyUserIndex && c.MaxStalePeriod <= 0 { + return ErrInvalidMaxStalePeriod + } + if c.CacheTTL < 0 { + return ErrInvalidCacheTTL + } + return nil +} + +func (c *UsersScannerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&c.Strategy, prefix+".users-scanner.strategy", UserScanStrategyList, fmt.Sprintf("Strategy to use to scan users. Supported values are: %s.", strings.Join(supportedStrategies, ", "))) + f.DurationVar(&c.MaxStalePeriod, prefix+".users-scanner.user-index.max-stale-period", time.Hour, "Maximum period of time to consider the user index as stale. Fall back to the base scanner if stale. Only valid when strategy is user_index.") + f.DurationVar(&c.CacheTTL, prefix+".users-scanner.cache-ttl", 0, "TTL of the cached users. 
0 disables caching and relies on caching at bucket client level.") +} diff --git a/pkg/storage/tsdb/users_scanner_config_test.go b/pkg/storage/tsdb/users_scanner_config_test.go new file mode 100644 index 00000000000..2abe0451c4a --- /dev/null +++ b/pkg/storage/tsdb/users_scanner_config_test.go @@ -0,0 +1,112 @@ +package tsdb + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestUsersScannerConfig_Validate(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + config UsersScannerConfig + expectErr error + }{ + "valid list strategy": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, + expectErr: nil, + }, + "valid user_index strategy": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyUserIndex, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, + expectErr: nil, + }, + "invalid strategy": { + config: UsersScannerConfig{ + Strategy: "invalid", + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, + expectErr: ErrInvalidUserScannerStrategy, + }, + "zero max stale period with user_index strategy": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyUserIndex, + MaxStalePeriod: 0, + CacheTTL: 0, + }, + expectErr: ErrInvalidMaxStalePeriod, + }, + "negative max stale period with user_index strategy": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyUserIndex, + MaxStalePeriod: -time.Hour, + CacheTTL: 0, + }, + expectErr: ErrInvalidMaxStalePeriod, + }, + "zero max stale period with list strategy": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyList, + MaxStalePeriod: 0, + CacheTTL: 0, + }, + expectErr: nil, + }, + "negative max stale period with list strategy": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyList, + MaxStalePeriod: -time.Hour, + CacheTTL: 0, + }, + expectErr: nil, + }, + "negative cache TTL": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: -time.Hour, + }, + expectErr: ErrInvalidCacheTTL, + }, + "zero cache TTL": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: 0, + }, + expectErr: nil, + }, + "positive cache TTL": { + config: UsersScannerConfig{ + Strategy: UserScanStrategyList, + MaxStalePeriod: time.Hour, + CacheTTL: time.Hour, + }, + expectErr: nil, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + err := testData.config.Validate() + if testData.expectErr != nil { + assert.ErrorIs(t, err, testData.expectErr) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go deleted file mode 100644 index a99926b3f4b..00000000000 --- a/pkg/storage/tsdb/users_scanner_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package tsdb - -import ( - "context" - "errors" - "slices" - "testing" - - "github.com/go-kit/log" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/storage/bucket" -) - -func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { - bucketClient := &bucket.ClientMock{} - bucketClient.MockIter("", []string{"user-1/", "user-2/", "user-3/", "user-4/"}, nil) - bucketClient.MockIter("__markers__", []string{"__markers__/user-5/", "__markers__/user-6/", "__markers__/user-7/"}, nil) - bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) - 
bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) - bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil) - bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), false, nil) - bucketClient.MockExists(GetGlobalDeletionMarkPath("user-7"), false, nil) - bucketClient.MockExists(GetLocalDeletionMarkPath("user-7"), true, nil) - - isOwned := func(userID string) (bool, error) { - return userID == "user-1" || userID == "user-3" || userID == "user-7", nil - } - - s := NewUsersScanner(bucketClient, isOwned, log.NewNopLogger()) - actual, deleted, err := s.ScanUsers(context.Background()) - require.NoError(t, err) - assert.Equal(t, []string{"user-1"}, actual) - slices.Sort(deleted) - assert.Equal(t, []string{"user-3", "user-7"}, deleted) -} - -func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDeletionCheckFailed(t *testing.T) { - expected := []string{"user-1", "user-2"} - - bucketClient := &bucket.ClientMock{} - bucketClient.MockIter("", expected, nil) - bucketClient.MockIter("__markers__", []string{}, nil) - bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) - bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) - - bucketClient.MockExists(GetGlobalDeletionMarkPath("user-2"), false, errors.New("fail")) - - isOwned := func(userID string) (bool, error) { - return false, errors.New("failed to check if user is owned") - } - - s := NewUsersScanner(bucketClient, isOwned, log.NewNopLogger()) - actual, deleted, err := s.ScanUsers(context.Background()) - require.NoError(t, err) - slices.Sort(actual) - assert.Equal(t, expected, actual) - assert.Empty(t, deleted) -} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index fe69645c57c..a7ebc486187 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -6,7 +6,6 @@ import ( "math" "os" "path/filepath" - "strings" "sync" "time" @@ -34,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/backoff" cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" @@ -81,6 +81,8 @@ type BucketStores struct { userTokenBucketsMu sync.RWMutex userTokenBuckets map[string]*util.TokenBucket + userScanner users.Scanner + // Keeps number of inflight requests inflightRequestCnt int inflightRequestMu sync.RWMutex @@ -142,6 +144,10 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra Help: "Number of tenants synced.", }), } + u.userScanner, err = users.NewScanner(cfg.UsersScanner, bucketClient, logger, reg) + if err != nil { + return nil, errors.Wrap(err, "failed to create users scanner") + } u.matcherCache = storecache.NoopMatchersCache @@ -242,9 +248,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte errs := tsdb_errors.NewMulti() errsMx := sync.Mutex{} - // Scan users in the bucket. In case of error, it may return a subset of users. If we sync a subset of users - // during a periodic sync, we may end up unloading blocks for users that still belong to this store-gateway - // so we do prefer to not run the sync at all. + // Scan users in the bucket. 
userIDs, err := u.scanUsers(ctx) if err != nil { return err @@ -442,18 +446,16 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues return store.LabelValues(ctx, req) } -// scanUsers in the bucket and return the list of found users. If an error occurs while -// iterating the bucket, it may return both an error and a subset of the users in the bucket. +// scanUsers in the bucket and return the list of found users. It includes active and deleting users +// but not deleted users. func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { - var users []string - - // Iterate the bucket to find all users in the bucket. Due to how the bucket listing - // caching works, it's more likely to have a cache hit if there's no delay while - // iterating the bucket, so we do load all users in memory and later process them. - err := u.bucket.Iter(ctx, "", func(s string) error { - users = append(users, strings.TrimSuffix(s, "/")) - return nil - }) + activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx) + if err != nil { + return nil, err + } + users := make([]string, 0, len(activeUsers)+len(deletingUsers)) + users = append(users, activeUsers...) + users = append(users, deletingUsers...) return users, err } diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 96db0194a1c..cf1c2a590b3 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -43,6 +43,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" ) @@ -425,6 +426,16 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", allUsers, nil) + bucketClient.MockIter(tenant.GlobalMarkersDir, []string{}, nil) + bucketClient.MockIter("user-1/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-1", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-1", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockIter("user-2/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-2", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-2", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockIter("user-3/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-3", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-3", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) require.NoError(t, err) @@ -437,7 +448,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { }) assert.NoError(t, err) - bucketClient.AssertNumberOfCalls(t, "Iter", 1) + bucketClient.AssertNumberOfCalls(t, "Iter", 2) assert.Equal(t, storesCount.Load(), testData.expectedStores) }) } diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index ca35034bd43..8a4cb803800 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ 
-39,6 +39,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" util_limiter "github.com/cortexproject/cortex/pkg/util/limiter" @@ -165,9 +166,16 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { assert.Equal(t, RingNumTokens, len(g.ringLifecycler.GetTokens())) assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens) }) + bucketClient.MockIter(tenant.GlobalMarkersDir, []string{}, nil) bucketClient.MockIter("user-1/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-1", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-1", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) bucketClient.MockIter("user-2/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-2", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-2", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) bucketClient.MockIter("user-disabled/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-disabled", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-disabled", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) // Once successfully started, the instance should be ACTIVE in the ring. require.NoError(t, services.StartAndAwaitRunning(ctx, g)) @@ -199,9 +207,16 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck bucketClient.MockIter("", []string{"user-1", "user-2", "user-disabled"}, nil) + bucketClient.MockIter(tenant.GlobalMarkersDir, []string{}, nil) bucketClient.MockIter("user-1/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-1", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-1", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) bucketClient.MockIter("user-2/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-2", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-2", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) bucketClient.MockIter("user-disabled/", []string{}, nil) + bucketClient.MockExists(path.Join(tenant.GlobalMarkersDir, "user-disabled", cortex_tsdb.TenantDeletionMarkFile), false, nil) + bucketClient.MockExists(path.Join("user-disabled", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, g)) assert.NotNil(t, g.stores.getStore("user-1")) @@ -605,6 +620,7 @@ func TestStoreGateway_ShouldSupportLoadRingTokensFromFile(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) + bucketClient.MockIter(tenant.GlobalMarkersDir, []string{}, nil) g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil, nil) require.NoError(t, err) @@ -815,6 +831,7 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) + 
bucketClient.MockIter(tenant.GlobalMarkersDir, []string{}, nil) g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg, nil) require.NoError(t, err) @@ -877,6 +894,7 @@ func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testin bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) + bucketClient.MockIter(tenant.GlobalMarkersDir, []string{}, nil) g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil, nil) require.NoError(t, err) diff --git a/pkg/tenant/tenant.go b/pkg/tenant/tenant.go index 07e8156d54c..9d9a7f4aad6 100644 --- a/pkg/tenant/tenant.go +++ b/pkg/tenant/tenant.go @@ -13,9 +13,10 @@ import ( const GlobalMarkersDir = "__markers__" var ( - errTenantIDTooLong = errors.New("tenant ID is too long: max 150 characters") - errTenantIDUnsafe = errors.New("tenant ID is '.' or '..'") - errTenantIDMarkers = errors.New("tenant ID '__markers__' is not allowed") + errTenantIDTooLong = errors.New("tenant ID is too long: max 150 characters") + errTenantIDUnsafe = errors.New("tenant ID is '.' or '..'") + errTenantIDMarkers = errors.New("tenant ID '__markers__' is not allowed") + errTenantIDUserIndex = errors.New("tenant ID 'user-index.json.gz' is not allowed") ) type errTenantIDUnsupportedCharacter struct { @@ -74,6 +75,10 @@ func ValidTenantID(s string) error { return errTenantIDMarkers } + if s == "user-index.json.gz" { + return errTenantIDUserIndex + } + // check tenantID is "." or ".." if containsUnsafePathSegments(s) { return errTenantIDUnsafe diff --git a/pkg/tenant/tenant_test.go b/pkg/tenant/tenant_test.go index 665f699c493..e2ec65bf41a 100644 --- a/pkg/tenant/tenant_test.go +++ b/pkg/tenant/tenant_test.go @@ -41,6 +41,10 @@ func TestValidTenantIDs(t *testing.T) { name: "__markers__", err: strptr("tenant ID '__markers__' is not allowed"), }, + { + name: "user-index.json.gz", + err: strptr("tenant ID 'user-index.json.gz' is not allowed"), + }, } { t.Run(tc.name, func(t *testing.T) { err := ValidTenantID(tc.name) diff --git a/tools/thanosconvert/thanosconvert.go b/tools/thanosconvert/thanosconvert.go index 840c4eadac0..03d042cc8bc 100644 --- a/tools/thanosconvert/thanosconvert.go +++ b/tools/thanosconvert/thanosconvert.go @@ -16,12 +16,13 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" ) // ThanosBlockConverter converts blocks written by Thanos to make them readable by Cortex type ThanosBlockConverter struct { logger log.Logger - bkt objstore.Bucket + bkt objstore.InstrumentedBucket dryRun bool } @@ -60,8 +61,11 @@ func NewThanosBlockConverter(ctx context.Context, cfg bucket.Config, dryRun bool func (c ThanosBlockConverter) Run(ctx context.Context) (Results, error) { results := make(Results) - scanner := cortex_tsdb.NewUsersScanner(c.bkt, cortex_tsdb.AllUsers, c.logger) - users, _, err := scanner.ScanUsers(ctx) + // Hardcode user scan strategy to list. + // We can safely ignore the error as it only fails if the strategy is not supported. + usersScanner, _ := users.NewScanner(cortex_tsdb.UsersScannerConfig{Strategy: cortex_tsdb.UserScanStrategyList}, c.bkt, c.logger, nil) + // Only active users are considered. 
+ users, _, _, err := usersScanner.ScanUsers(ctx) if err != nil { return results, errors.Wrap(err, "error while scanning users") }
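
For reference, a minimal sketch of how a component could wire up the `users.Scanner` API introduced above, using only the signatures visible in this diff (`users.NewScanner`, `users.NewShardedScanner`, `UsersScannerConfig.Validate`, `Scanner.ScanUsers`). The package name, function name, concrete config values, and the caller-provided bucket client and `isOwned` callback are illustrative assumptions, not part of the change:

```go
// Package example is an illustrative sketch; it is not part of this diff.
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
)

// scanOwnedUsers shows one possible wiring: a user-index scanner that falls back
// to bucket listing when the index is missing or stale, wrapped by the in-memory
// cache (CacheTTL > 0) and by a sharded scanner that filters by ownership.
func scanOwnedUsers(ctx context.Context, bkt objstore.InstrumentedBucket, isOwned func(userID string) (bool, error)) error {
	cfg := cortex_tsdb.UsersScannerConfig{
		Strategy:       cortex_tsdb.UserScanStrategyUserIndex,
		MaxStalePeriod: time.Hour,       // fall back to the list scanner if the index is older than this
		CacheTTL:       5 * time.Minute, // >0 wraps the scanner with the cached scanner
	}
	if err := cfg.Validate(); err != nil {
		return err
	}

	scanner, err := users.NewScanner(cfg, bkt, log.NewNopLogger(), prometheus.NewRegistry())
	if err != nil {
		return err
	}

	// Optionally keep only the users owned by this instance.
	sharded := users.NewShardedScanner(scanner, isOwned, log.NewNopLogger())

	active, deleting, deleted, err := sharded.ScanUsers(ctx)
	if err != nil {
		return err
	}
	fmt.Println("active:", active, "deleting:", deleting, "deleted:", deleted)
	return nil
}
```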