Use a dedicated threadpool for store-gateway requests
Remove the dedicated threadpool for index-header operations, because the
per-call overhead is prohibitively expensive. Instead, use a dedicated
threadpool for entire store-gateway requests, so that the cost of
switching between threads is paid only once per request. This preserves
isolation in the case of page faults during mmap accesses, without
excessive overhead.

Fixes #1804

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters committed May 4, 2022
1 parent 221caaa commit 9032b67
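
The mechanism, in rough terms: each pool worker is a goroutine pinned to its own OS thread, and a whole request is handed to one worker. A page fault while reading an mmap-ed index header then stalls only that dedicated thread, and the goroutine-to-thread handoff happens once per request instead of once per index-header read. The following is a minimal sketch of that idea with hypothetical names, not the code added by this commit (which also wires in metrics and shutdown, per the removed lifecycle hooks below):

```go
package storegateway

import "runtime"

type task struct {
	fn   func() error
	done chan error
}

// ThreadPool runs submitted functions on goroutines that are each pinned
// to a dedicated OS thread for their whole lifetime.
type ThreadPool struct {
	tasks chan task
}

// NewThreadPool starts size workers, each locked to its own OS thread.
func NewThreadPool(size int) *ThreadPool {
	p := &ThreadPool{tasks: make(chan task)}
	for i := 0; i < size; i++ {
		go func() {
			// Pin this goroutine: a page fault during an mmap access now
			// blocks only this thread, not one the Go scheduler shares
			// among many goroutines.
			runtime.LockOSThread()
			defer runtime.UnlockOSThread()
			for t := range p.tasks {
				t.done <- t.fn()
			}
		}()
	}
	return p
}

// Execute runs fn on one of the dedicated threads and waits for it to
// finish. The cross-thread handoff (channel send, worker wakeup, channel
// receive) is paid exactly once per call.
func (p *ThreadPool) Execute(fn func() error) error {
	t := task{fn: fn, done: make(chan error, 1)}
	p.tasks <- t
	return <-t.done
}
```

Wrapping a whole Series request in one `Execute` call keeps the isolation the old per-operation pool provided while paying the handoff once, rather than for every one of the many index-header reads a single request performs.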
Showing 16 changed files with 113 additions and 218 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -57,7 +57,7 @@
* [ENHANCEMENT] Ruler: Add more detailed query information to ruler query stats logging. #1411
* [ENHANCEMENT] Admin: Admin API now has some styling. #1482 #1549
* [ENHANCEMENT] Alertmanager: added `insight=true` field to alertmanager dispatch logs. #1379
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run index header operations in a dedicated thread pool. This feature can be configured using `-blocks-storage.bucket-store.index-header-thread-pool-size` and is disabled by default. #1660
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. #1660 #1812
* [ENHANCEMENT] Store-gateway: don't drop all blocks if instance finds itself as unhealthy in the ring. #1806
* [ENHANCEMENT] Querier: wait until inflight queries are completed when shutting down queriers. #1756 #1767
* [BUGFIX] Query-frontend: do not shard queries with a subquery unless the subquery is inside a shardable aggregation function call. #1542
22 changes: 11 additions & 11 deletions cmd/mimir/config-descriptor.json
@@ -5107,17 +5107,6 @@
"fieldFlag": "blocks-storage.bucket-store.posting-offsets-in-mem-sampling",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "index_header_thread_pool_size",
"required": false,
"desc": "Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "blocks-storage.bucket-store.index-header-thread-pool-size",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
@@ -6421,6 +6410,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "thread_pool_size",
"required": false,
"desc": "Number of OS threads that are dedicated for handling requests. Set to 0 to disable use of dedicated OS threads for handling requests.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "store-gateway.thread-pool-size",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -332,8 +332,6 @@ Usage of ./cmd/mimir/mimir:
If enabled, store-gateway will lazy load an index-header only once required by a query. (default true)
-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout duration
If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s)
-blocks-storage.bucket-store.index-header-thread-pool-size int
[experimental] Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.
-blocks-storage.bucket-store.max-chunk-pool-bytes uint
Max size - in bytes - of a chunks pool, used to reduce memory allocations. The pool is shared across all tenants. 0 to disable the limit. (default 2147483648)
-blocks-storage.bucket-store.max-concurrent int
@@ -1638,6 +1636,8 @@ Usage of ./cmd/mimir/mimir:
True to enable zone-awareness and replicate blocks across different availability zones. This option needs be set both on the store-gateway, querier and ruler when running in microservices mode.
-store-gateway.tenant-shard-size int
The tenant's shard size, used when store-gateway sharding is enabled. Value of 0 disables shuffle sharding for the tenant, that is all tenant blocks are sharded across all store-gateway replicas.
-store-gateway.thread-pool-size uint
[experimental] Number of OS threads that are dedicated for handling requests. Set to 0 to disable use of dedicated OS threads for handling requests.
-store.max-labels-query-length value
Limit the time range (end - start time) of series, label names and values queries. This limit is enforced in the querier. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.
-store.max-query-length value
@@ -3339,12 +3339,6 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.posting-offsets-in-mem-sampling
[postings_offsets_in_mem_sampling: <int> | default = 32]

# (experimental) Number of threads that are dedicated for use reading index
# headers. Set to 0 to disable use of dedicated threads for reading index
# headers.
# CLI flag: -blocks-storage.bucket-store.index-header-thread-pool-size
[index_header_thread_pool_size: <int> | default = 0]

tsdb:
# Directory to store TSDBs (including WAL) in the ingesters. This directory is
# required to be persisted between restarts.
@@ -3761,6 +3755,11 @@ sharding_ring:
# Unregister from the ring upon clean shutdown.
# CLI flag: -store-gateway.sharding-ring.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]

# (experimental) Number of OS threads that are dedicated for handling requests.
# Set to 0 to disable use of dedicated OS threads for handling requests.
# CLI flag: -store-gateway.thread-pool-size
[thread_pool_size: <int> | default = 0]
```

### sse
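
Taken together with the flag help above, enabling the experimental pool is a one-line config change. The value below is purely illustrative (any positive number turns the feature on, and the equivalent CLI flag is `-store-gateway.thread-pool-size=16`), assuming the field sits directly under the `store_gateway` block as the reference section above suggests:

```yaml
# Illustrative only: run store-gateway requests on 16 dedicated OS
# threads. The default of 0 leaves the experimental feature disabled.
store_gateway:
  thread_pool_size: 16
```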
4 changes: 0 additions & 4 deletions pkg/storage/tsdb/config.go
@@ -288,9 +288,6 @@ type BucketStoreConfig struct {
// On the contrary, smaller value will increase baseline memory usage, but improve latency slightly.
// 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.
PostingOffsetsInMemSampling int `yaml:"postings_offsets_in_mem_sampling" category:"advanced"`

// IndexHeaderThreadPoolSize controls the number of threads that are dedicated for use reading index headers.
IndexHeaderThreadPoolSize int `yaml:"index_header_thread_pool_size" category:"experimental"`
}

// RegisterFlags registers the BucketStore flags
@@ -318,7 +315,6 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", true, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 60*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", DefaultPartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.")
f.IntVar(&cfg.IndexHeaderThreadPoolSize, "blocks-storage.bucket-store.index-header-thread-pool-size", 0, "Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.")
}

// Validate the config.
10 changes: 0 additions & 10 deletions pkg/storegateway/bucket.go
@@ -60,7 +60,6 @@ import (
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
mimir_indexheader "github.com/grafana/mimir/pkg/storegateway/indexheader"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
@@ -147,9 +146,6 @@ type BucketStore struct {
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

// Threadpool for performing operations that block the OS thread (mmap page faults)
threadPool *mimir_indexheader.Threadpool

// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

@@ -249,7 +245,6 @@ func NewBucketStore(
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
partitioner Partitioner,
threadPool *mimir_indexheader.Threadpool,
blockSyncConcurrency int,
enableCompatibilityLabel bool,
postingOffsetsInMemSampling int,
@@ -274,7 +269,6 @@
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
partitioner: partitioner,
threadPool: threadPool,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
@@ -462,10 +456,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
return errors.Wrap(err, "create index header reader")
}

if s.threadPool != nil {
indexHeaderReader = mimir_indexheader.NewThreadedReader(s.threadPool, indexHeaderReader)
}

defer func() {
if err != nil {
runutil.CloseWithErrCapture(&err, indexHeaderReader, "index-header")
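
The `NewThreadedReader` call deleted above wrapped every index-header method in a pool round-trip. A hypothetical sketch of that removed shape (one method shown; the real reader interface has several), reusing the `ThreadPool` sketch from earlier, makes the per-call cost visible:

```go
// Hypothetical shape of the removed per-operation wrapper. Reader stands
// in for the index-header reader interface; IndexVersion is one of its
// real methods, and each method paid a full Execute round-trip.
type Reader interface {
	IndexVersion() (int, error)
}

type threadedReader struct {
	pool    *ThreadPool
	wrapped Reader
}

func (r *threadedReader) IndexVersion() (version int, err error) {
	// One channel send, worker wakeup, and receive per call: cheap in
	// isolation, costly across the many reads in a single request.
	err = r.pool.Execute(func() error {
		var innerErr error
		version, innerErr = r.wrapped.IndexVersion()
		return innerErr
	})
	return version, err
}
```

Cheap once, but a Series call touches index headers many times over, which is the overhead the commit message calls prohibitively expensive.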
1 change: 0 additions & 1 deletion pkg/storegateway/bucket_e2e_test.go
@@ -176,7 +176,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
chunksLimiterFactory,
seriesLimiterFactory,
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
26 changes: 0 additions & 26 deletions pkg/storegateway/bucket_stores.go
@@ -20,7 +20,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -39,16 +38,13 @@ import (
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
mimir_indexheader "github.com/grafana/mimir/pkg/storegateway/indexheader"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)

// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
type BucketStores struct {
services.Service

logger log.Logger
cfg tsdb.BlocksStorageConfig
limits *validation.Overrides
@@ -73,9 +69,6 @@ type BucketStores struct {
// Gate used to limit query concurrency across all tenants.
queryGate gate.Gate

// Thread pool used for mmap operations that may page fault
threadPool *mimir_indexheader.Threadpool

// Keeps a bucket store for each tenant.
storesMu sync.RWMutex
stores map[string]*BucketStore
@@ -115,7 +108,6 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
threadPool: mimir_indexheader.NewThreadPool(cfg.BucketStore.IndexHeaderThreadPoolSize, reg),
seriesHashCache: hashcache.NewSeriesHashCache(cfg.BucketStore.SeriesHashCacheMaxBytes),
}

@@ -156,26 +148,9 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
reg.MustRegister(u.metaFetcherMetrics)
}

u.Service = services.NewIdleService(u.starting, u.stopping)
return u, nil
}

func (u *BucketStores) starting(_ context.Context) error {
if u.threadPool != nil {
u.threadPool.Start()
}

return nil
}

func (u *BucketStores) stopping(_ error) error {
if u.threadPool != nil {
u.threadPool.StopAndWait()
}

return nil
}

// InitialSync does an initial synchronization of blocks for all users.
func (u *BucketStores) InitialSync(ctx context.Context) error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
@@ -529,7 +504,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
newChunksLimiterFactory(u.limits, userID),
NewSeriesLimiterFactory(0), // No series limiter.
u.partitioner,
u.threadPool,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
u.cfg.BucketStore.PostingOffsetsInMemSampling,
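
`BucketStores` thus no longer starts, stops, or plumbs the pool at all. Per the commit message, the pool is instead applied once per request at the gateway layer; those files are among the ones not loaded in this view, so the following is only an illustrative sketch with hypothetical names, again reusing the `ThreadPool` sketch from above:

```go
// Hypothetical request-level wiring; the actual gateway code added by
// this commit is in files not shown on this page.
type gateway struct {
	pool *ThreadPool // nil when -store-gateway.thread-pool-size is 0
}

// handleRequest runs an entire store-gateway request on one dedicated
// OS thread, paying the thread handoff a single time.
func (g *gateway) handleRequest(serve func() error) error {
	if g.pool == nil {
		return serve() // feature disabled: stay on the calling goroutine
	}
	return g.pool.Execute(serve)
}
```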
17 changes: 0 additions & 17 deletions pkg/storegateway/bucket_stores_test.go
@@ -22,7 +22,6 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
@@ -77,8 +76,6 @@ func TestBucketStores_InitialSync(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Query series before the initial sync.
for userID, metricName := range userToMetric {
@@ -156,8 +153,6 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Initial sync should succeed even if a transient error occurs.
require.NoError(t, stores.InitialSync(ctx))
@@ -219,8 +214,6 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Run an initial sync to discover 1 block.
generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
@@ -277,7 +270,6 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
test.VerifyNoLeak(t)

allUsers := []string{"user-1", "user-2", "user-3"}
ctx := context.Background()

tests := map[string]struct {
shardingStrategy ShardingStrategy
@@ -307,8 +299,6 @@

stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Sync user stores and count the number of times the callback is called.
var storesCount atomic.Int32
@@ -354,8 +344,6 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

require.NoError(t, stores.InitialSync(ctx))

@@ -443,9 +431,6 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

require.NoError(t, stores.InitialSync(ctx))

tests := map[string]struct {
@@ -621,8 +606,6 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, &sharding, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Perform sync.
sharding.users = []string{user1, user2}
6 changes: 0 additions & 6 deletions pkg/storegateway/bucket_test.go
@@ -519,7 +519,6 @@ func TestBucketStore_Info(t *testing.T) {
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -764,7 +763,6 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -1713,7 +1711,6 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
1,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -2079,7 +2076,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -2171,7 +2167,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -2356,7 +2351,6 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
(Diffs for the remaining 6 changed files were not loaded in this view.)
