Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a dedicated threadpool for store-gateway requests #1812

Merged
merged 2 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Grafana Mimir

* [CHANGE] Increased default configuration for `-server.grpc-max-recv-msg-size-bytes` and `-server.grpc-max-send-msg-size-bytes` from 4MB to 100MB. #1883
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883

### Mixin
Expand Down
22 changes: 11 additions & 11 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -5096,17 +5096,6 @@
"fieldFlag": "blocks-storage.bucket-store.posting-offsets-in-mem-sampling",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "index_header_thread_pool_size",
"required": false,
"desc": "Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "blocks-storage.bucket-store.index-header-thread-pool-size",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down Expand Up @@ -6410,6 +6399,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "thread_pool_size",
"required": false,
"desc": "Number of OS threads that are dedicated for handling requests. Set to 0 to disable use of dedicated OS threads for handling requests.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "store-gateway.thread-pool-size",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ Usage of ./cmd/mimir/mimir:
If enabled, store-gateway will lazy load an index-header only once required by a query. (default true)
-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout duration
If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s)
-blocks-storage.bucket-store.index-header-thread-pool-size int
[experimental] Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.
-blocks-storage.bucket-store.max-chunk-pool-bytes uint
Max size - in bytes - of a chunks pool, used to reduce memory allocations. The pool is shared across all tenants. 0 to disable the limit. (default 2147483648)
-blocks-storage.bucket-store.max-concurrent int
Expand Down Expand Up @@ -1656,6 +1654,8 @@ Usage of ./cmd/mimir/mimir:
True to enable zone-awareness and replicate blocks across different availability zones. This option needs be set both on the store-gateway, querier and ruler when running in microservices mode.
-store-gateway.tenant-shard-size int
The tenant's shard size, used when store-gateway sharding is enabled. Value of 0 disables shuffle sharding for the tenant, that is all tenant blocks are sharded across all store-gateway replicas.
-store-gateway.thread-pool-size uint
[experimental] Number of OS threads that are dedicated for handling requests. Set to 0 to disable use of dedicated OS threads for handling requests.
-store.max-labels-query-length value
Limit the time range (end - start time) of series, label names and values queries. This limit is enforced in the querier. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.
-store.max-query-length value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3373,12 +3373,6 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.posting-offsets-in-mem-sampling
[postings_offsets_in_mem_sampling: <int> | default = 32]

# (experimental) Number of threads that are dedicated for use reading index
# headers. Set to 0 to disable use of dedicated threads for reading index
# headers.
# CLI flag: -blocks-storage.bucket-store.index-header-thread-pool-size
[index_header_thread_pool_size: <int> | default = 0]

tsdb:
# Directory to store TSDBs (including WAL) in the ingesters. This directory is
# required to be persisted between restarts.
Expand Down Expand Up @@ -3795,6 +3789,11 @@ sharding_ring:
# Unregister from the ring upon clean shutdown.
# CLI flag: -store-gateway.sharding-ring.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]

# (experimental) Number of OS threads that are dedicated for handling requests.
# Set to 0 to disable use of dedicated OS threads for handling requests.
# CLI flag: -store-gateway.thread-pool-size
[thread_pool_size: <int> | default = 0]
```

### sse
Expand Down
4 changes: 0 additions & 4 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,6 @@ type BucketStoreConfig struct {
// On the contrary, smaller value will increase baseline memory usage, but improve latency slightly.
// 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.
PostingOffsetsInMemSampling int `yaml:"postings_offsets_in_mem_sampling" category:"advanced"`

// IndexHeaderThreadPoolSize controls the number of threads that are dedicated for use reading index headers.
IndexHeaderThreadPoolSize int `yaml:"index_header_thread_pool_size" category:"experimental"`
}

// RegisterFlags registers the BucketStore flags
Expand Down Expand Up @@ -318,7 +315,6 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", true, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 60*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", DefaultPartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.")
f.IntVar(&cfg.IndexHeaderThreadPoolSize, "blocks-storage.bucket-store.index-header-thread-pool-size", 0, "Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.")
}

// Validate the config.
Expand Down
10 changes: 0 additions & 10 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
mimir_indexheader "github.com/grafana/mimir/pkg/storegateway/indexheader"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -125,9 +124,6 @@ type BucketStore struct {
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

// Threadpool for performing operations that block the OS thread (mmap page faults)
threadPool *mimir_indexheader.Threadpool

// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

Expand Down Expand Up @@ -220,7 +216,6 @@ func NewBucketStore(
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
partitioner Partitioner,
threadPool *mimir_indexheader.Threadpool,
blockSyncConcurrency int,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
Expand All @@ -244,7 +239,6 @@ func NewBucketStore(
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
partitioner: partitioner,
threadPool: threadPool,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
seriesHashCache: seriesHashCache,
Expand Down Expand Up @@ -414,10 +408,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
return errors.Wrap(err, "create index header reader")
}

if s.threadPool != nil {
indexHeaderReader = mimir_indexheader.NewThreadedReader(s.threadPool, indexHeaderReader)
}

defer func() {
if err != nil {
runutil.CloseWithErrCapture(&err, indexHeaderReader, "index-header")
Expand Down
1 change: 0 additions & 1 deletion pkg/storegateway/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
chunksLimiterFactory,
seriesLimiterFactory,
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
true,
Expand Down
26 changes: 0 additions & 26 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -38,16 +37,13 @@ import (
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
mimir_indexheader "github.com/grafana/mimir/pkg/storegateway/indexheader"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)

// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
type BucketStores struct {
services.Service

logger log.Logger
cfg tsdb.BlocksStorageConfig
limits *validation.Overrides
Expand All @@ -73,9 +69,6 @@ type BucketStores struct {
// Gate used to limit query concurrency across all tenants.
queryGate gate.Gate

// Thread pool used for mmap operations that may page fault
threadPool *mimir_indexheader.Threadpool

// Keeps a bucket store for each tenant.
storesMu sync.RWMutex
stores map[string]*BucketStore
Expand Down Expand Up @@ -115,7 +108,6 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
threadPool: mimir_indexheader.NewThreadPool(cfg.BucketStore.IndexHeaderThreadPoolSize, reg),
seriesHashCache: hashcache.NewSeriesHashCache(cfg.BucketStore.SeriesHashCacheMaxBytes),
syncBackoffConfig: backoff.Config{
MinBackoff: 1 * time.Second,
Expand Down Expand Up @@ -161,26 +153,9 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
reg.MustRegister(u.metaFetcherMetrics)
}

u.Service = services.NewIdleService(u.starting, u.stopping)
return u, nil
}

func (u *BucketStores) starting(_ context.Context) error {
if u.threadPool != nil {
u.threadPool.Start()
}

return nil
}

func (u *BucketStores) stopping(_ error) error {
if u.threadPool != nil {
u.threadPool.StopAndWait()
}

return nil
}

// InitialSync does an initial synchronization of blocks for all users.
func (u *BucketStores) InitialSync(ctx context.Context) error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
Expand Down Expand Up @@ -508,7 +483,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
newChunksLimiterFactory(u.limits, userID),
NewSeriesLimiterFactory(0), // No series limiter.
u.partitioner,
u.threadPool,
u.cfg.BucketStore.BlockSyncConcurrency,
u.cfg.BucketStore.PostingOffsetsInMemSampling,
true, // Enable series hints.
Expand Down
17 changes: 0 additions & 17 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -77,8 +76,6 @@ func TestBucketStores_InitialSync(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Query series before the initial sync.
for userID, metricName := range userToMetric {
Expand Down Expand Up @@ -156,8 +153,6 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Initial sync should succeed even if a transient error occurs.
require.NoError(t, stores.InitialSync(ctx))
Expand Down Expand Up @@ -219,8 +214,6 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Run an initial sync to discover 1 block.
generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
Expand Down Expand Up @@ -277,7 +270,6 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
test.VerifyNoLeak(t)

allUsers := []string{"user-1", "user-2", "user-3"}
ctx := context.Background()

tests := map[string]struct {
shardingStrategy ShardingStrategy
Expand Down Expand Up @@ -307,8 +299,6 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {

stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Sync user stores and count the number of times the callback is called.
var storesCount atomic.Int32
Expand Down Expand Up @@ -354,8 +344,6 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

require.NoError(t, stores.InitialSync(ctx))

Expand Down Expand Up @@ -443,9 +431,6 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

require.NoError(t, stores.InitialSync(ctx))

tests := map[string]struct {
Expand Down Expand Up @@ -621,8 +606,6 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, &sharding, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Perform sync.
sharding.users = []string{user1, user2}
Expand Down
4 changes: 0 additions & 4 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,6 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
1,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
false,
Expand Down Expand Up @@ -1722,7 +1721,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
true,
Expand Down Expand Up @@ -1813,7 +1811,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
true,
Expand Down Expand Up @@ -1997,7 +1994,6 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
true,
Expand Down
Loading