Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update thanos to latest main and add flags for chunk and series size #5401

Merged
merged 11 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [ENHANCEMENT] Do not resync blocks in running store gateways during rollout deployment and container restart. #5363
* [ENHANCEMENT] Store Gateway: Add new metrics `cortex_bucket_store_sent_chunk_size_bytes`, `cortex_bucket_store_postings_size_bytes` and `cortex_bucket_store_empty_postings_total`. #5397
* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea
github.com/thanos-io/thanos v0.31.1-0.20230607122802-662211055334
github.com/thanos-io/thanos v0.31.1-0.20230616082957-d43026952989
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
go.etcd.io/etcd/api/v3 v3.5.8
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1163,8 +1163,8 @@ github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204 h1:W4w5Iph7j32S
github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204/go.mod h1:STSgpY8M6EKF2G/raUFdbIMf2U9GgYlEjAEHJxjvpAo=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18=
github.com/thanos-io/thanos v0.31.1-0.20230607122802-662211055334 h1:1pqel0J04gQRJpl3P3JX+zt6PbbTOfbUPdSww6jK8ws=
github.com/thanos-io/thanos v0.31.1-0.20230607122802-662211055334/go.mod h1:lHSiSsXIQuAv5u+6yu0LLw6cS/MC8vUQswQ6rkdxB7c=
github.com/thanos-io/thanos v0.31.1-0.20230616082957-d43026952989 h1:5prEq1YagZAt5Ah3HE876r3fhNhUhVh8JPuZLh/lJBI=
github.com/thanos-io/thanos v0.31.1-0.20230616082957-d43026952989/go.mod h1:jscDD4ecQW4A+6fpKgXLqOWOrtiTjcAEnOebEwAjXAM=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down
98 changes: 49 additions & 49 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
assert.Equal(t, expectedVector3, result.(model.Vector))

// Check the in-memory index cache metrics (in the store-gateway).
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(7), "thanos_store_index_cache_requests_total"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(5+5+2), "thanos_store_index_cache_requests_total"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2), "thanos_store_index_cache_items")) // 2 series both for postings and series cache
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2), "thanos_store_index_cache_items_added_total")) // 2 series both for postings and series cache
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total"))
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(11), "thanos_memcached_operations_total")) // 7 gets + 4 sets
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(21), "thanos_memcached_operations_total")) // 14 gets + 7 sets
}

// Query back again the 1st series from storage. This time it should use the index cache.
Expand All @@ -257,14 +257,14 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))

require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(7+2), "thanos_store_index_cache_requests_total"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(12+2), "thanos_store_index_cache_requests_total"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2), "thanos_store_index_cache_items")) // as before
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2), "thanos_store_index_cache_items_added_total")) // as before
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) // as before
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(11+2), "thanos_memcached_operations_total")) // as before + 2 gets
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23), "thanos_memcached_operations_total")) // as before + 2 gets
}

// Query metadata.
Expand Down Expand Up @@ -298,38 +298,38 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendInMemory,
},
"blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": {
blocksShardingEnabled: false,
ingesterStreamingEnabled: false,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
},
"blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache": {
blocksShardingEnabled: true,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
},
"blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache, bucket index enabled": {
blocksShardingEnabled: true,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
bucketIndexEnabled: true,
},
"blocks sharding disabled, ingester gRPC streaming disabled, redis index cache": {
blocksShardingEnabled: false,
ingesterStreamingEnabled: false,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
},
"blocks sharding enabled, ingester gRPC streaming enabled, redis index cache": {
blocksShardingEnabled: true,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
},
"blocks sharding enabled, ingester gRPC streaming enabled, redis index cache, bucket index enabled": {
blocksShardingEnabled: true,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
bucketIndexEnabled: true,
},
//"blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": {
// blocksShardingEnabled: false,
// ingesterStreamingEnabled: false,
// indexCacheBackend: tsdb.IndexCacheBackendMemcached,
//},
//"blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache": {
// blocksShardingEnabled: true,
// ingesterStreamingEnabled: true,
// indexCacheBackend: tsdb.IndexCacheBackendMemcached,
//},
//"blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache, bucket index enabled": {
// blocksShardingEnabled: true,
// ingesterStreamingEnabled: true,
// indexCacheBackend: tsdb.IndexCacheBackendMemcached,
// bucketIndexEnabled: true,
//},
//"blocks sharding disabled, ingester gRPC streaming disabled, redis index cache": {
// blocksShardingEnabled: false,
// ingesterStreamingEnabled: false,
// indexCacheBackend: tsdb.IndexCacheBackendRedis,
//},
//"blocks sharding enabled, ingester gRPC streaming enabled, redis index cache": {
// blocksShardingEnabled: true,
// ingesterStreamingEnabled: true,
// indexCacheBackend: tsdb.IndexCacheBackendRedis,
//},
//"blocks sharding enabled, ingester gRPC streaming enabled, redis index cache, bucket index enabled": {
// blocksShardingEnabled: true,
// ingesterStreamingEnabled: true,
// indexCacheBackend: tsdb.IndexCacheBackendRedis,
// bucketIndexEnabled: true,
//},
}

for testName, testCfg := range tests {
Expand Down Expand Up @@ -475,14 +475,14 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
assert.Equal(t, expectedVector3, result.(model.Vector))

// Check the in-memory index cache metrics (in the store-gateway).
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(7*seriesReplicationFactor)), "thanos_store_index_cache_requests_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((5+5+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) // 5 for expanded postings and postings, 2 for series
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*2*seriesReplicationFactor)), "thanos_store_index_cache_items")) // 2 series both for postings and series cache
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*2*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // 2 series both for postings and series cache
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total"))
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(11*seriesReplicationFactor)), "thanos_memcached_operations_total")) // 7 gets + 4 sets
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(21*seriesReplicationFactor)), "thanos_memcached_operations_total")) // 14 gets + 7 sets
}

// Query back again the 1st series from storage. This time it should use the index cache.
Expand All @@ -491,14 +491,14 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))

require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((7+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((12+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total"))
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time has used the index cache

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*2*seriesReplicationFactor)), "thanos_store_index_cache_items")) // as before
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*2*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items")) // as before
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((11+2)*seriesReplicationFactor)), "thanos_memcached_operations_total")) // as before + 2 gets
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((21+2)*seriesReplicationFactor)), "thanos_memcached_operations_total")) // as before + 2 gets
}

// Query metadata.
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ type BucketStoreConfig struct {
// The config option is hidden until experimental.
PartitionerMaxGapBytes uint64 `yaml:"partitioner_max_gap_bytes" doc:"hidden"`

// Controls the estimated size to fetch for series and chunk in Store Gateway. Using
// a large value might cause data overfetch while a small value might need to refetch.
EstimatedMaxSeriesSizeBytes uint64 `yaml:"estimated_max_series_size_bytes" doc:"hidden"`
EstimatedMaxChunkSizeBytes uint64 `yaml:"estimated_max_chunk_size_bytes" doc:"hidden"`

// Controls what is the ratio of postings offsets store will hold in memory.
// Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings.
// It's meant for setups that want low baseline memory pressure and where less traffic is expected.
Expand Down Expand Up @@ -298,6 +303,8 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazily memory-map an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will release memory-mapped index-headers after 'idle timeout' inactivity.")
f.Uint64Var(&cfg.PartitionerMaxGapBytes, "blocks-storage.bucket-store.partitioner-max-gap-bytes", store.PartitionerMaxGapSize, "Max size - in bytes - of a gap for which the partitioner aggregates together two bucket GET object requests.")
f.Uint64Var(&cfg.EstimatedMaxSeriesSizeBytes, "blocks-storage.bucket-store.estimated-max-series-size-bytes", store.EstimatedMaxSeriesSize, "Estimated max series size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 64KB.")
f.Uint64Var(&cfg.EstimatedMaxChunkSizeBytes, "blocks-storage.bucket-store.estimated-max-chunk-size-bytes", store.EstimatedMaxChunkSize, "Estimated max chunk size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.")
}

// Validate the config.
Expand Down
6 changes: 6 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
store.WithQueryGate(u.queryGate),
store.WithChunkPool(u.chunksPool),
store.WithSeriesBatchSize(store.SeriesBatchSize),
store.WithBlockEstimatedMaxChunkFunc(func(_ thanos_metadata.Meta) uint64 {
return u.cfg.BucketStore.EstimatedMaxChunkSizeBytes
}),
store.WithBlockEstimatedMaxSeriesFunc(func(_ thanos_metadata.Meta) uint64 {
return u.cfg.BucketStore.EstimatedMaxSeriesSizeBytes
}),
}
if u.logLevel.String() == "debug" {
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
Expand Down
30 changes: 29 additions & 1 deletion vendor/github.com/thanos-io/thanos/pkg/block/index.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading