Skip to content

Commit

Permalink
feat: instrument the query layer to track rate-limited queries (#3894)
Browse files Browse the repository at this point in the history
* feat: instrument the query layer to track rate-limited queries

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>

* chore: update changelog

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>

* fix: fix goimports linting error

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>

* fix per PR comments

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>

* rename discarded_queries --> discarded_requests

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>

* fix lint

Signed-off-by: Jacob Lisi <jacob.t.lisi@gmail.com>
  • Loading branch information
jtlisi committed Mar 2, 2021
1 parent e02797a commit 03d4318
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -30,6 +30,9 @@
* `-alertmanager.alertmanager-client.tls-server-name`
* `-alertmanager.alertmanager-client.tls-insecure-skip-verify`
* [FEATURE] Compactor: added blocks storage per-tenant retention support. This is configured via `-compactor.retention-period`, and can be overridden on a per-tenant basis. #3879
* [ENHANCEMENT] Queries: Instrument queries that were discarded due to the configured `max_outstanding_requests_per_tenant`. #3894
* `cortex_query_frontend_discarded_requests_total`
* `cortex_query_scheduler_discarded_requests_total`
* [ENHANCEMENT] Ruler: Add TLS and explicit basis authentication configuration options for the HTTP client the ruler uses to communicate with the alertmanager. #3752
* `-ruler.alertmanager-client.basic-auth-username`: Configure the basic authentication username used by the client. Takes precedent over a URL configured username.
* `-ruler.alertmanager-client.basic-auth-password`: Configure the basic authentication password used by the client. Takes precedent over a URL configured password.
Expand Down
14 changes: 10 additions & 4 deletions pkg/frontend/v1/frontend.go
Expand Up @@ -57,9 +57,10 @@ type Frontend struct {
activeUsers *util.ActiveUsersCleanupService

// Metrics.
queueLength *prometheus.GaugeVec
numClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
queueLength *prometheus.GaugeVec
discardedRequests *prometheus.CounterVec
numClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
}

type request struct {
Expand All @@ -83,14 +84,18 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"}),
discardedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"}),
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength, f.discardedRequests)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Expand All @@ -114,6 +119,7 @@ func (f *Frontend) stopping(_ error) error {

func (f *Frontend) cleanupInactiveUserMetrics(user string) {
f.queueLength.DeleteLabelValues(user)
f.discardedRequests.DeleteLabelValues(user)
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
Expand Down
7 changes: 5 additions & 2 deletions pkg/frontend/v1/frontend_test.go
Expand Up @@ -127,8 +127,11 @@ func TestFrontendCheckReady(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
f := &Frontend{
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"})),
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
),
}
for i := 0; i < tt.connectedClients; i++ {
f.requestQueue.RegisterQuerierConnection("test")
Expand Down
7 changes: 5 additions & 2 deletions pkg/scheduler/queue/queue.go
Expand Up @@ -47,14 +47,16 @@ type RequestQueue struct {
queues *queues
stopped bool

queueLength *prometheus.GaugeVec // Per user.
queueLength *prometheus.GaugeVec // Per user and reason.
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
}

q.cond = sync.NewCond(&q.mtx)
Expand Down Expand Up @@ -91,6 +93,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
}
return nil
default:
q.discardedRequests.WithLabelValues(userID).Inc()
return ErrTooManyRequests
}
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/scheduler/queue/queue_test.go
Expand Up @@ -17,7 +17,10 @@ func BenchmarkGetNextRequest(b *testing.B) {
queues := make([]*RequestQueue, 0, b.N)

for n := 0; n < b.N; n++ {
queue := NewRequestQueue(maxOutstandingPerTenant, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}))
queue := NewRequestQueue(maxOutstandingPerTenant,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
queues = append(queues, queue)

for ix := 0; ix < queriers; ix++ {
Expand Down Expand Up @@ -71,7 +74,10 @@ func BenchmarkQueueRequest(b *testing.B) {
requests := make([]string, 0, numTenants)

for n := 0; n < b.N; n++ {
q := NewRequestQueue(maxOutstandingPerTenant, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}))
q := NewRequestQueue(maxOutstandingPerTenant,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)

for ix := 0; ix < queriers; ix++ {
q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
Expand Down
9 changes: 8 additions & 1 deletion pkg/scheduler/scheduler.go
Expand Up @@ -55,6 +55,7 @@ type Scheduler struct {

// Metrics.
queueLength *prometheus.GaugeVec
discardedRequests *prometheus.CounterVec
connectedQuerierClients prometheus.GaugeFunc
connectedFrontendClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
Expand Down Expand Up @@ -100,7 +101,12 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, s.queueLength)

s.discardedRequests = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, s.queueLength, s.discardedRequests)

s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",
Expand Down Expand Up @@ -471,6 +477,7 @@ func (s *Scheduler) stopping(_ error) error {

func (s *Scheduler) cleanupMetricsForInactiveUser(user string) {
s.queueLength.DeleteLabelValues(user)
s.discardedRequests.DeleteLabelValues(user)
}

func (s *Scheduler) getConnectedFrontendClientsMetric() float64 {
Expand Down

0 comments on commit 03d4318

Please sign in to comment.