
query laning and load shedding #9407

Merged

Conversation

@clintropolis clintropolis commented Feb 26, 2020

Description

Initial piece of #6993, adding basic query laning facilities for capacity enforcement. Note that to keep things simpler to review, this PR does not include automatic prioritization; that will be done in a follow-up PR that builds on top of this branch.

This PR introduces the concept of query lanes and uses them as a means of controlling capacity allotments for 'classes' of queries. Based on the lively discussion in #6993, a QueryLaningStrategy interface has been introduced to help ensure this foundation is expressive enough for the variety of strategies discussed there.

To make this happen, an opinionated stance has been taken on the purpose of QueryManager, which has been renamed QueryScheduler. In addition to providing the cancellation facilities of QueryManager, QueryScheduler is now also responsible for enforcing lane limits: it computes which lane a query belongs to with the help of the configured QueryLaningStrategy and tries to acquire a semaphore for that lane. If a permit is available, query processing proceeds; otherwise a new QueryCapacityExceededException is thrown, which is currently mapped to an HTTP 429 response code (instead of 503, for the reasons discussed in #6993) by the QueryResource and SqlResource endpoints.
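To illustrate the flow, here is a minimal sketch only: the class and method names below are placeholders, and the real QueryScheduler uses resilience4j bulkheads rather than the raw semaphores shown here.

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class LaneEnforcementSketch
{
  private final Map<String, Semaphore> lanePermits; // lane name -> remaining capacity

  LaneEnforcementSketch(Map<String, Semaphore> lanePermits)
  {
    this.lanePermits = lanePermits;
  }

  // acquire a permit for the query's lane (if any), run the query, and always release
  <T> T run(Optional<String> lane, Supplier<T> queryExecution)
  {
    Semaphore permits = lane.map(lanePermits::get).orElse(null);
    if (permits != null && !permits.tryAcquire()) {
      // surfaced to callers as HTTP 429 by QueryResource / SqlResource
      throw new IllegalStateException("Too many concurrent queries for lane " + lane.get());
    }
    try {
      return queryExecution.get();
    }
    finally {
      if (permits != null) {
        permits.release();
      }
    }
  }
}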

Lane assignment happens on the broker, in CachingClusteredClient, so that the laning strategy has access to the number of segments taking part in the query as well as the pool of servers available to serve those segments.

As a proof of concept, two initial laning strategies are included in this PR: a 'no laning' strategy, which does nothing and preserves the current behavior where the only limit is the size of the HTTP thread pool, and a 'high/low' laning strategy, which classifies any query with a context priority less than 0 as belonging to the 'low' lane.
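As a rough sketch of the 'high/low' classification rule (illustrative only; the actual strategy reads the priority from the query context and also computes the configured limit for the 'low' lane):

import java.util.Optional;

class HiLoClassificationSketch
{
  static final String LOW = "low";

  // priority as read from the query context; negative priority goes to the
  // limited 'low' lane, everything else stays unlaned
  Optional<String> computeLane(int priority)
  {
    return priority < 0 ? Optional.of(LOW) : Optional.empty();
  }
}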

The lane semaphores are provided through resilience4j-bulkhead, which simplified the code a bit compared to the map of semaphores my initial prototype relied on, but is functionally much the same. I came across this library via #8357, which proposes using an alternative library, concurrency-limits. That library is very interesting, but I was worried about the initial complexity of using measurement-based dynamic limits for lane enforcement.
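For reference, the general shape of a resilience4j bulkhead used as a lane semaphore looks roughly like this (a sketch assuming resilience4j 1.x, not the exact code in this PR):

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;

import java.time.Duration;

class BulkheadLaneSketch
{
  public static void main(String[] args)
  {
    // a 'low' lane allowing at most 5 concurrent queries, with no waiting for a permit
    Bulkhead lowLane = Bulkhead.of(
        "low",
        BulkheadConfig.custom()
                      .maxConcurrentCalls(5)
                      .maxWaitDuration(Duration.ZERO)
                      .build()
    );

    if (lowLane.tryAcquirePermission()) {
      try {
        System.out.println("running query in lane " + lowLane.getName());
      }
      finally {
        // record the completed call and release the permit
        lowLane.onComplete();
      }
    } else {
      System.out.println("lane full, reject with HTTP 429");
    }
  }
}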

I do think concurrency-limits is worth exploring in the future, though whether strictly for total capacity enforcement or for more than that I am currently unsure. Its AbstractPartitionedLimiter seems very close to what we might want for enforcing laning, but it provides a slightly different 'at least' guarantee than the 'at most' guarantee this PR currently provides, and I feel 'at most' works a bit better with our fixed Jetty thread pool size. However, since lane limit enforcement is entirely contained within QueryScheduler, it is likely not a ton of effort to switch libraries if we find in the future that we can get better behavior with concurrency-limits, just as I swapped from my map of Semaphore to using BulkheadRegistry.

Why not QoSFilter

Jetty has a QoSFilter which almost provides the machinery required for total capacity enforcement, but it logically does not occur late enough in request handling to have the information needed for some of the decisions desired in #6993, particularly things like the number of segments in a query or which servers are available to take part in it. That information only becomes available further down in CachingClusteredClient, closer to where the actual query work happens.

Follow-up improvements

Manual laning and end-to-end integration tests

A query lane is just a query context property, so allowing a custom static lane limit configuration is a low-effort follow-up that could support a lot of simple custom capacity planning setups and make it easier to test this functionality in integration tests; see the sketch below. clintropolis/druid@query-laning-and-load-shedding...clintropolis:manual-query-laning
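For illustration, a static lane configuration might look something like the lines below (hypothetical property names and a hypothetical lane query context key; the linked branch defines the actual syntax):

druid.query.scheduler.laning.strategy=manual
druid.query.scheduler.laning.lanes.reporting=10
druid.query.scheduler.laning.lanes.adhoc=25

A query would then opt into a lane by setting something like "lane": "reporting" in its query context.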

Automatic prioritization

The other half of #6993: I want to follow up this PR immediately with another that adds the threshold-based automatic prioritization described in the design document.
clintropolis/druid@query-laning-and-load-shedding...clintropolis:query-auto-prioritization

Lane based TierSelectorStrategy

Part of the motivation for laning is giving Druid the means to avoid having to run multiple pools of Brokers. One reason this is currently done is to have separate pools of Brokers with differently configured tier selection strategies, so that different types of queries can be routed to reduce Historical contention (druid.broker.select.tier, etc.). This could be addressed by extending QueryLaningStrategy to allow fetching a TierSelectorStrategy, e.g.:

TierSelectorStrategy getTierSelectorStrategy(Query<T> query);

The broker's CachingClusteredClient could use this, along with a lightly modified ServerSelector that accepts a TierSelectorStrategy in its pick method, to override the default broker configuration.

Tier based laning

Adding a new TieredQueryLaningStrategy would allow capacity control based on the Historical tier that a query will be routed to, for scenarios where Historical tiers serve disjoint intervals or datasources. This would serve as a partial replacement for, or alternative to, druid.router.tierToBrokerMap, which lets a Druid Router route queries to different pools of Brokers based on the tier a query belongs to, partitioning queries between different pools of resources. Instead, the same partitioning could be done within a single Broker pool, reserving capacity for queries going to each lane.

Better utilization and automatic cancellation of lower priority queries

The initial solution introduces an under-utilization problem: if all queries are low priority, they are still constrained by the lane limits even when the rest of the HTTP thread pool is idle. Longer term, we should be able to utilize the entire HTTP thread pool with any lane of query, and cancel lower priority queries when higher priority lanes come in.

Lane limit enforcement on historicals and peons/indexers

Everything will already have a QueryScheduler as of this PR, because it is the only implementation of QueryWatcher; we just need to wire a call to run into these services so that they can enforce lane limits as well. On the Broker side, we will want to watch for these responses and cancel the query for over-running capacity.

Adaptive limits

The current iteration of this PR requires a bunch of calculations to figure out the correct lane limits for a given cluster size. Longer term, it would be nice for these to be computed adaptively, adjusting lane limits up or down based on cluster size and/or back-pressure from Historicals.

Processing pool enhancements

With lane limits in place, we can explore additional options for how the processing pool functions, such as preemption of lower priority queries by higher priority lanes. With the existing priorities, lower priority queries can already experience starvation, and this would become greatly exaggerated if the processing pool allowed preempting per-segment results. But with lane limits in place, we can ensure that per-query resources such as the HTTP thread pools cannot be overrun by starving low priority queries.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • QueryManager -> QueryScheduler
  • QuerySchedulerConfig
  • QueryLaningStrategy and implementations
  • QueryResource and SqlResource

@@ -1476,9 +1476,31 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|

##### Query laning

Druid provides facilities to aid in query capacity reservation for heterogenous query workloads in the form of 'laning' strategies, which provide a variety of mechanisms examine and classify a query at the broker, assigning it to a 'lane'. Lanes are defined with capacity limits which the broker will enforce, causing requests in excess of the capacity to be discarded with an HTTP 429 status code, reserving resources for other lanes or for interactive queries (with no lane).


a variety of mechanisms examine
I believe a "to" is missing here ?

@himanshug

/subscribe

@clintropolis clintropolis removed the WIP label Mar 3, 2020
@clintropolis

Are there any useful diagnostics (e.g., log messages) that can be added?

These will be caught and logged as having errors in the request logger, which seems sufficient for a first pass, but I am willing to add additional information if we think it would be useful. I suppose periodic metrics on the number of queries that get rejected could be interesting, helping operators know when to either scale up or tell the users of certain lanes to chill out a bit. I could look into that as a follow-up PR if we think that or something else would be useful.


@jihoonson jihoonson left a comment


LGTM!

public Object2IntMap<String> getLaneLimits(int totalLimit)
{
Object2IntMap<String> onlyLow = new Object2IntArrayMap<>(1);
onlyLow.put(LOW, Ints.checkedCast(Math.round(totalLimit * ((double) maxLowPercent / 100))));
Contributor


Behavior here is a bit different from the docs, which say the value is rounded up.

Since this allows a lane limit of zero or the total limit, it is a bit inconsistent with disallowing maxLowPercent from being 0 or 100 percent.

Member Author


Oops, I meant to round up, will adjust

Member Author


thanks, good catch. fixed, clarified error and docs, and added some tests

@clintropolis

Thanks for review @jihoonson and @ccaominh 🤘

@sascha-coenen or @himanshug (or anyone else) have any comments?


@sthetland sthetland left a comment


Doc review: it looks pretty good as is, but have added a few suggested rewrites to consider....optional.

@@ -1476,9 +1476,35 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|

##### Query laning

The Broker provides facilities to aid in query capacity reservation for heterogeneous query workloads in the form of 'laning' strategies, which provide a variety of mechanisms to examine and classify a query, assigning it to a 'lane'. Lanes are defined with capacity limits which the broker will enforce, causing requests in excess of the capacity to be discarded with an HTTP 429 status code, reserving resources for other lanes or for interactive queries (with no lane).


Not bad as is, but if I were to try to boil it down just a little, it might be something like:

Suggested change
The Broker provides facilities to aid in query capacity reservation for heterogeneous query workloads in the form of 'laning' strategies, which provide a variety of mechanisms to examine and classify a query, assigning it to a 'lane'. Lanes are defined with capacity limits which the broker will enforce, causing requests in excess of the capacity to be discarded with an HTTP 429 status code, reserving resources for other lanes or for interactive queries (with no lane).
*Laning strategies* allow you to control capacity utilization for heterogeneous query workloads. With laning, the broker examines and classifies a query for the purpose of assigning it to a 'lane'. Lanes have capacity limits, enforced by the broker, that ensure sufficient resources are available for other lanes or for interactive queries (with no lane). Requests in excess of the capacity are discarded with an HTTP 429 status code.


@clintropolis clintropolis Mar 10, 2020


thanks, applied with some minor adjustment

@himanshug

@clintropolis sorry, I will take a look at this tonight. That said, it is already approved so it is good to merge regardless.


@himanshug himanshug left a comment


Looks like a good starting point. LGTM overall; I think the interfaces might need some adjustments as more use cases are made concrete. But the two PRs (this + clintropolis/druid@query-laning-and-load-shedding...clintropolis:query-auto-prioritization) would fulfill the original scope of lanes at the Broker (to control Jetty thread pool utilization at the Broker) and auto query prioritization as described in "Proposed Changes" in the description of #6993.

public class QuerySchedulerConfig
{
@JsonProperty
private Integer numThreads = 0;
Contributor


why is this Integer and not int considering it has a default non-null value?

Member Author


Hmm, that is a good point, will change in one of the follow-up PRs.


|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.numThreads`|Maximum number of HTTP threads to dedicate to query processing. To save HTTP thread capacity, this should be lower than `druid.server.http.numThreads`.|Unbounded|
Contributor


In what use case would I ever want to set it to something other than druid.server.http.numThreads? If the recommendation is to set it lower than druid.server.http.numThreads, then why is the default value not set to druid.server.http.numThreads - 1?
I guess, as a user, I don't quite understand the importance of setting this higher/same/lower compared to druid.server.http.numThreads and when I should choose one vs the other.

Member Author


In what use case would I ever want to set it to something other than druid.server.http.numThreads?

I actually think we might always want to set it lower than druid.server.http.numThreads, but I was too nervous to make this the default and made it opt-in behavior instead (since it grabs and releases locks for each query, if there were some bug in releasing locks a broker would eventually stop accepting queries entirely). The primary reason I think we want it lower than druid.server.http.numThreads is to save some 'slack' space for non-query HTTP connections, like accepting health checks, lookup management, and other such things that can be starved when long-running queries start to pile up.

If the recommendation is to set it lower than druid.server.http.numThreads, then why is the default value not set to druid.server.http.numThreads - 1?

See my above nervousness, but I think druid.server.http.numThreads - 1 would probably be a good default. This might want to be adjusted to be even lower depending on how much other non query http traffic the server is receiving (e.g. frequently polled/updated lookups, etc).

I guess, as a user, I don't quite understand the importance of setting this higher/same/lower compared to druid.server.http.numThreads and when I should choose one vs the other.

I would agree the current documentation doesn't quite adequately describe how this might be utilized; in a future PR I want to add a section to the cluster tuning docs to more properly advise on when and how to set this up.

Contributor


Thanks, I understand the reasoning now.
Lookup endpoints already have a QoS filter so they never consume more than two threads from Jetty; I wonder if in this world it makes sense to set up a QoS filter for non-query endpoints (say, hardcoded to 2) so that we can ensure they don't end up consuming more Jetty threads than intended.
Then the default could be druid.query.scheduler.numThreads = druid.server.http.numThreads - numReservedForOthers (with numReservedForOthers = 4), and users would likely never be expected to touch druid.query.scheduler.numThreads.

The major behavior change with lane usage is really losing the queuing of requests to handle spikes, and instead sending 429s immediately. In the future, we could introduce a mechanism to maintain statically/dynamically sized [per lane] waiting queues ourselves as well, along with the concurrency limits in the lane strategy.


@clintropolis clintropolis Mar 12, 2020


I wonder if it would make sense to instead move towards automatically computing druid.server.http.numThreads, since maybe it is easier for operators to only have to think about the number of concurrent queries to serve and just set druid.query.scheduler.numThreads? Druid could probably automatically figure out how many more http threads it needs based on configuration.

The major behavior change with lane usage is really losing the queuing of requests to handle spikes, and instead sending 429s immediately. In the future, we could introduce a mechanism to maintain statically/dynamically sized [per lane] waiting queues ourselves as well, along with the concurrency limits in the lane strategy.

Yeah, the current behavior is definitely a hard stop if you are over the line. I agree it would make sense to allow some sort of timed-out queuing behavior, which is what the Jetty QoS filter can sort of provide; that is a large part of why I am still wondering whether druid.query.scheduler.numThreads should be a QoS filter instead of being enforced as an actual lane like it is currently.

Contributor


It is fine to let the user provide druid.query.scheduler.numThreads and compute druid.server.http.numThreads; just that one of those should not need to be touched by the user in most cases.

There are a few advantages to maintaining the queues ourselves and not letting Jetty do it:

  • we have no control over the Jetty queue; if a request is dropped, the end user sees a TCP connection close rather than an HTTP 429, so it is not clear to the client whether to retry or back off.
  • we don't know how much time a request waited in the Jetty queue, so request time metrics don't account for that.
  • the Jetty queue is [probably] static in size; if we managed it ourselves we would have the option of dynamically sized queues and potentially other cool things.
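As a very rough future-work sketch of a self-managed, per-lane waiting queue (hypothetical, not part of this PR): bounded waiting with a timeout, so that overflow becomes an explicit HTTP 429 and the time spent waiting can be reported in query metrics.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class LaneQueueSketch
{
  private final Semaphore permits;
  private final long maxWaitMillis;

  LaneQueueSketch(int laneLimit, long maxWaitMillis)
  {
    // a fair semaphore serves waiting requests in arrival order
    this.permits = new Semaphore(laneLimit, true);
    this.maxWaitMillis = maxWaitMillis;
  }

  // returns the time spent waiting in millis, or -1 if the request should be
  // rejected with HTTP 429 because the wait timed out
  long tryEnter() throws InterruptedException
  {
    long start = System.nanoTime();
    if (permits.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS)) {
      return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
    return -1;
  }

  void exit()
  {
    permits.release();
  }
}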

* Provide a map of lane names to the limit on the number of concurrent queries for that lane
* @param totalLimit
*/
Object2IntMap<String> getLaneLimits(int totalLimit);
Contributor


is it expected that sum(returned_map.values) < totalLimit ?
nit: Also wonder why limits returned here were not percentages but absolute counts given that each implementation would probably end up doing that calculation.

Member Author


is it expected that sum(returned_map.values) < totalLimit ?

Right now I am not requiring this to be true, since the limits are not guaranteed capacity but rather maximums. It seemed more flexible to leave it to individual QueryLaningStrategy implementations to enforce that if they wish.

nit: Also wonder why limits returned here were not percentages but absolute counts given that each implementation would probably end up doing that calculation.

Hmm, I think that is definitely worth considering, though @jihoonson was asking for absolute limits in this comment #9407 (comment), so at the very least I will add a utility method to the QueryLaningStrategy interface for doing this conversion.

Contributor


Hehe, that happens with multiple reviewers. Yeah, having a utility method is equally good.
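A sketch of what such a shared utility could look like (the method name here is hypothetical; the merged code may differ):

interface LaneLimitHelperSketch
{
  // convert a percentage of the total limit into an absolute lane limit,
  // rounding up so a small but non-zero percentage still yields at least one slot
  static int limitFromPercent(int totalLimit, int percent)
  {
    return (int) Math.ceil(totalLimit * (percent / 100.0));
  }
}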

@clintropolis

Thanks for taking a look @himanshug!

@clintropolis clintropolis merged commit 8b9fe6f into apache:master Mar 10, 2020
@clintropolis clintropolis deleted the query-laning-and-load-shedding branch March 10, 2020 09:57
@jihoonson jihoonson added this to the 0.18.0 milestone Mar 26, 2020