Enable streaming chunks from store-gateways to queriers by default #6646

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* [CHANGE] Added new metric `cortex_compactor_disk_out_of_space_errors_total` which counts how many times a compaction failed due to the compactor being out of disk. #8237
* [CHANGE] Anonymous usage statistics tracking: report active series in addition to in-memory series. #8279
* [CHANGE] Ruler: `evaluation_delay` field in the rule group configuration has been deprecated. Please use `query_offset` instead (it has the same exact meaning and behaviour). #8295
+* [CHANGE] Store-gateway / querier: enable streaming chunks from store-gateways to queriers by default. #6646
* [CHANGE] General: remove `-log.buffered`. The configuration option has been enabled by default and deprecated since Mimir 2.11.
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
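
Operators who need the previous buffered behaviour after upgrading can still opt out while the (now deprecated) flag remains, as shown in the sketch below; the YAML layout is taken from the configuration reference further down in this diff.

```yaml
querier:
  # Restore the pre-change behaviour: ask store-gateways for buffered
  # (non-streaming) chunk responses.
  prefer_streaming_chunks_from_store_gateways: false
```
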
4 changes: 2 additions & 2 deletions cmd/mimir/config-descriptor.json
@@ -1860,7 +1860,7 @@
"required": false,
"desc": "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldDefaultValue": true,
"fieldFlag": "querier.prefer-streaming-chunks-from-store-gateways",
"fieldType": "boolean",
"fieldCategory": "experimental"
@@ -1885,7 +1885,7 @@
"fieldDefaultValue": 256,
"fieldFlag": "querier.streaming-chunks-per-store-gateway-buffer-size",
"fieldType": "int",
"fieldCategory": "experimental"
"fieldCategory": "advanced"
},
{
"kind": "field",
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -1794,7 +1794,7 @@ Usage of ./cmd/mimir/mimir:
-querier.minimize-ingester-requests-hedging-delay duration
Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-querier.prefer-streaming-chunks-from-store-gateways
-[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.
+[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this. (default true)
-querier.promql-engine string
[experimental] PromQL engine to use, either 'prometheus' or 'mimir' (default "prometheus")
-querier.promql-experimental-functions-enabled
@@ -1872,7 +1872,7 @@ Usage of ./cmd/mimir/mimir:
-querier.streaming-chunks-per-ingester-buffer-size uint
Number of series to buffer per ingester when streaming chunks from ingesters. (default 256)
-querier.streaming-chunks-per-store-gateway-buffer-size uint
-[experimental] Number of series to buffer per store-gateway when streaming chunks from store-gateways. (default 256)
+Number of series to buffer per store-gateway when streaming chunks from store-gateways. (default 256)
-querier.timeout duration
The timeout for a query. This config option should be set on query-frontend too when query sharding is enabled. This also applies to queries evaluated by the ruler (internally or remotely). (default 2m0s)
-query-frontend.active-series-write-timeout duration
4 changes: 3 additions & 1 deletion docs/sources/mimir/configure/about-versioning.md
@@ -141,7 +141,7 @@ The following features are currently experimental:
- `-ingester.client.circuit-breaker.cooldown-period`
- Querier
- Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`)
-- Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`, `-querier.streaming-chunks-per-store-gateway-buffer-size`)
+- Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`)
- Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`)
- Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`)
- Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
@@ -214,6 +214,8 @@ The following features or configuration parameters are currently deprecated and
- `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`
- Mimirtool
- the flag `--rule-files`
+- Querier
+- the flag `-querier.prefer-streaming-chunks-from-store-gateways`

The following features or configuration parameters are currently deprecated and will be **removed in a future release (to be announced)**:

@@ -1389,15 +1389,15 @@ store_gateway_client:
# respond with a stream of chunks if the target store-gateway supports this, and
# this preference will be ignored by store-gateways that do not support this.
# CLI flag: -querier.prefer-streaming-chunks-from-store-gateways
-[prefer_streaming_chunks_from_store_gateways: <boolean> | default = false]
+[prefer_streaming_chunks_from_store_gateways: <boolean> | default = true]

# (advanced) Number of series to buffer per ingester when streaming chunks from
# ingesters.
# CLI flag: -querier.streaming-chunks-per-ingester-buffer-size
[streaming_chunks_per_ingester_series_buffer_size: <int> | default = 256]

-# (experimental) Number of series to buffer per store-gateway when streaming
-# chunks from store-gateways.
+# (advanced) Number of series to buffer per store-gateway when streaming chunks
+# from store-gateways.
# CLI flag: -querier.streaming-chunks-per-store-gateway-buffer-size
[streaming_chunks_per_store_gateway_series_buffer_size: <int> | default = 256]
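
Taken together, the two settings above now read: streaming is on by default, and the buffer size is an advanced tuning knob rather than an experimental one. A combined sketch; the 512 value is purely illustrative, not a recommendation from this change:

```yaml
querier:
  # On by default as of this change; store-gateways that cannot stream
  # ignore the preference and respond with buffered chunks.
  prefer_streaming_chunks_from_store_gateways: true
  # Number of series to buffer per store-gateway while streaming
  # (the default is 256; 512 here is only an example).
  streaming_chunks_per_store_gateway_series_buffer_size: 512
```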

34 changes: 18 additions & 16 deletions integration/querier_test.go
@@ -15,7 +15,6 @@ import (
"github.com/grafana/e2e"
e2ecache "github.com/grafana/e2e/cache"
e2edb "github.com/grafana/e2e/db"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
@@ -446,6 +445,8 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
require.NoError(t, cluster.WaitSumMetrics(e2e.GreaterOrEqual(float64(shippedBlocks*seriesReplicationFactor)), "cortex_bucket_store_blocks_loaded"))

var expectedCacheRequests int
+var expectedCacheHits int
+var expectedMemcachedOps int

// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
// For every series in a block we expect
Expand All @@ -459,51 +460,53 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))
-expectedCacheRequests += seriesReplicationFactor * 3 // expanded postings, postings, series
+expectedCacheRequests += seriesReplicationFactor * 3 // expanded postings, postings, series
+expectedMemcachedOps += (seriesReplicationFactor * 3) + (seriesReplicationFactor * 3) // Same reasoning as for expectedCacheRequests, but this also includes a set for each get that is not a hit

result, err = c.Query(series2Name, series2Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector2, result.(model.Vector))
-expectedCacheRequests += seriesReplicationFactor*3 + seriesReplicationFactor // expanded postings, postings, series for 1 time range; only expaned postings for another
+expectedCacheRequests += seriesReplicationFactor*3 + seriesReplicationFactor // expanded postings, postings, series for 1 time range; only expanded postings for another
+expectedMemcachedOps += 2 * (seriesReplicationFactor*3 + seriesReplicationFactor) // Same reasoning as for expectedCacheRequests, but this also includes a set for each get that is not a hit

result, err = c.Query(series3Name, series3Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector3, result.(model.Vector))
-expectedCacheRequests += seriesReplicationFactor * 2 // expanded postings for 2 time ranges
-
-expectedMemcachedOps := 2 * expectedCacheRequests // Same reasoning as for expectedCacheRequests, but this also includes a set for each get
+expectedCacheRequests += seriesReplicationFactor * 2 // expanded postings for 2 time ranges
+expectedMemcachedOps += seriesReplicationFactor*2 + seriesReplicationFactor*2 // Same reasoning as for expectedCacheRequests, but this also includes a set for each get that is not a hit

// Check the in-memory index cache metrics (in the store-gateway).
-require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"))
-require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty
+require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"), "expected %v requests", expectedCacheRequests)
+require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheHits)), "thanos_store_index_cache_hits_total"), "expected %v hits", expectedCacheHits)
if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
-require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"))
+require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"), "expected %v operations", expectedMemcachedOps)
}

// Query back again the 1st series from storage. This time it should use the index cache.
// It should get a hit on expanded postings; this means that it will not request individual postings for matchers.
// It should get a hit on series.
-// We expect 3 cache requests and 3 cache hits.
+// We expect 2 cache requests and 2 cache hits.
result, err = c.Query(series1Name, series1Timestamp)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector1, result.(model.Vector))
expectedCacheRequests += seriesReplicationFactor * 2 // expanded postings and series
-expectedMemcachedOps += seriesReplicationFactor * 2 // there is no set after the gets this time
+expectedCacheHits += seriesReplicationFactor * 2
+expectedMemcachedOps += seriesReplicationFactor * 2 // there is no set after the gets this time

-require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"))
-require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time has used the index cache
+require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheRequests)), "thanos_store_index_cache_requests_total"), "expected %v requests", expectedCacheRequests)
+require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedCacheHits)), "thanos_store_index_cache_hits_total"), "expected %v hits", expectedCacheHits) // this time has used the index cache

if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items")) // as before
require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((2*2+2+3)*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before
} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
-require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"))
+require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(expectedMemcachedOps)), "thanos_cache_operations_total"), "expected %v operations", expectedMemcachedOps)
}

// Query metadata.
@@ -881,8 +884,7 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) {
// missing from the storage.
_, err = c.Query(series1Name, series1Timestamp)
require.Error(t, err)
assert.Contains(t, err.Error(), "500")
assert.Contains(t, err.(*promv1.Error).Detail, "failed to fetch some blocks")
assert.Contains(t, err.Error(), "get range reader: The specified key does not exist")

// We expect this to still be queryable as it was not in the cleared storage
_, err = c.Query(series2Name, series2Timestamp)
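
The cache bookkeeping in this test is easier to follow in isolation. A standalone sketch of the same arithmetic, assuming seriesReplicationFactor = 2 (the constant's real value is not visible in this diff):

```go
package main

import "fmt"

func main() {
	// Mirrors the test's accounting: every index-cache lookup counts as one
	// request, every miss is followed by a memcached set, and hits cost a
	// get only.
	const rf = 2 // assumed seriesReplicationFactor

	var requests, hits, memcachedOps int

	// Cold-cache query: expanded postings, postings, and series are all
	// fetched once per replica.
	requests += rf * 3
	memcachedOps += rf*3 + rf*3 // one get per request, plus one set per miss

	// Warm-cache re-query of the same series: expanded postings and series
	// are now cache hits, so matcher postings are never requested.
	requests += rf * 2
	hits += rf * 2
	memcachedOps += rf * 2 // gets only; no sets because nothing missed

	fmt.Printf("requests=%d hits=%d memcachedOps=%d\n", requests, hits, memcachedOps)
}
```
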
6 changes: 3 additions & 3 deletions pkg/querier/querier.go
@@ -50,10 +50,10 @@ type Config struct {

ShuffleShardingIngestersEnabled bool `yaml:"shuffle_sharding_ingesters_enabled" category:"advanced"`

-PreferStreamingChunksFromStoreGateways bool `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"`
+PreferStreamingChunksFromStoreGateways bool `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"` // Enabled by default as of Mimir 2.13, remove altogether in 2.14.
PreferAvailabilityZone string `yaml:"prefer_availability_zone" category:"experimental" doc:"hidden"`
StreamingChunksPerIngesterSeriesBufferSize uint64 `yaml:"streaming_chunks_per_ingester_series_buffer_size" category:"advanced"`
-StreamingChunksPerStoreGatewaySeriesBufferSize uint64 `yaml:"streaming_chunks_per_store_gateway_series_buffer_size" category:"experimental"`
+StreamingChunksPerStoreGatewaySeriesBufferSize uint64 `yaml:"streaming_chunks_per_store_gateway_series_buffer_size" category:"advanced"`
MinimizeIngesterRequests bool `yaml:"minimize_ingester_requests" category:"advanced"`
MinimiseIngesterRequestsHedgingDelay time.Duration `yaml:"minimize_ingester_requests_hedging_delay" category:"advanced"`

Expand All @@ -77,7 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.QueryStoreAfter, queryStoreAfterFlag, 12*time.Hour, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
f.BoolVar(&cfg.ShuffleShardingIngestersEnabled, "querier.shuffle-sharding-ingesters-enabled", true, fmt.Sprintf("Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -%s. If this setting is false or -%s is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).", validation.QueryIngestersWithinFlag, validation.QueryIngestersWithinFlag))
f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways, "querier.prefer-streaming-chunks-from-store-gateways", false, "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.")
f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways, "querier.prefer-streaming-chunks-from-store-gateways", true, "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.")
f.StringVar(&cfg.PreferAvailabilityZone, "querier.prefer-availability-zone", "", "Preferred availability zone to query ingesters from when using the ingest storage.")

const minimiseIngesterRequestsFlagName = "querier.minimize-ingester-requests"
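
The help text registered above describes a preference rather than a requirement: streaming happens only when both sides support it, and a store-gateway that cannot stream silently falls back to buffered responses. A minimal sketch of that negotiation shape; the type and function names here are hypothetical, not Mimir's actual API:

```go
package querier

// storeGatewayConn stands in for whatever the querier knows about a
// store-gateway's capabilities (hypothetical, for illustration only).
type storeGatewayConn interface {
	SupportsChunkStreaming() bool
}

// shouldStreamChunks applies the preference controlled by
// -querier.prefer-streaming-chunks-from-store-gateways: the querier merely
// requests streaming, and the response is streamed only if the target
// store-gateway supports it; otherwise the preference is ignored.
func shouldStreamChunks(preferStreaming bool, sg storeGatewayConn) bool {
	return preferStreaming && sg.SupportsChunkStreaming()
}
```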