Skip to content

Commit

Permalink
Implementing generalized tree queue with node state implemented by te…
Browse files Browse the repository at this point in the history
…nant-querier assignments (#7873)

* Sketching out new tree queue with shuffle shard nodes

* Add test to illustrate updating tenantQuerierAssignments outside of tree

* pull out dequeue ops

* Deduplicate node logic and extract node state types

* Add a test for reassigning tenantQuerierID map value

* added back lastTenantIndex concept, and empty tenant placeholders

* actually add queuing algorithms

* Port extant, relevant tree queue tests to new Tree, and finish ripping previous tree queue iteration

* Add config to flip tree to query component -> tenant

* Add EnqueueFrontByPath tests, makeQueuePath based on prioritizeQueryComponents config

* Re-include original tree queue with a config switch

* Implement state update fn and bring back TreeQueue-specific tests

* Update CHANGELOG and flag name

* Fix tenant removal on dequeue for legacy TreeQueue

* Fix CHANGELOG, add clarity to comments, and add nil check for queueElement

* Check tree item count after enqueues in test
  • Loading branch information
chencs committed Jul 5, 2024
1 parent bfe49ad commit 40cfafd
Show file tree
Hide file tree
Showing 16 changed files with 2,889 additions and 885 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## main / unreleased
* [ENHANCEMENT] Query-scheduler: Introduce `query-scheduler.use-multi-algorithm-query-queue`, which allows use of an experimental queue structure, with no change in external queue behavior. #7873

### Grafana Mimir

Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -15951,6 +15951,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "use_multi_algorithm_query_queue",
"required": false,
"desc": "Use an experimental version of the query queue which has the same behavior as the existing queue, but integrates tenant selection into the tree model.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "query-scheduler.use-multi-algorithm-query-queue",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "querier_forget_delay",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2323,6 +2323,8 @@ Usage of ./cmd/mimir/mimir:
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-query-scheduler.service-discovery-mode string
[experimental] Service discovery mode that query-frontends and queriers use to find query-scheduler instances. When query-scheduler ring-based service discovery is enabled, this option needs be set on query-schedulers, query-frontends and queriers. Supported values are: dns, ring. (default "dns")
-query-scheduler.use-multi-algorithm-query-queue
[experimental] Use an experimental version of the query queue which has the same behavior as the existing queue, but integrates tenant selection into the tree model.
-ruler-storage.azure.account-key string
Azure storage account key. If unset, Azure managed identities will be used for authentication instead.
-ruler-storage.azure.account-name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,12 @@ The `query_scheduler` block configures the query-scheduler.
# CLI flag: -query-scheduler.additional-query-queue-dimensions-enabled
[additional_query_queue_dimensions_enabled: <boolean> | default = false]

# (experimental) Use an experimental version of the query queue which has the
# same behavior as the existing queue, but integrates tenant selection into the
# tree model.
# CLI flag: -query-scheduler.use-multi-algorithm-query-queue
[use_multi_algorithm_query_queue: <boolean> | default = false]

# (experimental) If a querier disconnects without sending notification about
# graceful shutdown, the query-scheduler will keep the querier in the tenant's
# shard until the forget delay has passed. This feature is useful to reduce the
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
log,
cfg.MaxOutstandingPerTenant,
false,
false,
cfg.QuerierForgetDelay,
f.queueLength,
f.discardedRequests,
Expand Down Expand Up @@ -251,7 +252,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {

/*
We want to dequeue the next unexpired request from the chosen tenant queue.
The chance of choosing a particular tenant for dequeueing is (1/active_tenants).
The chance of choosing a particular tenant for dequeuing is (1/active_tenants).
This is problematic under load, especially with other middleware enabled such as
querier.split-by-interval, where one request may fan out into many.
If expired requests aren't exhausted before checking another tenant, it would take
Expand Down
283 changes: 283 additions & 0 deletions pkg/scheduler/queue/multi_queuing_algorithm_tree_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
// SPDX-License-Identifier: AGPL-3.0-only

package queue

import (
"container/list"
"fmt"
)

type QueuePath []string //nolint:revive // disallows types beginning with package name
type QueueIndex int //nolint:revive // disallows types beginning with package name

const localQueueIndex = -1

type Tree interface {
EnqueueFrontByPath(QueuePath, any) error
EnqueueBackByPath(QueuePath, any) error
Dequeue() (QueuePath, any)
ItemCount() int
IsEmpty() bool
}

// MultiQueuingAlgorithmTreeQueue holds metadata and a pointer to the root node of a hierarchical queue implementation.
// The root Node maintains a localQueue and an arbitrary number of child nodes (which themselves
// may have local queues and children). Each Node in MultiQueuingAlgorithmTreeQueue uses a QueuingAlgorithm (determined by
// node depth) to determine dequeue order of that Node's subtree.
//
// Each queuing dimension is modeled as a node in the tree, internally reachable through a QueuePath.
//
// The QueuePath is an ordered array of strings describing the path from the tree root to a Node.
// In addition to child Nodes, each Node contains a local queue (FIFO) of items.
//
// When dequeuing from a given node, a Node will use its QueuingAlgorithm to choose either itself
// or a child node to dequeue from recursively (i.e., a child Node will use its own QueuingAlgorithm
// to determine how to proceed). MultiQueuingAlgorithmTreeQueue will not dequeue from two different Nodes at the same depth
// consecutively, unless the previously-checked Node was empty down to the leaf node.
type MultiQueuingAlgorithmTreeQueue struct {
rootNode *Node
algosByDepth []QueuingAlgorithm
}

func NewTree(queuingAlgorithms ...QueuingAlgorithm) (*MultiQueuingAlgorithmTreeQueue, error) {
if len(queuingAlgorithms) == 0 {
return nil, fmt.Errorf("cannot create a tree without defined QueuingAlgorithm")
}
root, err := newNode("root", 0, queuingAlgorithms[0])
if err != nil {
return nil, err
}
root.depth = 0
return &MultiQueuingAlgorithmTreeQueue{
rootNode: root,
algosByDepth: queuingAlgorithms,
}, nil
}

func (t *MultiQueuingAlgorithmTreeQueue) ItemCount() int {
return t.rootNode.ItemCount()
}

func (t *MultiQueuingAlgorithmTreeQueue) IsEmpty() bool {
return t.rootNode.IsEmpty()
}

// Dequeue removes and returns an item from the front of the next appropriate Node in the MultiQueuingAlgorithmTreeQueue, as
// well as the path to the Node which that item was dequeued from.
//
// Either the root/self node or a child node is chosen according to the Node's QueuingAlgorithm. If
// the root node is chosen, an item will be dequeued from the front of its localQueue. If a child
// node is chosen, it is recursively dequeued from until a node selects its localQueue.
//
// Nodes that empty down to the leaf after being dequeued from (or which are found to be empty leaf
// nodes during the dequeue operation) are deleted as the recursion returns up the stack. This
// maintains structural guarantees relied upon to make IsEmpty() non-recursive.
func (t *MultiQueuingAlgorithmTreeQueue) Dequeue() (QueuePath, any) {
path, v := t.rootNode.dequeue()
// The returned node dequeue path includes the root node; exclude
// this so that the return path can be used if needed to enqueue.
return path[1:], v
}

// EnqueueBackByPath enqueues an item in the back of the local queue of the node
// located at a given path through the tree; nodes for the path are created as needed.
//
// path is relative to the root node; providing a QueuePath beginning with "root"
// will create a child node of the root node which is also named "root."
func (t *MultiQueuingAlgorithmTreeQueue) EnqueueBackByPath(path QueuePath, v any) error {
return t.rootNode.enqueueBackByPath(t, path, v)
}

// EnqueueFrontByPath enqueues an item in the front of the local queue of the Node
// located at a given path through the MultiQueuingAlgorithmTreeQueue; nodes for the path are created as needed.
//
// Enqueueing to the front is intended only for items which were first enqueued to the back
// and then dequeued after reaching the front.
//
// Re-enqueueing to the front is only intended for use in cases where a queue consumer
// fails to complete operations on the dequeued item, but failure is not yet final, and the
// operations should be retried by a subsequent queue consumer. A concrete example is when
// a queue consumer fails or disconnects for unrelated reasons while we are in the process
// of dequeuing a request for it.
//
// path must be relative to the root node; providing a QueuePath beginning with "root"
// will create a child node of root which is also named "root."
func (t *MultiQueuingAlgorithmTreeQueue) EnqueueFrontByPath(path QueuePath, v any) error {
return t.rootNode.enqueueFrontByPath(t, path, v)
}

func (t *MultiQueuingAlgorithmTreeQueue) GetNode(path QueuePath) *Node {
return t.rootNode.getNode(path)
}

// Node maintains node-specific information used to enqueue and dequeue to itself, such as a local
// queue, node depth, references to its children, and position in queue.
// Note that the tenantQuerierAssignments QueuingAlgorithm largely disregards Node's queueOrder and
// queuePosition, managing analogous state instead, because shuffle-sharding + fairness requirements
// necessitate input from the querier.
type Node struct {
name string
localQueue *list.List
queuePosition int // next index in queueOrder to dequeue from
queueOrder []string // order for dequeuing from self/children
queueMap map[string]*Node
depth int
queuingAlgorithm QueuingAlgorithm
childrenChecked int
}

func newNode(name string, depth int, da QueuingAlgorithm) (*Node, error) {
if da == nil {
return nil, fmt.Errorf("cannot create a node without a defined QueuingAlgorithm")
}
return &Node{
name: name,
localQueue: list.New(),
queuePosition: localQueueIndex,
queueOrder: make([]string, 0),
queueMap: make(map[string]*Node, 1),
depth: depth,
queuingAlgorithm: da,
}, nil
}

func (n *Node) IsEmpty() bool {
// avoid recursion to make this a cheap operation
//
// Because we dereference empty child nodes during dequeuing,
// we assume that emptiness means there are no child nodes
// and nothing in this tree node's local queue.
//
// In reality a package member could attach empty child queues with getOrAddNode
// in order to get a functionally-empty tree that would report false for IsEmpty.
// We assume this does not occur or is not relevant during normal operation.
return n.localQueue.Len() == 0 && len(n.queueMap) == 0
}

// ItemCount counts the queue items in the Node and in all its children, recursively.
func (n *Node) ItemCount() int {
items := n.localQueue.Len()
for _, child := range n.queueMap {
items += child.ItemCount()
}
return items
}

func (n *Node) Name() string {
return n.name
}

func (n *Node) getLocalQueue() *list.List {
return n.localQueue
}

func (n *Node) enqueueFrontByPath(tree *MultiQueuingAlgorithmTreeQueue, pathFromNode QueuePath, v any) error {
childNode, err := n.getOrAddNode(pathFromNode, tree)
if err != nil {
return err
}
childNode.localQueue.PushFront(v)
return nil
}

func (n *Node) enqueueBackByPath(tree *MultiQueuingAlgorithmTreeQueue, pathFromNode QueuePath, v any) error {
childNode, err := n.getOrAddNode(pathFromNode, tree)
if err != nil {
return err
}
childNode.localQueue.PushBack(v)
return nil
}

func (n *Node) dequeue() (QueuePath, any) {
var v any
var childPath QueuePath

path := QueuePath{n.name}

if n.IsEmpty() {
return path, nil
}

var checkedAllNodes bool
var dequeueNode *Node
// continue until we've found a value or checked all nodes that need checking
for v == nil && !checkedAllNodes {
dequeueNode, checkedAllNodes = n.queuingAlgorithm.dequeueSelectNode(n)
switch dequeueNode {
// dequeuing from local queue
case n:
if n.localQueue.Len() > 0 {
// dequeueNode is self, local queue non-empty
if elt := n.localQueue.Front(); elt != nil {
n.localQueue.Remove(elt)
v = elt.Value
}
}
// no dequeue-able child found; break out of the loop,
// since we won't find anything to dequeue if we don't
// have a node to dequeue from now
case nil:
checkedAllNodes = true
// dequeue from a child
default:
childPath, v = dequeueNode.dequeue()
}

if v == nil {
n.childrenChecked++
}

n.queuingAlgorithm.dequeueUpdateState(n, dequeueNode)
}
// reset childrenChecked to 0 before completing this dequeue
n.childrenChecked = 0
return append(path, childPath...), v
}

func (n *Node) getNode(pathFromNode QueuePath) *Node {
if len(pathFromNode) == 0 {
return n
}

if n.queueMap == nil {
return nil
}

if childQueue, ok := n.queueMap[pathFromNode[0]]; ok {
return childQueue.getNode(pathFromNode[1:])
}

// no child node matches next path segment
return nil
}

// getOrAddNode recursively gets or adds tree queue nodes based on given relative child path. It
// checks whether the first node in pathFromNode exists in the Node's children; if no node exists,
// one is created and added to the Node's queueOrder, according to the Node's QueuingAlgorithm.
//
// pathFromNode must be relative to the receiver node; providing a QueuePath beginning with
// the receiver/parent node name will create a child node of the same name as the parent.
func (n *Node) getOrAddNode(pathFromNode QueuePath, tree *MultiQueuingAlgorithmTreeQueue) (*Node, error) {
if len(pathFromNode) == 0 {
return n, nil
}

var childNode *Node
var ok bool
var err error
if childNode, ok = n.queueMap[pathFromNode[0]]; !ok {
// child does not exist, create it
if n.depth+1 >= len(tree.algosByDepth) {
return nil, fmt.Errorf("cannot add a node beyond max tree depth: %v", len(tree.algosByDepth))
}
childNode, err = newNode(pathFromNode[0], n.depth+1, tree.algosByDepth[n.depth+1])
if err != nil {
return nil, err
}
// add the newly created child to the node
n.queuingAlgorithm.addChildNode(n, childNode)

}
return childNode.getOrAddNode(pathFromNode[1:], tree)
}
Loading

0 comments on commit 40cfafd

Please sign in to comment.