Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add counter metrics for total requests going to queue #5030

Merged
merged 4 commits into from Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
* [FEATURE] Query Frontend: Log query params in query frontend even if error happens. #5005
* [FEATURE] Ingester: Enable snapshotting of In-memory TSDB on disk during shutdown via `-blocks-storage.tsdb.memory-snapshot-on-shutdown`. #5011
* [FEATURE] Query Frontend/Scheduler: Add a new counter metric `cortex_request_queue_requests_total` for total requests going to queue. #5030
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008

## 1.14.0 2022-12-02
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v1/frontend.go
Expand Up @@ -112,7 +112,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, f.limits)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests, f.limits, registerer)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

var err error
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/v1/frontend_test.go
Expand Up @@ -133,6 +133,7 @@ func TestFrontendCheckReady(t *testing.T) {
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
limits,
nil,
),
}
for i := 0; i < tt.connectedClients; i++ {
Expand Down
11 changes: 9 additions & 2 deletions pkg/scheduler/queue/queue.go
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -58,15 +59,20 @@ type RequestQueue struct {
stopped bool

queueLength *prometheus.GaugeVec // Per user and reason.
totalRequests *prometheus.CounterVec // Per user.
discardedRequests *prometheus.CounterVec // Per user.
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits, registerer prometheus.Registerer) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
totalRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_request_queue_requests_total",
Help: "Total number of query requests going to the request queue.",
}, []string{"user"}),
discardedRequests: discardedRequests,
}

q.cond = sync.NewCond(&q.mtx)
Expand Down Expand Up @@ -94,6 +100,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
return errors.New("no queue found")
}

q.totalRequests.WithLabelValues(userID).Inc()
select {
case queue <- req:
q.queueLength.WithLabelValues(userID).Inc()
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/queue/queue_test.go
Expand Up @@ -27,6 +27,7 @@ func BenchmarkGetNextRequest(b *testing.B) {
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
nil,
)
queues = append(queues, queue)

Expand Down Expand Up @@ -85,6 +86,7 @@ func BenchmarkQueueRequest(b *testing.B) {
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
nil,
)

for ix := 0; ix < queriers; ix++ {
Expand Down Expand Up @@ -119,6 +121,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
MockLimits{MaxOutstanding: 100},
nil,
)

// Start the queue service.
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/scheduler.go
Expand Up @@ -111,7 +111,8 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, s.limits)

s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, s.limits, registerer)

s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",
Expand Down