query laning and load shedding #9407

Merged

Changes from 6 commits

Commits (38)
6220985
prototype
clintropolis Jan 29, 2020
3f41001
merge QueryScheduler and QueryManager
clintropolis Jan 31, 2020
feae8b1
everything in its right place
clintropolis Feb 25, 2020
554b8b5
adjustments
clintropolis Feb 25, 2020
0597c3f
docs
clintropolis Feb 26, 2020
405c94e
fixes
clintropolis Feb 27, 2020
e22ece1
doc fixes
clintropolis Feb 28, 2020
e98cad7
use resilience4j instead of semaphore
clintropolis Feb 28, 2020
2069437
more tests
clintropolis Feb 28, 2020
eaf1449
simplify
clintropolis Feb 28, 2020
688ca43
checkstyle
clintropolis Feb 28, 2020
f0b3f9f
spelling
clintropolis Feb 28, 2020
87c6cbd
oops heh
clintropolis Feb 28, 2020
5e91bcb
remove unused
clintropolis Feb 28, 2020
912b7bc
simplify
clintropolis Feb 29, 2020
1e384bf
concurrency tests
clintropolis Mar 2, 2020
60861a4
add SqlResource tests, refactor error response
clintropolis Mar 3, 2020
9aed16e
add json config tests
clintropolis Mar 4, 2020
419ab98
use LongAdder instead of AtomicLong
clintropolis Mar 4, 2020
f0d39e1
remove test only stuffs from scheduler
clintropolis Mar 4, 2020
2afaaf1
javadocs, etc
clintropolis Mar 4, 2020
ef029c4
style
clintropolis Mar 4, 2020
059a2d4
partial review stuffs
clintropolis Mar 6, 2020
0ec8a26
adjust
clintropolis Mar 6, 2020
5711fce
review stuffs
clintropolis Mar 7, 2020
aa73a14
more javadoc
clintropolis Mar 7, 2020
50847af
error response documentation
clintropolis Mar 7, 2020
abe3631
spelling
clintropolis Mar 7, 2020
86501ef
preserve user specified lane for NoSchedulingStrategy
clintropolis Mar 7, 2020
8b7b70d
more test, why not
clintropolis Mar 7, 2020
91ad9d9
doc adjustment
clintropolis Mar 7, 2020
373fd11
style
clintropolis Mar 7, 2020
2741501
missed review for make a thing a constant
clintropolis Mar 9, 2020
8575cf8
fixes and tests
clintropolis Mar 9, 2020
25a8bda
fix test
clintropolis Mar 10, 2020
32965a4
Update docs/configuration/index.md
clintropolis Mar 10, 2020
361e06e
Merge remote-tracking branch 'upstream/master' into query-laning-and-…
clintropolis Mar 10, 2020
3ba7808
doc update
clintropolis Mar 10, 2020
@@ -104,7 +104,9 @@
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
@@ -338,7 +340,8 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
new CacheConfig(),
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool
forkJoinPool,
new QueryScheduler(Integer.MAX_VALUE, NoQueryLaningStrategy.INSTANCE)
);
}

24 changes: 23 additions & 1 deletion docs/configuration/index.md
@@ -1476,9 +1476,31 @@ These Broker configurations can be defined in the `broker/runtime.properties` file
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|

##### Query laning

Druid provides facilities to aid in query capacity reservation for heterogeneous query workloads in the form of 'laning' strategies, which provide a variety of mechanisms to examine and classify a query at the broker, assigning it to a 'lane'. Lanes are defined with capacity limits which the broker will enforce, causing requests in excess of the capacity to be discarded with an HTTP 429 status code, reserving resources for other lanes or for interactive queries (with no lane).

Review comment:

> a variety of mechanisms examine

I believe a "to" is missing here?


|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.numThreads`|Maximum number of HTTP threads to dedicate to query processing. To save HTTP thread capacity, this should be lower than `druid.server.http.numThreads`.|Unbounded|
Contributor:

In what use case would I ever want to set it to something other than `druid.server.http.numThreads`? If the recommendation is to set it lower than `druid.server.http.numThreads`, then why isn't the default value set to `druid.server.http.numThreads - 1`?
I guess, as a user, I don't quite understand the importance of setting this higher/same/lower compared to `druid.server.http.numThreads` and when I should choose one vs the other.

Member Author:

> In what use case would I ever want to set it to something other than `druid.server.http.numThreads`?

I actually think we might always want to set it lower than `druid.server.http.numThreads`, but I was too nervous to make this the default and made it opt-in behavior instead (since it grabs and releases locks for each query; if there is some bug in releasing locks, a broker would eventually stop accepting queries entirely). The primary reason I think we want it lower than `druid.server.http.numThreads` is to save some 'slack' space for non-query HTTP connections, like accepting health checks, lookup management, and other such things that can be starved when long running queries start to pile up.

> If the recommendation is to set it lower than `druid.server.http.numThreads`, then why isn't the default value set to `druid.server.http.numThreads - 1`?

See my above nervousness, but I think `druid.server.http.numThreads - 1` would probably be a good default. This might want to be adjusted even lower depending on how much other non-query HTTP traffic the server is receiving (e.g. frequently polled/updated lookups, etc.).

> I guess, as a user, I don't quite understand the importance of setting this higher/same/lower compared to `druid.server.http.numThreads` and when I should choose one vs the other.

I would agree the current documentation doesn't quite adequately describe how this stuff might be utilized; in a future PR I want to add a section to the cluster tuning docs to more properly advise on when and how to set this up.

Contributor:

Thanks, I understand the reasoning now.
Lookup endpoints already have a QoS filter so they never consume more than two threads from Jetty; I wonder if in this world it makes sense to set up a QoS filter for non-query endpoints (say, hardcoded to 2) so that we can ensure they don't end up consuming more Jetty threads than intended.
Then the default could be `druid.query.scheduler.numThreads = druid.server.http.numThreads - numReservedForOthers` (with `numReservedForOthers=4`), and users would likely never be expected to touch `druid.query.scheduler.numThreads`.

The major behavior change with lane usage is really losing the queuing of requests to handle spikes and instead sending 429s immediately. In the future, we could introduce a mechanism to maintain statically/dynamically sized [per lane] waiting queues ourselves, along with the concurrency limits in the lane strategy.

Member Author (@clintropolis, Mar 12, 2020):

I wonder if it would make sense to instead move towards automatically computing druid.server.http.numThreads, since maybe it is easier for operators to only have to think about the number of concurrent queries to serve and just set druid.query.scheduler.numThreads? Druid could probably automatically figure out how many more http threads it needs based on configuration.

> The major behavior change with lane usage is really losing the queuing of requests to handle spikes and instead sending 429s immediately. In the future, we could introduce a mechanism to maintain statically/dynamically sized [per lane] waiting queues ourselves, along with the concurrency limits in the lane strategy.

Yeah, the current behavior is definitely a hard stop if you are over the line. I agree it would make sense to allow some sort of timed-out queuing behavior, which is what the Jetty QoS filter can sort of provide; that is a large part of why I am still wondering if `druid.query.scheduler.numThreads` should be a QoS filter instead of being enforced as an actual lane like it is currently.

Contributor:

It is fine to let the user provide `druid.query.scheduler.numThreads` and compute `druid.server.http.numThreads`; it's just that one of those should not be touched by the user in most cases.

There are a few advantages to maintaining the queues ourselves and not letting Jetty do it:

  • We have no control over the Jetty queue; if a request is dropped, the end user sees that as a TCP connection close and not an HTTP 429, so to the client it is not clear whether to retry or back off.
  • We don't know how much time a request waited in the Jetty queue; consequently, request time metrics don't account for that.
  • The Jetty queue is [probably] static in size; if we managed it ourselves, we would have the option of keeping dynamically sized queues and potentially doing other cool things.

|`druid.query.scheduler.laning.type`|Query laning strategy to use to assign queries to a lane in order to control capacities for certain classes of queries.|`none`|
Contributor:

Perhaps the setting name ("type") and the name in the docs ("strategy") should be made consistent.

In terms of documentation flow, it may be helpful to add a section below (e.g. "Laning strategy") and reference it here. "No laning strategy", etc. would be children of this new section.

Member Author:

druid.query.scheduler.laning.strategy seems reasonable, will switch

Member Author:

With regards to documentation flow, I had made the suggested change in a branch I intend to follow up with: clintropolis/druid@query-laning-and-load-shedding...clintropolis:query-auto-prioritization

I wasn't sure if it made sense to break it down yet in this branch because only the laning strategy exists, whereas in that branch the scheduler also now has a prioritization strategy. I can go ahead and pull that part of the doc change back into this PR though.


###### No laning strategy

In this mode, queries are never assigned a lane and are only limited by `druid.server.http.numThreads` or, if set, `druid.query.scheduler.numThreads`. This is the default Druid query scheduler operating mode.
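
For example, this default strategy can be selected explicitly with a single property (an illustrative snippet, echoing the suggestion in the review comment below):

```properties
# Explicitly select the default (no-op) laning strategy; equivalent to leaving it unset.
druid.query.scheduler.laning.type=none
```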

Contributor:

Consider adding:

> This strategy can be enabled by setting `druid.query.scheduler.laning.type` to `none`.

###### 'High/Low' laning strategy

This laning strategy automatically assigns queries with a `priority` below zero to a `low` query lane. The limit on `low` queries can be set to some desired fraction of the total capacity (or HTTP thread pool size), reserving capacity for interactive queries. Queries in the `low` lane are _not_ guaranteed their capacity, which may be consumed by interactive queries, but they may use up to this limit if total capacity is available.

This strategy can be enabled by setting `druid.query.scheduler.laning.type` to `hilo`.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.maxLowThreads`|Maximum number of HTTP threads that can be used by queries with a priority lower than 0.|No default, must be set if using this mode|
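
For example, a broker `runtime.properties` using the properties documented in this table might look like the following sketch (values are illustrative only, and note that `maxLowThreads` is renamed later in this review thread):

```properties
# Optional overall limit on concurrently processed queries (druid.query.scheduler.numThreads above).
druid.query.scheduler.numThreads=40
# Automatically assign queries with a priority below zero to the 'low' lane.
druid.query.scheduler.laning.type=hilo
# Allow at most 10 HTTP threads for 'low' lane queries; requests beyond that limit receive HTTP 429.
druid.query.scheduler.laning.maxLowThreads=10
```
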
Contributor:

Perhaps rename to maxLowHttpThreads so that it's not confused with the number of processing threads

Member Author:

I'm not sure I agree that it is particularly confusing, since the HTTP setting is `druid.server.http.numThreads` and the new scheduler 'total' lane is `druid.query.scheduler.numThreads`. Are you considering the hypothetical case of extending the concept of laning to the processing pool, I guess?

Would this setting be better as a percentage, so one property could be applicable to either usage? It doesn't seem like it would be hard to switch; it would just need `QueryLaningStrategy.getLaneLimits` to take the 'total' limit, and `ServerConfig` made available to the `QueryScheduler` so it could fall back to `druid.server.http.numThreads` if `druid.query.scheduler.numThreads` isn't set. `ServerConfig` should probably already be provided to `QueryScheduler` so it can treat the case where `druid.query.scheduler.numThreads` is higher than `druid.server.http.numThreads` the same as not setting the scheduler `numThreads` at all and ignoring the total limiter.

Member Author:

reworked/renamed to maxLowPercent to be more flexible


##### Server Configuration

Druid uses Jetty to serve HTTP requests.
Druid uses Jetty to serve HTTP requests. Each query being processed consumes a single thread from `druid.server.http.numThreads`, so consider setting `druid.query.scheduler.numThreads` to a lower value in order to reserve HTTP threads for responding to health checks, lookup loading, and other non-query HTTP requests, which in most cases are comparatively short-lived.
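
For example, an illustrative (not prescriptive) pairing of the two settings that leaves slack for non-query traffic might be:

```properties
# Jetty HTTP thread pool shared by all requests (queries, health checks, lookup updates, ...).
druid.server.http.numThreads=50
# Keep the query scheduler limit below the Jetty pool so roughly 10 threads stay free for
# non-query requests even when long running queries pile up.
druid.query.scheduler.numThreads=40
```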

|Property|Description|Default|
|--------|-----------|-------|
@@ -65,7 +65,9 @@
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
@@ -361,7 +363,8 @@ public String getFormatString()
return null;
}
},
ForkJoinPool.commonPool()
ForkJoinPool.commonPool(),
new QueryScheduler(Integer.MAX_VALUE, NoQueryLaningStrategy.INSTANCE)
);

ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
@@ -909,7 +909,7 @@ private TaskToolbox makeToolbox(
new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
public void registerQueryFuture(Query query, ListenableFuture future)
{
// do nothing
}
@@ -144,7 +144,7 @@ public Iterable<T> call()
)
);

queryWatcher.registerQuery(query, futures);
queryWatcher.registerQueryFuture(query, futures);

try {
return new MergeIterable<>(
@@ -178,7 +178,7 @@ private void waitForFutureCompletion(
)
{
try {
queryWatcher.registerQuery(query, future);
queryWatcher.registerQueryFuture(query, future);
if (QueryContexts.hasTimeout(query)) {
future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
@@ -35,6 +35,7 @@
public class QueryContexts
{
public static final String PRIORITY_KEY = "priority";
public static final String LANE_KEY = "lane";
Contributor:

Was this added to the docs?

Member Author:

No, I hadn't documented it since I hadn't decided whether or not a lane specified in the query context by the user should override the laning strategy, or if it should be laning-strategy specific; see the other comment about this.

public static final String TIMEOUT_KEY = "timeout";
public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes";
@@ -200,6 +201,11 @@ public static <T> int getPriority(Query<T> query, int defaultValue)
return parseInt(query, PRIORITY_KEY, defaultValue);
}

public static <T> String getLane(Query<T> query)
{
return (String) query.getContextValue(LANE_KEY);
}

public static <T> boolean getEnableParallelMerges(Query<T> query)
{
return parseBoolean(query, BROKER_PARALLEL_MERGE_KEY, DEFAULT_ENABLE_PARALLEL_MERGE);
@@ -43,5 +43,5 @@ public interface QueryWatcher
* @param query a query, which may be a subset of a larger query, as long as the underlying queryId is unchanged
* @param future the future holding the execution status of the query
*/
void registerQuery(Query query, ListenableFuture future);
void registerQueryFuture(Query<?> query, ListenableFuture<?> future);
}
@@ -346,7 +346,7 @@ private void waitForFutureCompletion(
{
try {
if (queryWatcher != null) {
queryWatcher.registerQuery(query, future);
queryWatcher.registerQueryFuture(query, future);
}

if (hasTimeout && timeout <= 0) {
@@ -219,7 +219,7 @@ public Sequence<SegmentAnalysis> call()
}
);
try {
queryWatcher.registerQuery(query, future);
queryWatcher.registerQueryFuture(query, future);
if (QueryContexts.hasTimeout(query)) {
return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
@@ -84,7 +84,7 @@ public int getNumThreads()

Capture<ListenableFuture> capturedFuture = EasyMock.newCapture();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(
watcher.registerQueryFuture(
EasyMock.anyObject(),
EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))
);
@@ -207,7 +207,7 @@ public int getNumThreads()

Capture<ListenableFuture> capturedFuture = Capture.newInstance();
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
watcher.registerQuery(
watcher.registerQueryFuture(
EasyMock.anyObject(),
EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture))
);
@@ -676,7 +676,7 @@ public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
public void registerQueryFuture(Query query, ListenableFuture future)
{

}
@@ -411,7 +411,7 @@ public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
public void registerQueryFuture(Query query, ListenableFuture future)
{

}