Add max query-scheduler instances support #3005

Merged 10 commits on Sep 22, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
* [CHANGE] Anonymous usage statistics tracking: added the minimum and maximum value of `-ingester.out-of-order-time-window`. #2940
* [FEATURE] Query-scheduler: added an experimental ring-based service discovery support for the query-scheduler. Refer to [query-scheduler configuration](https://grafana.com/docs/mimir/next/operators-guide/architecture/components/query-scheduler/#configuration) for more information. #2957
* [FEATURE] Introduced `/api/v1/user_limits` endpoint exposed by all components that load runtime configuration. This endpoint exposes realtime limits for the authenticated tenant, in JSON format. #2864
* [FEATURE] Query-scheduler: added the experimental configuration option `-query-scheduler.max-used-instances` to restrict the number of query-schedulers effectively used, regardless of how many replicas are running. This feature can be useful when using the experimental read-write deployment mode. #3005
* [ENHANCEMENT] Distributor: Add `cortex_distributor_query_ingester_chunks_deduped_total` and `cortex_distributor_query_ingester_chunks_total` metrics for determining how effective ingester chunk deduplication at query time is. #2713
* [ENHANCEMENT] Go: updated to go 1.19.1. #2637
* [ENHANCEMENT] Runtime config: don't unmarshal runtime configuration files if they haven't changed. This can save a bit of CPU and memory on every component using runtime config. #2954
@@ -20,6 +21,7 @@
* [ENHANCEMENT] Distributor: added support for forwarding push requests via gRPC, using `httpgrpc` messages from the weaveworks/common library. #2996
* [BUGFIX] Querier: Fix 400 response while handling streaming remote read. #2963
* [BUGFIX] Fix a bug causing query-frontend, query-scheduler, and querier not to fail if one of their internal components fails. #2978
* [BUGFIX] Querier: re-balance the querier worker connections when a query-frontend or query-scheduler is terminated. #3005

### Mixin

11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -10797,6 +10797,17 @@
],
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "max_used_instances",
"required": false,
"desc": "The maximum number of query-scheduler instances to use, regardless how many replicas are running. This option can be set only when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all available query-scheduler instances.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "query-scheduler.max-used-instances",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1388,6 +1388,8 @@ Usage of ./cmd/mimir/mimir:
Override the expected name on the server certificate.
-query-scheduler.max-outstanding-requests-per-tenant int
Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100)
-query-scheduler.max-used-instances int
[experimental] The maximum number of query-scheduler instances to use, regardless of how many replicas are running. This option can be set only when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all available query-scheduler instances.
-query-scheduler.querier-forget-delay duration
[experimental] If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.
-query-scheduler.ring.consul.acl-token string
@@ -89,6 +89,7 @@ The following features are currently experimental:
- Query-scheduler
- `-query-scheduler.querier-forget-delay`
- Ring-based service discovery (`-query-scheduler.service-discovery-mode` and `-query-scheduler.ring.*`)
- Max number of used instances (`-query-scheduler.max-used-instances`)
- Store-gateway
- `-blocks-storage.bucket-store.index-header-thread-pool-size`
- Blocks Storage, Alertmanager, and Ruler support for partitioning access to the same storage bucket
@@ -1336,6 +1336,13 @@ ring:
# (advanced) IP address to advertise in the ring. Default is auto-detected.
# CLI flag: -query-scheduler.ring.instance-addr
[instance_addr: <string> | default = ""]

# (experimental) The maximum number of query-scheduler instances to use,
# regardless of how many replicas are running. This option can be set only
# when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all
# available query-scheduler instances.
# CLI flag: -query-scheduler.max-used-instances
[max_used_instances: <int> | default = 0]
```
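
A minimal sketch of the flag combination this option requires, written as the flags map used by the integration test further below (the Consul hostname is a placeholder; in the test the same flags are shared by every component):

```go
package main

import "fmt"

func main() {
	// Flag combination enabling the feature, mirroring the integration test below.
	flags := map[string]string{
		"-query-scheduler.service-discovery-mode": "ring", // required: max-used-instances only applies to ring-based discovery
		"-query-scheduler.ring.store":             "consul",
		"-query-scheduler.ring.consul.hostname":   "consul:8500", // placeholder address
		"-query-scheduler.max-used-instances":     "1", // use at most one scheduler, regardless of how many replicas run
	}
	for flag, value := range flags {
		fmt.Printf("%s=%s\n", flag, value)
	}
}
```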

### ruler
121 changes: 121 additions & 0 deletions integration/query_scheduler_test.go
@@ -0,0 +1,121 @@
// SPDX-License-Identifier: AGPL-3.0-only
//go:build requires_docker
// +build requires_docker

package integration

import (
"sort"
"testing"
"time"

"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/integration/e2emimir"
)

func TestQuerySchedulerWithMaxUsedInstances(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
map[string]string{
"-query-scheduler.service-discovery-mode": "ring",
"-query-scheduler.ring.store": "consul",
"-query-scheduler.max-used-instances": "1",
"-querier.max-concurrent": "4",
},
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

flags["-query-scheduler.ring.consul.hostname"] = consul.NetworkHTTPEndpoint()

// Start 2 query-schedulers. We override the address registered in the ring so that we can easily predict it
// when computing the expected in-use query-scheduler instance.
queryScheduler1 := e2emimir.NewQueryScheduler("query-scheduler-1", mergeFlags(flags, map[string]string{"-query-scheduler.ring.instance-addr": e2e.NetworkContainerHost(s.NetworkName(), "query-scheduler-1")}))
queryScheduler2 := e2emimir.NewQueryScheduler("query-scheduler-2", mergeFlags(flags, map[string]string{"-query-scheduler.ring.instance-addr": e2e.NetworkContainerHost(s.NetworkName(), "query-scheduler-2")}))
require.NoError(t, s.StartAndWaitReady(queryScheduler1, queryScheduler2))

// Start all other Mimir services.
queryFrontend := e2emimir.NewQueryFrontend("query-frontend", flags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(queryFrontend, querier, ingester, distributor))

// Wait until distributor and querier have updated the ingesters ring.
for _, service := range []*e2emimir.MimirService{distributor, querier} {
require.NoError(t, service.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))),
"Service: %s", service.Name())
}

// Wait until query-frontend and querier have updated the query-schedulers ring.
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "query-frontend-query-scheduler-client"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "querier-query-scheduler-client"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Compute which is the expected in-use query-scheduler.
schedulers := []*e2emimir.MimirService{queryScheduler1, queryScheduler2}
sort.Slice(schedulers, func(i, j int) bool { return schedulers[i].NetworkGRPCEndpoint() < schedulers[j].NetworkGRPCEndpoint() })
inUseScheduler := schedulers[0]
notInUseScheduler := schedulers[1]

// We expect the querier to open 4 connections to the in-use scheduler, and 1 connection to the not-in-use one.
require.NoError(t, inUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(4), []string{"cortex_query_scheduler_connected_querier_clients"}))
require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_scheduler_connected_querier_clients"}))

// We expect the query-frontend to only open connections to the in-use scheduler.
require.NoError(t, inUseScheduler.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_query_scheduler_connected_frontend_clients"}))
require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_scheduler_connected_frontend_clients"}))

// Push some series to Mimir.
now := time.Now()
series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})

c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Query the series.
result, err := c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))

// Terminate the in-use query-scheduler.
require.NoError(t, s.Stop(inUseScheduler))

// We expect the querier to open 4 connections to the previously not-in-use scheduler.
require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(4), []string{"cortex_query_scheduler_connected_querier_clients"}))

// We expect the query-frontend to open connections to the previously not-in-use scheduler.
require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_query_scheduler_connected_frontend_clients"}))

// Query the series.
result, err = c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}
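
The sort near the top of this test encodes an assumption worth spelling out: with `-query-scheduler.max-used-instances=1`, the test expects the scheduler whose registered ring address sorts first to be the in-use one. The sketch below only restates what the test asserts; the actual selection lives in the scheduler service-discovery code and may use a different rule or tie-breaker.

```go
package main

import (
	"fmt"
	"sort"
)

// inUseInstances returns the instances treated as "in use" under the rule the
// test above relies on: sort the registered addresses and keep the first
// maxUsed of them. A maxUsed of 0 means "use all available instances".
func inUseInstances(addresses []string, maxUsed int) []string {
	sorted := append([]string(nil), addresses...)
	sort.Strings(sorted)
	if maxUsed <= 0 || maxUsed >= len(sorted) {
		return sorted
	}
	return sorted[:maxUsed]
}

func main() {
	addrs := []string{"query-scheduler-2:9095", "query-scheduler-1:9095"}
	fmt.Println(inUseInstances(addrs, 1)) // [query-scheduler-1:9095]
}
```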
37 changes: 30 additions & 7 deletions pkg/frontend/v2/frontend_scheduler_worker.go
@@ -24,6 +24,7 @@ import (
"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util/servicediscovery"
)

const (
@@ -68,7 +69,7 @@ func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, requestsCh
}

var err error
f.schedulerDiscovery, err = schedulerdiscovery.NewServiceDiscovery(cfg.QuerySchedulerDiscovery, cfg.SchedulerAddress, cfg.DNSLookupPeriod, "query-frontend", f, log, reg)
f.schedulerDiscovery, err = schedulerdiscovery.New(cfg.QuerySchedulerDiscovery, cfg.SchedulerAddress, cfg.DNSLookupPeriod, "query-frontend", f, log, reg)
if err != nil {
return nil, err
}
@@ -106,7 +107,14 @@ func (f *frontendSchedulerWorkers) stopping(_ error) error {
return err
}

func (f *frontendSchedulerWorkers) AddressAdded(address string) {
func (f *frontendSchedulerWorkers) InstanceAdded(instance servicediscovery.Instance) {
// Connect only to in-use query-scheduler instances.
if instance.InUse {
f.addScheduler(instance.Address)
}
}

func (f *frontendSchedulerWorkers) addScheduler(address string) {
f.mu.Lock()
ws := f.workers
w := f.workers[address]
@@ -118,10 +126,10 @@ func (f *frontendSchedulerWorkers) AddressAdded(address string) {
}
f.mu.Unlock()

level.Info(f.log).Log("msg", "adding connection to scheduler", "addr", address)
level.Info(f.log).Log("msg", "adding connection to query-scheduler", "addr", address)
conn, err := f.connectToScheduler(context.Background(), address)
if err != nil {
level.Error(f.log).Log("msg", "error connecting to scheduler", "addr", address, "err", err)
level.Error(f.log).Log("msg", "error connecting to query-scheduler", "addr", address, "err", err)
return
}

@@ -144,21 +152,36 @@ func (f *frontendSchedulerWorkers) AddressAdded(address string) {
w.start()
}

func (f *frontendSchedulerWorkers) AddressRemoved(address string) {
level.Info(f.log).Log("msg", "removing connection to scheduler", "addr", address)
func (f *frontendSchedulerWorkers) InstanceRemoved(instance servicediscovery.Instance) {
f.removeScheduler(instance.Address)
}

func (f *frontendSchedulerWorkers) removeScheduler(address string) {
f.mu.Lock()
// This works fine if f.workers is nil already.
// This works fine if f.workers is nil already or the worker is missing
// because the query-scheduler instance was not in use.
w := f.workers[address]
delete(f.workers, address)
f.mu.Unlock()

if w != nil {
level.Info(f.log).Log("msg", "removing connection to query-scheduler", "addr", address)
w.stop()
}
f.enqueuedRequests.Delete(prometheus.Labels{schedulerAddressLabel: address})
}

func (f *frontendSchedulerWorkers) InstanceChanged(instance servicediscovery.Instance) {
// Ensure the query-frontend connects to in-use query-scheduler instances and disconnects from the ones no longer in use.
// The called methods correctly handle the case where the query-frontend is already connected/disconnected
// to/from the given query-scheduler instance.
if instance.InUse {
f.addScheduler(instance.Address)
} else {
f.removeScheduler(instance.Address)
}
}

// Get number of workers.
func (f *frontendSchedulerWorkers) getWorkersCount() int {
f.mu.Lock()
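
The diff above replaces the plain address callbacks (`AddressAdded`/`AddressRemoved`) with instance callbacks carrying an in-use flag. A self-contained sketch of the same pattern follows: the `Address` and `InUse` fields match what this PR uses, while the listener type, its connection map, and the sample addresses are invented for illustration (the real code manages gRPC connections and worker goroutines rather than a map).

```go
package main

import "fmt"

// Instance mirrors the servicediscovery.Instance fields used in this PR; the
// real type may carry more information.
type Instance struct {
	Address string
	InUse   bool
}

// listener stands in for frontendSchedulerWorkers: it connects only to in-use
// instances and reacts to in-use changes.
type listener struct {
	connected map[string]bool
}

func (l *listener) InstanceAdded(instance Instance) {
	if instance.InUse {
		l.connect(instance.Address)
	}
}

func (l *listener) InstanceRemoved(instance Instance) {
	l.disconnect(instance.Address)
}

func (l *listener) InstanceChanged(instance Instance) {
	// Connect to instances that became in-use and disconnect from the ones no
	// longer in use; both helpers are idempotent, as in the real code.
	if instance.InUse {
		l.connect(instance.Address)
	} else {
		l.disconnect(instance.Address)
	}
}

func (l *listener) connect(addr string) {
	if !l.connected[addr] {
		l.connected[addr] = true
		fmt.Println("adding connection to query-scheduler", addr)
	}
}

func (l *listener) disconnect(addr string) {
	if l.connected[addr] {
		delete(l.connected, addr)
		fmt.Println("removing connection to query-scheduler", addr)
	}
}

func main() {
	l := &listener{connected: map[string]bool{}}
	l.InstanceAdded(Instance{Address: "10.0.0.1:9095", InUse: true})
	l.InstanceAdded(Instance{Address: "10.0.0.2:9095", InUse: false}) // ignored: not in use
	l.InstanceChanged(Instance{Address: "10.0.0.2:9095", InUse: true})
	l.InstanceRemoved(Instance{Address: "10.0.0.1:9095"})
}
```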
5 changes: 3 additions & 2 deletions pkg/frontend/v2/frontend_test.go
@@ -33,6 +33,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util/servicediscovery"
)

const testFrontendWorkerConcurrency = 5
@@ -165,7 +166,7 @@ func TestFrontendRequestsPerWorkerMetric(t *testing.T) {
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))

// Manually remove the address, check that label is removed.
f.schedulerWorkers.AddressRemoved(f.cfg.SchedulerAddress)
f.schedulerWorkers.InstanceRemoved(servicediscovery.Instance{Address: f.cfg.SchedulerAddress, InUse: true})
expectedMetrics = ``
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_query_frontend_workers_enqueued_requests_total"))
}
Expand Down Expand Up @@ -308,7 +309,7 @@ func TestFrontendFailedCancellation(t *testing.T) {
}
f.schedulerWorkers.mu.Unlock()

f.schedulerWorkers.AddressRemoved(addr)
f.schedulerWorkers.InstanceRemoved(servicediscovery.Instance{Address: addr, InUse: true})

// Wait for worker goroutines to stop.
time.Sleep(100 * time.Millisecond)