config: adds frontend.max-query-capacity to tune per-tenant query capacity #11284

Merged · 14 commits · Dec 1, 2023 · Changes from 8 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
* [10727](https://github.com/grafana/loki/pull/10727) **sandeepsukhani** Native otlp ingestion support
* [11051](https://github.com/grafana/loki/pull/11051) Refactor to not use global logger in modules
* [10956](https://github.com/grafana/loki/pull/10956) **jeschkies** Do not wrap requests but send pure Protobuf from frontend v2 via scheduler to querier when `-frontend.encoding=protobuf`.
* [11284](https://github.com/grafana/loki/pull/11284) **ashwanthgoli** Config: Adds `frontend.max-query-capacity` to tune per-tenant query capacity.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
14 changes: 14 additions & 0 deletions docs/sources/configure/_index.md
@@ -2760,6 +2760,20 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.max-queriers-per-tenant
[max_queriers_per_tenant: <int> | default = 0]

# How much of the available query capacity (queriers) can be used by a single
# tenant. Allowed values are 0.0 to 1.0. For example, setting this to 0.5 would
# allow a tenant to use half of the available queriers for processing the query
# workload. If set to 0, query capacity is determined by
# frontend.max-queriers-per-tenant. When both frontend.max-queriers-per-tenant
# and frontend.max-query-capacity are configured, the smaller of the resulting
# querier replica counts is used: min(frontend.max-queriers-per-tenant,
# ceil(querier_replicas * frontend.max-query-capacity)). *All* queriers will
# handle requests for the tenant if neither limit is applied. This option only
# works with queriers connecting to the query-frontend / query-scheduler, not
# when using the downstream URL.
# CLI flag: -frontend.max-query-capacity
[max_query_capacity: <float> | default = 0]
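
A worked example with hypothetical numbers: given 16 querier replicas, `frontend.max-queriers-per-tenant: 10`, and `frontend.max-query-capacity: 0.5`, the tenant is served by min(10, ceil(16 * 0.5)) = min(10, 8) = 8 queriers.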

# Number of days of index to be kept always downloaded for queries. Applies only
# to per user index in boltdb-shipper index store. 0 to disable.
# CLI flag: -store.query-ready-index-num-days
12 changes: 10 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
@@ -164,6 +164,14 @@ type Gateway struct {
workerConfig workerConfig
}

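// fixedQueueLimits applies the same consumer cap to every tenant;
// a value of 0 would allow all consumers to process the tenant's requests.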
type fixedQueueLimits struct {
maxConsumers int
}

func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
return l.maxConsumers
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
@@ -180,7 +188,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem),
}

g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics)
g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, &fixedQueueLimits{100}, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm)
@@ -296,7 +304,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID)
g.queue.Enqueue(tenantID, []string{}, task, 100, func() {
g.queue.Enqueue(tenantID, []string{}, task, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
})
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
@@ -803,6 +803,8 @@ type disabledShuffleShardingLimits struct{}

func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) int { return 0 }

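// A capacity of 0 disables the limit, matching the disabled shuffle-sharding behaviour above.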
func (disabledShuffleShardingLimits) MaxQueryCapacity(_ string) float64 { return 0 }

func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")

16 changes: 8 additions & 8 deletions pkg/lokifrontend/frontend/v1/frontend.go
@@ -21,9 +21,9 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb"
"github.com/grafana/loki/pkg/querier/stats"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/scheduler/limits"
"github.com/grafana/loki/pkg/util"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
"github.com/grafana/loki/pkg/util/validation"
)

var errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
@@ -43,6 +43,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Limits interface {
// Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
MaxQueriersPerUser(user string) int

// MaxQueryCapacity returns how much of the available query capacity can be used by this user.
MaxQueryCapacity(user string) float64
}

// Frontend queues HTTP requests, dispatches them to backends, and handles retries
@@ -80,12 +83,12 @@ type request struct {
}

// New creates a new frontend. Frontend implements service, and must be started and stopped.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, metricsNamespace string) (*Frontend, error) {
func New(cfg Config, frontendLimits Limits, log log.Logger, registerer prometheus.Registerer, metricsNamespace string) (*Frontend, error) {
queueMetrics := queue.NewMetrics(registerer, metricsNamespace, "query_frontend")
f := &Frontend{
cfg: cfg,
log: log,
limits: limits,
limits: frontendLimits,
queueMetrics: queueMetrics,
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
@@ -95,7 +98,7 @@ }),
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, queueMetrics)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, limits.NewQueueLimits(frontendLimits), queueMetrics)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

var err error
@@ -312,13 +315,10 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
req.enqueueTime = now
req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")

// aggregate the max queriers limit in the case of a multi tenant query
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser)

joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)

err = f.requestQueue.Enqueue(joinedTenantID, nil, req, maxQueriers, nil)
err = f.requestQueue.Enqueue(joinedTenantID, nil, req, nil)
if err == queue.ErrTooManyRequests {
return errTooManyRequest
}
16 changes: 11 additions & 5 deletions pkg/lokifrontend/frontend/v1/frontend_test.go
@@ -35,6 +35,7 @@ import (
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
querier_worker "github.com/grafana/loki/pkg/querier/worker"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/scheduler/limits"
"github.com/grafana/loki/pkg/util/constants"
)

@@ -135,7 +136,7 @@ func TestFrontendCheckReady(t *testing.T) {
qm := queue.NewMetrics(nil, constants.Loki, "query_frontend")
f := &Frontend{
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5, 0, qm),
requestQueue: queue.NewRequestQueue(5, 0, limits.NewQueueLimits(nil), qm),
}
for i := 0; i < tt.connectedClients; i++ {
f.requestQueue.RegisterConsumerConnection("test")
@@ -243,7 +244,7 @@ func testFrontend(t *testing.T, config Config, handler queryrangebase.Handler, t
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

v1, err := New(config, limits{}, logger, reg, constants.Loki)
v1, err := New(config, mockLimits{}, logger, reg, constants.Loki)
require.NoError(t, err)
require.NotNil(t, v1)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
@@ -293,10 +294,15 @@ func defaultFrontendConfig() Config {
return config
}

type limits struct {
queriers int
type mockLimits struct {
queriers int
queryCapacity float64
}

func (l limits) MaxQueriersPerUser(_ string) int {
func (l mockLimits) MaxQueriersPerUser(_ string) int {
return l.queriers
}

func (l mockLimits) MaxQueryCapacity(_ string) float64 {
return l.queryCapacity
}
2 changes: 1 addition & 1 deletion pkg/lokifrontend/frontend/v1/queue_test.go
@@ -24,7 +24,7 @@ import (
func setupFrontend(t *testing.T, config Config) *Frontend {
logger := log.NewNopLogger()

frontend, err := New(config, limits{queriers: 3}, logger, nil, constants.Loki)
frontend, err := New(config, mockLimits{queriers: 3}, logger, nil, constants.Loki)
require.NoError(t, err)

t.Cleanup(func() {
28 changes: 14 additions & 14 deletions pkg/queue/dequeue_qos_test.go
@@ -44,7 +44,7 @@ func enqueueRequestsForActor(t testing.TB, actor []string, useActor bool, queue
if !useActor {
actor = nil
}
err := queue.Enqueue("tenant", actor, r, 0, nil)
err := queue.Enqueue("tenant", actor, r, nil)
if err != nil {
t.Fatal(err)
}
@@ -58,7 +58,7 @@ func BenchmarkQueryFairness(t *testing.B) {

for _, useActor := range []bool{false, true} {
t.Run(fmt.Sprintf("use hierarchical queues = %v", useActor), func(t *testing.B) {
requestQueue := NewRequestQueue(1024, 0, NewMetrics(nil, constants.Loki, "query_scheduler"))
requestQueue := NewRequestQueue(1024, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler"))
enqueueRequestsForActor(t, []string{}, useActor, requestQueue, numSubRequestsActorA, 50*time.Millisecond)
enqueueRequestsForActor(t, []string{"a"}, useActor, requestQueue, numSubRequestsActorA, 100*time.Millisecond)
enqueueRequestsForActor(t, []string{"b"}, useActor, requestQueue, numSubRequestsActorB, 50*time.Millisecond)
@@ -133,18 +133,18 @@ func TestQueryFairnessAcrossSameLevel(t *testing.T) {
456: [210]
**/

requestQueue := NewRequestQueue(1024, 0, NewMetrics(nil, constants.Loki, "query_scheduler"))
_ = requestQueue.Enqueue("tenant1", []string{}, r(0), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{}, r(1), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{}, r(2), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(10), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(11), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(12), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(20), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(21), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(22), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "123"}, r(200), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "456"}, r(210), 0, nil)
requestQueue := NewRequestQueue(1024, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler"))
_ = requestQueue.Enqueue("tenant1", []string{}, r(0), nil)
_ = requestQueue.Enqueue("tenant1", []string{}, r(1), nil)
_ = requestQueue.Enqueue("tenant1", []string{}, r(2), nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(10), nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(11), nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(12), nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(20), nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(21), nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(22), nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "123"}, r(200), nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "456"}, r(210), nil)
requestQueue.queues.recomputeUserConsumers()

items := make([]int, 0)
23 changes: 12 additions & 11 deletions pkg/queue/queue.go
@@ -39,6 +39,11 @@ func (ui QueueIndex) ReuseLastIndex() QueueIndex {
return ui - 1
}

type Limits interface {
// MaxConsumers returns the max consumers to use per tenant or 0 to allow all consumers to consume from the queue.
MaxConsumers(user string, allConsumers int) int
}
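
The frontend wires this up via `limits.NewQueueLimits` from the new `pkg/scheduler/limits` package (imported in frontend.go above). Below is a minimal sketch of how such an adapter might combine the two per-tenant limits according to the rule in the config docs; the exact field and variable names are assumptions, not copied from the PR:

package limits

import "math"

// Limits mirrors the per-tenant limits exposed by the query frontend.
type Limits interface {
	MaxQueriersPerUser(user string) int
	MaxQueryCapacity(user string) float64
}

// QueueLimits adapts the frontend limits to the queue's Limits interface.
type QueueLimits struct {
	limits Limits
}

func NewQueueLimits(limits Limits) *QueueLimits {
	return &QueueLimits{limits: limits}
}

// MaxConsumers returns min(frontend.max-queriers-per-tenant,
// ceil(allConsumers * frontend.max-query-capacity)), treating a zero value as
// "term disabled"; a result of 0 lets the tenant use all connected consumers.
func (q *QueueLimits) MaxConsumers(user string, allConsumers int) int {
	if q == nil || q.limits == nil {
		return 0
	}
	maxQueriers := q.limits.MaxQueriersPerUser(user)
	maxCapacity := q.limits.MaxQueryCapacity(user)

	if maxCapacity == 0 {
		// Capacity limit disabled: fall back to the shuffle-sharding limit.
		return maxQueriers
	}
	capped := int(math.Ceil(float64(allConsumers) * maxCapacity))
	if maxQueriers != 0 && maxQueriers < capped {
		return maxQueriers
	}
	return capped
}

The nil guard matters because the tests in this PR construct queues with `limits.NewQueueLimits(nil)`, which should behave as "no per-tenant cap".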

// Request stored into the queue.
type Request any

@@ -62,9 +67,9 @@ type RequestQueue struct {
pool *SlicePool[Request]
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, limits Limits, metrics *Metrics) *RequestQueue {
q := &RequestQueue{
queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay),
queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay, limits),
connectedConsumers: atomic.NewInt32(0),
metrics: metrics,
pool: NewSlicePool[Request](1<<6, 1<<10, 2), // Buckets are [64, 128, 256, 512, 1024].
@@ -76,23 +81,19 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, met
return q
}

// Enqueue puts the request into the queue. MaxQueries is tenant-specific value that specifies how many queriers can
// this tenant use (zero or negative = all queriers). It is passed to each Enqueue, because it can change
// between calls.
//
// Enqueue puts the request into the queue.
// If the request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error {
func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, successFn func()) error {
q.mtx.Lock()
defer q.mtx.Unlock()

if q.stopped {
return ErrStopped
}

queue := q.queues.getOrAddQueue(tenant, path, maxQueriers)
if queue == nil {
// This can only happen if tenant is "".
return errors.New("no queue found")
queue, err := q.queues.getOrAddQueue(tenant, path)
if err != nil {
return fmt.Errorf("no queue found: %w", err)
}

// Optimistically increase queue counter for tenant instead of doing separate
28 changes: 14 additions & 14 deletions pkg/queue/queue_test.go
@@ -47,7 +47,7 @@ func BenchmarkGetNextRequest(b *testing.B) {

queues := make([]*RequestQueue, 0, b.N)
for n := 0; n < b.N; n++ {
queue := NewRequestQueue(maxOutstandingPerTenant, 0, NewMetrics(nil, constants.Loki, "query_scheduler"))
queue := NewRequestQueue(maxOutstandingPerTenant, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler"))
queues = append(queues, queue)

for ix := 0; ix < queriers; ix++ {
@@ -57,7 +57,7 @@
for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
userID := strconv.Itoa(j)
err := queue.Enqueue(userID, benchCase.fn(j), "request", 0, nil)
err := queue.Enqueue(userID, benchCase.fn(j), "request", nil)
if err != nil {
b.Fatal(err)
}
@@ -105,7 +105,7 @@ func BenchmarkQueueRequest(b *testing.B) {
requests := make([]string, 0, numTenants)

for n := 0; n < b.N; n++ {
q := NewRequestQueue(maxOutstandingPerTenant, 0, NewMetrics(nil, constants.Loki, "query_scheduler"))
q := NewRequestQueue(maxOutstandingPerTenant, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler"))

for ix := 0; ix < queriers; ix++ {
q.RegisterConsumerConnection(fmt.Sprintf("querier-%d", ix))
@@ -123,7 +123,7 @@
for n := 0; n < b.N; n++ {
for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
err := queues[n].Enqueue(users[j], nil, requests[j], 0, nil)
err := queues[n].Enqueue(users[j], nil, requests[j], nil)
if err != nil {
b.Fatal(err)
}
@@ -135,7 +135,7 @@
func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second

queue := NewRequestQueue(1, forgetDelay, NewMetrics(nil, constants.Loki, "query_scheduler"))
queue := NewRequestQueue(1, forgetDelay, &mockQueueLimits{maxConsumers: 1}, NewMetrics(nil, constants.Loki, "query_scheduler"))

// Start the queue service.
ctx := context.Background()
@@ -162,7 +162,7 @@

// Enqueue a request from a user which would be assigned to querier-1.
// NOTE: "user-1" hash falls in the querier-1 shard.
require.NoError(t, queue.Enqueue("user-1", nil, "request", 1, nil))
require.NoError(t, queue.Enqueue("user-1", nil, "request", nil))

startTime := time.Now()
querier2wg.Wait()
@@ -306,17 +306,17 @@ func TestContextCond(t *testing.T) {
func TestMaxQueueSize(t *testing.T) {
t.Run("queue size is tracked per tenant", func(t *testing.T) {
maxSize := 3
queue := NewRequestQueue(maxSize, 0, NewMetrics(nil, constants.Loki, "query_scheduler"))
queue := NewRequestQueue(maxSize, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler"))
queue.RegisterConsumerConnection("querier")

// enqueue maxSize items with different actors
// different actors have individual channels with maxSize length
assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 1, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 2, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-c"}, 3, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 1, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 2, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-c"}, 3, nil))

// max queue length per tenant is tracked globally for all actors within a tenant
err := queue.Enqueue("tenant", []string{"user-a"}, 4, 0, nil)
err := queue.Enqueue("tenant", []string{"user-a"}, 4, nil)
assert.Equal(t, err, ErrTooManyRequests)

// dequeue and enqueue some items
@@ -325,10 +325,10 @@
_, _, err = queue.Dequeue(context.Background(), StartIndexWithLocalQueue, "querier")
assert.NoError(t, err)

assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 4, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 5, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 4, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 5, nil))

err = queue.Enqueue("tenant", []string{"user-c"}, 6, 0, nil)
err = queue.Enqueue("tenant", []string{"user-c"}, 6, nil)
assert.Equal(t, err, ErrTooManyRequests)
})
}