indexheader.Reader implementation that runs in a thread pool (#1660)
* indexheader.Reader implementation that runs in a thread pool

This adds a new Thanos indexheader.Reader implementation that uses
a dedicated pool of OS threads (via `runtime.LockOSThread()`) to
ensure that mmap page faults (caused by reading index files from
disk using mmap) do not block a thread used for scheduling other
goroutines.
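
The sketch below is a minimal, self-contained illustration of the pattern the commit message describes — it is not the committed Mimir code, and all names in it are illustrative. Work is funneled over a channel to goroutines pinned to their own OS threads via runtime.LockOSThread, so a page fault raised while touching mmap'd memory stalls only a pooled thread, never one of the runtime's shared scheduler threads:

    package main

    import (
        "fmt"
        "runtime"
    )

    type result struct {
        val interface{}
        err error
    }

    // call is one unit of work plus the channel its result is sent back on.
    type call struct {
        fn   func() (interface{}, error)
        resp chan result
    }

    // pool runs submitted calls on a fixed set of goroutines, each pinned
    // to its own OS thread for its entire lifetime.
    type pool struct {
        calls chan call
    }

    func newPool(size int) *pool {
        p := &pool{calls: make(chan call)}
        for i := 0; i < size; i++ {
            go p.run()
        }
        return p
    }

    func (p *pool) run() {
        // Pin this goroutine to a dedicated OS thread; blocking here
        // (e.g. on a page fault) cannot stall other goroutines.
        runtime.LockOSThread()
        defer runtime.UnlockOSThread()
        for c := range p.calls {
            val, err := c.fn()
            c.resp <- result{val: val, err: err}
        }
    }

    // execute schedules fn onto a pooled thread and waits for its result.
    func (p *pool) execute(fn func() (interface{}, error)) (interface{}, error) {
        resp := make(chan result, 1)
        p.calls <- call{fn: fn, resp: resp}
        r := <-resp
        return r.val, r.err
    }

    func main() {
        p := newPool(4)
        v, err := p.execute(func() (interface{}, error) {
            // In the real reader this would be an mmap-backed index
            // lookup that may page fault; a constant stands in for it.
            return 42, nil
        })
        fmt.Println(v, err)
    }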

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>

* Code review fixes

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
56quarters committed Apr 19, 2022
1 parent 176c1c9 commit d2ae62f
Showing 17 changed files with 520 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -36,6 +36,7 @@
* [ENHANCEMENT] Ruler: Add more detailed query information to ruler query stats logging. #1411
* [ENHANCEMENT] Admin: Admin API now has some styling. #1482 #1549
* [ENHANCEMENT] Alertmanager: added `insight=true` field to alertmanager dispatch logs. #1379
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run index header operations in a dedicated thread pool. This feature can be configured using `-blocks-storage.bucket-store.index-header-thread-pool-size` and is disabled by default. #1660
* [BUGFIX] Query-frontend: do not shard queries with a subquery unless the subquery is inside a shardable aggregation function call. #1542
* [BUGFIX] Query-frontend: added `component=query-frontend` label to results cache memcached metrics to fix a panic when Mimir is running in single binary mode and results cache is enabled. #1704
* [BUGFIX] Mimir: services' status content-type is now correctly set to `text/html`. #1575
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -5107,6 +5107,17 @@
"fieldFlag": "blocks-storage.bucket-store.posting-offsets-in-mem-sampling",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "index_header_thread_pool_size",
"required": false,
"desc": "Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "blocks-storage.bucket-store.index-header-thread-pool-size",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -332,6 +332,8 @@ Usage of ./cmd/mimir/mimir:
If enabled, store-gateway will lazy load an index-header only once required by a query. (default true)
-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout duration
If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s)
-blocks-storage.bucket-store.index-header-thread-pool-size int
[experimental] Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.
-blocks-storage.bucket-store.max-chunk-pool-bytes uint
Max size - in bytes - of a chunks pool, used to reduce memory allocations. The pool is shared across all tenants. 0 to disable the limit. (default 2147483648)
-blocks-storage.bucket-store.max-concurrent int
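
Given the help text above, enabling the feature is a single flag. A store-gateway invocation might look like the following — the pool size of 16 is only an example; the default of 0 leaves the feature disabled:

    mimir -target=store-gateway \
      -blocks-storage.bucket-store.index-header-thread-pool-size=16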
2 changes: 2 additions & 0 deletions docs/sources/operators-guide/configuring/about-versioning.md
@@ -74,6 +74,8 @@ The following features are currently experimental:
- `-query-frontend.querier-forget-delay`
- Query-scheduler
- `-query-scheduler.querier-forget-delay`
- Store-gateway
- `-blocks-storage.bucket-store.index-header-thread-pool-size`

## Deprecated features

@@ -3339,6 +3339,12 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.posting-offsets-in-mem-sampling
[postings_offsets_in_mem_sampling: <int> | default = 32]

# (experimental) Number of threads that are dedicated for use reading index
# headers. Set to 0 to disable use of dedicated threads for reading index
# headers.
# CLI flag: -blocks-storage.bucket-store.index-header-thread-pool-size
[index_header_thread_pool_size: <int> | default = 0]

tsdb:
# Directory to store TSDBs (including WAL) in the ingesters. This directory is
# required to be persisted between restarts.
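
The same setting in YAML sits under the bucket_store block shown in the hunk above. A minimal sketch, assuming the standard blocks_storage nesting used elsewhere in the Mimir reference configuration and again with an illustrative pool size:

    blocks_storage:
      bucket_store:
        index_header_thread_pool_size: 16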
4 changes: 4 additions & 0 deletions pkg/storage/tsdb/config.go
@@ -286,6 +286,9 @@ type BucketStoreConfig struct {
// On the contrary, smaller value will increase baseline memory usage, but improve latency slightly.
// 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.
PostingOffsetsInMemSampling int `yaml:"postings_offsets_in_mem_sampling" category:"advanced"`

// IndexHeaderThreadPoolSize controls the number of threads that are dedicated for use reading index headers.
IndexHeaderThreadPoolSize int `yaml:"index_header_thread_pool_size" category:"experimental"`
}

// RegisterFlags registers the BucketStore flags
@@ -313,6 +316,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", true, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 60*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", DefaultPartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.")
f.IntVar(&cfg.IndexHeaderThreadPoolSize, "blocks-storage.bucket-store.index-header-thread-pool-size", 0, "Number of threads that are dedicated for use reading index headers. Set to 0 to disable use of dedicated threads for reading index headers.")
}

// Validate the config.
11 changes: 11 additions & 0 deletions pkg/storegateway/bucket.go
@@ -60,6 +60,7 @@ import (
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
mimir_indexheader "github.com/grafana/mimir/pkg/storegateway/indexheader"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
@@ -146,6 +147,9 @@ type BucketStore struct {
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

// Threadpool for performing operations that block the OS thread (mmap page faults)
threadPool *mimir_indexheader.Threadpool

// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

@@ -245,6 +249,7 @@ func NewBucketStore(
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
partitioner Partitioner,
threadPool *mimir_indexheader.Threadpool,
blockSyncConcurrency int,
enableCompatibilityLabel bool,
postingOffsetsInMemSampling int,
@@ -269,6 +274,7 @@
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
partitioner: partitioner,
threadPool: threadPool,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
@@ -455,6 +461,11 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
if err != nil {
return errors.Wrap(err, "create index header reader")
}

if s.threadPool != nil {
indexHeaderReader = mimir_indexheader.NewThreadedReader(s.threadPool, indexHeaderReader)
}

defer func() {
if err != nil {
runutil.CloseWithErrCapture(&err, indexHeaderReader, "index-header")
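
The hunk above shows where the pool is wired in: when a pool is configured, the plain index-header reader is wrapped so its methods execute on pooled threads. A rough sketch of what such a decorator looks like, reusing the pool type from the earlier sketch — the method shown is one of several on the real Thanos indexheader.Reader interface, and these names are illustrative, not the committed implementation:

    // indexReader is a stand-in for the subset of indexheader.Reader
    // used here; the real interface has more methods.
    type indexReader interface {
        IndexVersion() (int, error)
        Close() error
    }

    // threadedReader decorates an indexReader so each call runs on the
    // dedicated thread pool, keeping mmap page faults off shared threads.
    type threadedReader struct {
        pool   *pool
        reader indexReader
    }

    func (r *threadedReader) IndexVersion() (int, error) {
        v, err := r.pool.execute(func() (interface{}, error) {
            version, err := r.reader.IndexVersion()
            return version, err
        })
        if err != nil {
            return 0, err
        }
        return v.(int), nil
    }

    func (r *threadedReader) Close() error {
        _, err := r.pool.execute(func() (interface{}, error) {
            return nil, r.reader.Close()
        })
        return err
    }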
1 change: 1 addition & 0 deletions pkg/storegateway/bucket_e2e_test.go
@@ -176,6 +176,7 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
chunksLimiterFactory,
seriesLimiterFactory,
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
26 changes: 26 additions & 0 deletions pkg/storegateway/bucket_stores.go
@@ -20,6 +20,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -38,13 +39,16 @@ import (
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
mimir_indexheader "github.com/grafana/mimir/pkg/storegateway/indexheader"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)

// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
type BucketStores struct {
services.Service

logger log.Logger
cfg tsdb.BlocksStorageConfig
limits *validation.Overrides
@@ -69,6 +73,9 @@ type BucketStores struct {
// Gate used to limit query concurrency across all tenants.
queryGate gate.Gate

// Thread pool used for mmap operations that may page fault
threadPool *mimir_indexheader.Threadpool

// Keeps a bucket store for each tenant.
storesMu sync.RWMutex
stores map[string]*BucketStore
@@ -108,6 +115,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
threadPool: mimir_indexheader.NewThreadPool(cfg.BucketStore.IndexHeaderThreadPoolSize, reg),
seriesHashCache: hashcache.NewSeriesHashCache(cfg.BucketStore.SeriesHashCacheMaxBytes),
}

@@ -148,9 +156,26 @@
reg.MustRegister(u.metaFetcherMetrics)
}

u.Service = services.NewIdleService(u.starting, u.stopping)
return u, nil
}

func (u *BucketStores) starting(_ context.Context) error {
if u.threadPool != nil {
u.threadPool.Start()
}

return nil
}

func (u *BucketStores) stopping(_ error) error {
if u.threadPool != nil {
u.threadPool.StopAndWait()
}

return nil
}

// InitialSync does an initial synchronization of blocks for all users.
func (u *BucketStores) InitialSync(ctx context.Context) error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
@@ -504,6 +529,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
newChunksLimiterFactory(u.limits, userID),
NewSeriesLimiterFactory(0), // No series limiter.
u.partitioner,
u.threadPool,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
u.cfg.BucketStore.PostingOffsetsInMemSampling,
18 changes: 18 additions & 0 deletions pkg/storegateway/bucket_stores_test.go
@@ -22,6 +22,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
@@ -76,6 +77,8 @@ func TestBucketStores_InitialSync(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Query series before the initial sync.
for userID, metricName := range userToMetric {
@@ -153,6 +156,8 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Initial sync should succeed even if a transient error occurs.
require.NoError(t, stores.InitialSync(ctx))
@@ -214,6 +219,8 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Run an initial sync to discover 1 block.
generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
@@ -270,6 +277,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
test.VerifyNoLeak(t)

allUsers := []string{"user-1", "user-2", "user-3"}
ctx := context.Background()

tests := map[string]struct {
shardingStrategy ShardingStrategy
@@ -299,6 +307,8 @@

stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Sync user stores and count the number of times the callback is called.
var storesCount atomic.Int32
@@ -344,6 +354,9 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

require.NoError(t, stores.InitialSync(ctx))

tests := map[string]struct {
Expand Down Expand Up @@ -430,6 +443,9 @@ func TestBucketStore_Series_ShouldQueryBlockWithOutOfOrderChunks(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, newNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

require.NoError(t, stores.InitialSync(ctx))

tests := map[string]struct {
Expand Down Expand Up @@ -605,6 +621,8 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, &sharding, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, stores))
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, stores)) })

// Perform sync.
sharding.users = []string{user1, user2}
6 changes: 6 additions & 0 deletions pkg/storegateway/bucket_test.go
@@ -519,6 +519,7 @@ func TestBucketStore_Info(t *testing.T) {
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -763,6 +764,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -1711,6 +1713,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
1,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -2076,6 +2079,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -2167,6 +2171,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
@@ -2351,6 +2356,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
10,
false,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway.go
@@ -184,7 +184,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) {

// First of all we register the instance in the ring and wait
// until the lifecycler successfully started.
if g.subservices, err = services.NewManager(g.ringLifecycler, g.ring); err != nil {
if g.subservices, err = services.NewManager(g.ringLifecycler, g.ring, g.stores); err != nil {
return errors.Wrap(err, "unable to start store-gateway dependencies")
}
