Skip to content

Commit

Permalink
Add counter metrics for total requests going to queue (cortexproject#…
Browse files Browse the repository at this point in the history
…5030)

* add counter metrics for total requests going to queue

Signed-off-by: Ben Ye <benye@amazon.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

Signed-off-by: Ben Ye <benye@amazon.com>
Signed-off-by: Alex Le <leqiyue@amazon.com>
  • Loading branch information
yeya24 authored and alexqyle committed May 2, 2023
1 parent 1a15675 commit ace5cd1
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
* [FEATURE] Query Frontend: Log query params in query frontend even if error happens. #5005
* [FEATURE] Ingester: Enable snapshotting of In-memory TSDB on disk during shutdown via `-blocks-storage.tsdb.memory-snapshot-on-shutdown`. #5011
* [FEATURE] Query Frontend/Scheduler: Add a new counter metric `cortex_request_queue_requests_total` for total requests going to queue. #5030
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008

## 1.14.0 2022-12-02
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v1/frontend.go
Expand Up @@ -112,7 +112,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, f.limits)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, f.limits, registerer)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

var err error
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/v1/frontend_test.go
Expand Up @@ -133,6 +133,7 @@ func TestFrontendCheckReady(t *testing.T) {
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
limits,
nil,
),
}
for i := 0; i < tt.connectedClients; i++ {
Expand Down
11 changes: 9 additions & 2 deletions pkg/scheduler/queue/queue.go
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -58,15 +59,20 @@ type RequestQueue struct {
stopped bool

queueLength *prometheus.GaugeVec // Per user and reason.
totalRequests *prometheus.CounterVec // Per user.
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits, registerer prometheus.Registerer) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
totalRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_request_queue_requests_total",
Help: "Total number of query requests going to the request queue.",
}, []string{"user"}),
discardedRequests: discardedRequests,
}

q.cond = sync.NewCond(&q.mtx)
Expand Down Expand Up @@ -94,6 +100,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
return errors.New("no queue found")
}

q.totalRequests.WithLabelValues(userID).Inc()
select {
case queue <- req:
q.queueLength.WithLabelValues(userID).Inc()
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/queue/queue_test.go
Expand Up @@ -27,6 +27,7 @@ func BenchmarkGetNextRequest(b *testing.B) {
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
nil,
)
queues = append(queues, queue)

Expand Down Expand Up @@ -85,6 +86,7 @@ func BenchmarkQueueRequest(b *testing.B) {
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
nil,
)

for ix := 0; ix < queriers; ix++ {
Expand Down Expand Up @@ -119,6 +121,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
nil,
)

// Start the queue service.
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/scheduler.go
Expand Up @@ -111,7 +111,8 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, s.limits)

s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, s.limits, registerer)

s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",
Expand Down

0 comments on commit ace5cd1

Please sign in to comment.