Skip to content

Commit

Permalink
Get rid of -querier.prefer-streaming-chunks-from-ingesters (#7639)
Browse files Browse the repository at this point in the history
* Get rid of querier.prefer-streaming-chunks-from-ingesters

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing a failing test

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing failing tests

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Make lint happy

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

---------

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Mar 19, 2024
1 parent eaef377 commit 8ed42e1
Show file tree
Hide file tree
Showing 15 changed files with 326 additions and 339 deletions.
11 changes: 0 additions & 11 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1810,17 +1810,6 @@
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "prefer_streaming_chunks_from_ingesters",
"required": false,
"desc": "Request ingesters stream chunks. Ingesters will only respond with a stream of chunks if the target ingester supports this, and this preference will be ignored by ingesters that do not support this.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.prefer-streaming-chunks-from-ingesters",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "prefer_streaming_chunks_from_store_gateways",
Expand Down
2 changes: 0 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1717,8 +1717,6 @@ Usage of ./cmd/mimir/mimir:
If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true)
-querier.minimize-ingester-requests-hedging-delay duration
Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-querier.prefer-streaming-chunks-from-ingesters
[experimental] Request ingesters stream chunks. Ingesters will only respond with a stream of chunks if the target ingester supports this, and this preference will be ignored by ingesters that do not support this. (default true)
-querier.prefer-streaming-chunks-from-store-gateways
[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.
-querier.promql-experimental-functions-enabled
Expand Down
1 change: 0 additions & 1 deletion docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ The following features are currently experimental:
- `-ingester.client.circuit-breaker.cooldown-period`
- Querier
- Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`)
- Streaming chunks from ingester to querier (`-querier.prefer-streaming-chunks-from-ingesters`)
- Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`, `-querier.streaming-chunks-per-store-gateway-buffer-size`)
- Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`)
- Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,12 +1291,6 @@ store_gateway_client:
# CLI flag: -querier.shuffle-sharding-ingesters-enabled
[shuffle_sharding_ingesters_enabled: <boolean> | default = true]

# (experimental) Request ingesters stream chunks. Ingesters will only respond
# with a stream of chunks if the target ingester supports this, and this
# preference will be ignored by ingesters that do not support this.
# CLI flag: -querier.prefer-streaming-chunks-from-ingesters
[prefer_streaming_chunks_from_ingesters: <boolean> | default = true]

# (experimental) Request store-gateways stream chunks. Store-gateways will only
# respond with a stream of chunks if the target store-gateway supports this, and
# this preference will be ignored by store-gateways that do not support this.
Expand Down
1 change: 1 addition & 0 deletions docs/sources/mimir/release-notes/v2.12.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ The default value of the following CLI flags have been changed:
The following deprecated configuration options are removed in Grafana Mimir 2.12:

- The YAML setting `frontend.cache_unaligned_requests`.
- Experimental CLI flag `-querier.prefer-streaming-chunks-from-ingesters`.

The following configuration options are deprecated and will be removed in Grafana Mimir 2.14:

Expand Down
277 changes: 133 additions & 144 deletions integration/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package integration

import (
"fmt"
"net/http"
"strconv"
"testing"
Expand Down Expand Up @@ -494,114 +493,13 @@ func TestIngesterQuerying(t *testing.T) {

for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
for _, streamingEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled),
}

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
baseFlags,
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until querier has updated the ring.
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

res, err := client.Push(tc.inSeries)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

result, err := client.QueryRange(query, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedQueryResult, result)

// The PromQL engine does some special handling for the timestamp() function which previously
// caused queries to fail when streaming chunks was enabled, so check that this regression
// has not been reintroduced.
result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedTimestampQueryResult, result)

queryRequestCount := func(status string) (float64, error) {
counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
),
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)

if err != nil {
return 0, err
}

require.Len(t, counts, 1)
return counts[0], nil
}

successfulQueryRequests, err := queryRequestCount("OK")
require.NoError(t, err)

cancelledQueryRequests, err := queryRequestCount("cancel")
require.NoError(t, err)

totalQueryRequests, err := queryRequestCount(".*")
require.NoError(t, err)

// We expect two query requests: the first query request and the timestamp query request
require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
})
}
})
}
}

func TestIngesterQueryingWithRequestMinimization(t *testing.T) {
for _, streamingEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-ingester.ring.zone-awareness-enabled": "true",
"-ingester.ring.replication-factor": "3",
"-querier.minimize-ingester-requests": "true",
"-querier.prefer-streaming-chunks-from-ingesters": strconv.FormatBool(streamingEnabled),
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
}

flags := mergeFlags(
Expand All @@ -615,68 +513,159 @@ func TestIngesterQueryingWithRequestMinimization(t *testing.T) {
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

ingesterFlags := func(zone string) map[string]string {
return mergeFlags(flags, map[string]string{
"-ingester.ring.instance-availability-zone": zone,
})
}

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-a"))
ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-b"))
ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-c"))
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until distributor and querier have updated the ring.
for _, component := range []*e2emimir.MimirService{distributor, querier} {
require.NoError(t, component.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
}
// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until querier has updated the ring.
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push some data to the cluster.
seriesName := "test_series"
now := time.Now()
series, expectedVector, _ := generateFloatSeries(seriesName, now, prompb.Label{Name: "foo", Value: "bar"})

res, err := client.Push(series)
res, err := client.Push(tc.inSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
require.Equal(t, http.StatusOK, res.StatusCode)

// Verify we can query the data we just pushed.
queryResult, err := client.Query(seriesName, now)
result, err := client.QueryRange(query, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, model.ValVector, queryResult.Type())
require.Equal(t, expectedVector, queryResult.(model.Vector))
require.Equal(t, tc.expectedQueryResult, result)

// Check that we only queried two of the three ingesters.
totalQueryRequests := 0.0
// The PromQL engine does some special handling for the timestamp() function which previously
// caused queries to fail when streaming chunks was enabled, so check that this regression
// has not been reintroduced.
result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedTimestampQueryResult, result)

for _, ingester := range []*e2emimir.MimirService{ingester1, ingester2, ingester3} {
sums, err := ingester.SumMetrics(
[]string{"cortex_request_duration_seconds"},
queryRequestCount := func(status string) (float64, error) {
counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchEqual, "status_code", "OK"),
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
),
e2e.SkipMissingMetrics,
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)

require.NoError(t, err)
queryRequests := sums[0]
require.LessOrEqual(t, queryRequests, 1.0)
totalQueryRequests += queryRequests
if err != nil {
return 0, err
}

require.Len(t, counts, 1)
return counts[0], nil
}

require.Equal(t, 2.0, totalQueryRequests)
successfulQueryRequests, err := queryRequestCount("OK")
require.NoError(t, err)

cancelledQueryRequests, err := queryRequestCount("cancel")
require.NoError(t, err)

totalQueryRequests, err := queryRequestCount(".*")
require.NoError(t, err)

// We expect two query requests: the first query request and the timestamp query request
require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
})
}
}

func TestIngesterQueryingWithRequestMinimization(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-ingester.ring.zone-awareness-enabled": "true",
"-ingester.ring.replication-factor": "3",
"-querier.minimize-ingester-requests": "true",
}

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
baseFlags,
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

ingesterFlags := func(zone string) map[string]string {
return mergeFlags(flags, map[string]string{
"-ingester.ring.instance-availability-zone": zone,
})
}

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-a"))
ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-b"))
ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), ingesterFlags("zone-c"))
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))

// Wait until distributor and querier have updated the ring.
for _, component := range []*e2emimir.MimirService{distributor, querier} {
require.NoError(t, component.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
}

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push some data to the cluster.
seriesName := "test_series"
now := time.Now()
series, expectedVector, _ := generateFloatSeries(seriesName, now, prompb.Label{Name: "foo", Value: "bar"})

res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Verify we can query the data we just pushed.
queryResult, err := client.Query(seriesName, now)
require.NoError(t, err)
require.Equal(t, model.ValVector, queryResult.Type())
require.Equal(t, expectedVector, queryResult.(model.Vector))

// Check that we only queried two of the three ingesters.
totalQueryRequests := 0.0

for _, ingester := range []*e2emimir.MimirService{ingester1, ingester2, ingester3} {
sums, err := ingester.SumMetrics(
[]string{"cortex_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchEqual, "status_code", "OK"),
),
e2e.SkipMissingMetrics,
e2e.WithMetricCount,
)

require.NoError(t, err)
queryRequests := sums[0]
require.LessOrEqual(t, queryRequests, 1.0)
totalQueryRequests += queryRequests
}

require.Equal(t, 2.0, totalQueryRequests)
}

func TestIngesterReportGRPCStatusCodes(t *testing.T) {
Expand Down
Loading

0 comments on commit 8ed42e1

Please sign in to comment.