query laning and load shedding #9407

Merged
38 commits
6220985 prototype (clintropolis, Jan 29, 2020)
3f41001 merge QueryScheduler and QueryManager (clintropolis, Jan 31, 2020)
feae8b1 everything in its right place (clintropolis, Feb 25, 2020)
554b8b5 adjustments (clintropolis, Feb 25, 2020)
0597c3f docs (clintropolis, Feb 26, 2020)
405c94e fixes (clintropolis, Feb 27, 2020)
e22ece1 doc fixes (clintropolis, Feb 28, 2020)
e98cad7 use resilience4j instead of semaphore (clintropolis, Feb 28, 2020)
2069437 more tests (clintropolis, Feb 28, 2020)
eaf1449 simplify (clintropolis, Feb 28, 2020)
688ca43 checkstyle (clintropolis, Feb 28, 2020)
f0b3f9f spelling (clintropolis, Feb 28, 2020)
87c6cbd oops heh (clintropolis, Feb 28, 2020)
5e91bcb remove unused (clintropolis, Feb 28, 2020)
912b7bc simplify (clintropolis, Feb 29, 2020)
1e384bf concurrency tests (clintropolis, Mar 2, 2020)
60861a4 add SqlResource tests, refactor error response (clintropolis, Mar 3, 2020)
9aed16e add json config tests (clintropolis, Mar 4, 2020)
419ab98 use LongAdder instead of AtomicLong (clintropolis, Mar 4, 2020)
f0d39e1 remove test only stuffs from scheduler (clintropolis, Mar 4, 2020)
2afaaf1 javadocs, etc (clintropolis, Mar 4, 2020)
ef029c4 style (clintropolis, Mar 4, 2020)
059a2d4 partial review stuffs (clintropolis, Mar 6, 2020)
0ec8a26 adjust (clintropolis, Mar 6, 2020)
5711fce review stuffs (clintropolis, Mar 7, 2020)
aa73a14 more javadoc (clintropolis, Mar 7, 2020)
50847af error response documentation (clintropolis, Mar 7, 2020)
abe3631 spelling (clintropolis, Mar 7, 2020)
86501ef preserve user specified lane for NoSchedulingStrategy (clintropolis, Mar 7, 2020)
8b7b70d more test, why not (clintropolis, Mar 7, 2020)
91ad9d9 doc adjustment (clintropolis, Mar 7, 2020)
373fd11 style (clintropolis, Mar 7, 2020)
2741501 missed review for make a thing a constant (clintropolis, Mar 9, 2020)
8575cf8 fixes and tests (clintropolis, Mar 9, 2020)
25a8bda fix test (clintropolis, Mar 10, 2020)
32965a4 Update docs/configuration/index.md (clintropolis, Mar 10, 2020)
361e06e Merge remote-tracking branch 'upstream/master' into query-laning-and-… (clintropolis, Mar 10, 2020)
3ba7808 doc update (clintropolis, Mar 10, 2020)
@@ -104,7 +104,10 @@
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
@@ -338,7 +341,8 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
new CacheConfig(),
new DruidHttpClientConfig(),
processingConfig,
-forkJoinPool
+forkJoinPool,
+new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
);
}

1 change: 1 addition & 0 deletions distribution/bin/check-licenses.py
@@ -214,6 +214,7 @@ def build_compatible_license_names():
compatible_licenses['Apache License, Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['The Apache Software License, Version 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache-2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache 2'] = 'Apache License version 2.0'
compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0'
compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0'
28 changes: 27 additions & 1 deletion docs/configuration/index.md
@@ -1481,9 +1481,35 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|

##### Query laning

*Laning strategies* allow you to control capacity utilization for heterogeneous query workloads. With laning, the broker examines and classifies a query for the purpose of assigning it to a 'lane'. Lanes have capacity limits, enforced by the broker, that can be used to ensure sufficient resources are available for other lanes or for interactive queries (with no lane), or to limit overall throughput for queries within the lane. Requests in excess of the capacity are discarded with an HTTP 429 status code.
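
To make the admission behavior described above concrete, here is a minimal sketch. It is not Druid's `QueryScheduler`: it assumes one `java.util.concurrent.Semaphore` for total capacity plus one per lane, chosen only to keep the example dependency-free (the commit list mentions a move from a semaphore to resilience4j). The names `LaneLimiterSketch`, `tryAdmit`, and `release` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/** Hypothetical illustration of lane-based admission; not Druid's implementation. */
final class LaneLimiterSketch
{
  private final Semaphore total;
  private final Map<String, Semaphore> lanes = new ConcurrentHashMap<>();

  LaneLimiterSketch(int totalCapacity, Map<String, Integer> laneLimits)
  {
    this.total = new Semaphore(totalCapacity);
    laneLimits.forEach((lane, limit) -> lanes.put(lane, new Semaphore(limit)));
  }

  /**
   * Returns true if the query is admitted. A false result is where a server
   * would answer with HTTP 429 instead of queuing the request.
   *
   * @param lane lane name, or null for a query with no lane
   */
  boolean tryAdmit(String lane)
  {
    Semaphore laneSemaphore = lane == null ? null : lanes.get(lane);
    // Check the lane limit first, then the shared total capacity.
    if (laneSemaphore != null && !laneSemaphore.tryAcquire()) {
      return false;
    }
    if (!total.tryAcquire()) {
      // Give the lane permit back so a failed admission leaks nothing.
      if (laneSemaphore != null) {
        laneSemaphore.release();
      }
      return false;
    }
    return true;
  }

  /** Must be called exactly once for every successful tryAdmit. */
  void release(String lane)
  {
    total.release();
    Semaphore laneSemaphore = lane == null ? null : lanes.get(lane);
    if (laneSemaphore != null) {
      laneSemaphore.release();
    }
  }
}
```

Because the total permit pool is shared, a lane limit in this sketch caps the lane without reserving capacity for it: unlaned queries can still consume every permit.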

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.numThreads`|Maximum number of HTTP threads to dedicate to query processing. To save HTTP thread capacity, this should be lower than `druid.server.http.numThreads`.|Unbounded|
Contributor

In what use case would I ever want to set it to something other than druid.server.http.numThreads? If the recommendation is to set it lower than druid.server.http.numThreads, then why is the default value not set to druid.server.http.numThreads - 1?
I guess, as a user, I don't quite understand the importance of setting this higher/same/lower compared to druid.server.http.numThreads and when I should choose one vs. the other.

Member Author

> In what use case would I ever want to set it to something other than druid.server.http.numThreads?

I actually think we might always want to set it lower than druid.server.http.numThreads, but I was too nervous to make this the default and made it opt-in behavior instead. It grabs and releases locks for each query, so if there is some bug in releasing locks, a broker would eventually stop accepting queries entirely. The primary reason I think we want it lower than druid.server.http.numThreads is to save some 'slack' space for non-query HTTP connections, like accepting health checks, lookup management, and other such things that can be starved when long-running queries start to pile up.

> If the recommendation is to set it lower than druid.server.http.numThreads, then why is the default value not set to druid.server.http.numThreads - 1?

See my above nervousness, but I think druid.server.http.numThreads - 1 would probably be a good default. This might need to be adjusted even lower depending on how much other non-query HTTP traffic the server is receiving (e.g. frequently polled/updated lookups, etc).

> I guess, as a user, I don't quite understand the importance of setting this higher/same/lower compared to druid.server.http.numThreads and when I should choose one vs. the other.

I would agree the current documentation doesn't quite adequately describe how this stuff might be utilized. In a future PR I want to add a section to the cluster tuning docs to more properly advise on when and how to set this stuff up.

Contributor

Thanks, I understand the reasoning now.
Lookup endpoints already have a QoS filter to never consume more than two threads from Jetty. I wonder if in this world it makes sense to set up a QoS filter for non-query endpoints (say, hardcoded to 2) so that we can ensure that they don't end up consuming more Jetty threads than intended.
Then the default would be druid.query.scheduler.numThreads = druid.server.http.numThreads - numReservedForOthers (= 4), and users would likely never be expected to touch druid.query.scheduler.numThreads.

The major behavior change with lane usage is really losing the queuing of requests to handle spikes and instead sending 429s immediately. In the future, we could introduce a mechanism to maintain a statically/dynamically sized [per lane] waiting queue ourselves as well, along with concurrency limits in the lane strategy.

Member Author
@clintropolis Mar 12, 2020

I wonder if it would make sense to instead move towards automatically computing druid.server.http.numThreads, since maybe it is easier for operators to only have to think about the number of concurrent queries to serve and just set druid.query.scheduler.numThreads? Druid could probably automatically figure out how many more HTTP threads it needs based on configuration.

> The major behavior change with lane usage is really losing the queuing of requests to handle spikes and instead sending 429s immediately. In the future, we could introduce a mechanism to maintain a statically/dynamically sized [per lane] waiting queue ourselves as well, along with concurrency limits in the lane strategy.

Yeah, the current behavior is definitely a hard stop if you are over the line. I agree it would make sense to allow some sort of timed-out queuing behavior, which is what the Jetty QoS filter can sort of provide. That is a large part of why I am still wondering if druid.query.scheduler.numThreads should be a QoS filter instead of being enforced as an actual lane like it is currently.

Contributor

It is fine to let the user provide druid.query.scheduler.numThreads and compute druid.server.http.numThreads, as long as one of those does not need to be touched by the user in most cases.

There are a few advantages to maintaining the queues ourselves rather than letting Jetty do it.

  • We have no control over the Jetty queue. If a request is dropped, the end user sees a TCP connection close and not an HTTP 429, so it is not clear to the client whether to retry or back off.
  • We don't know how long a request waited in the Jetty queue, so request time metrics don't account for it.
  • The Jetty queue is [probably] static in size. If we managed it ourselves, we would have the option of keeping dynamically sized queues and potentially doing other cool things.
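
The self-managed, timed waiting discussed in this thread could look roughly like the sketch below. This is only an illustration of the idea, not code from this PR: `TimedLaneSketch` and its methods are hypothetical names, and a fair `Semaphore` with a bounded wait stands in for a real waiting queue. It reports how long a request waited, which is the measurement the Jetty queue does not expose.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: bounded waiting before rejecting, with the wait time measured. */
final class TimedLaneSketch
{
  private final Semaphore permits;
  private final long maxWaitMillis;

  TimedLaneSketch(int capacity, long maxWaitMillis)
  {
    // A fair semaphore hands out permits in arrival order, like a FIFO queue.
    this.permits = new Semaphore(capacity, true);
    this.maxWaitMillis = maxWaitMillis;
  }

  /**
   * Waits up to maxWaitMillis for a permit.
   *
   * @return nanoseconds spent waiting if admitted, or -1 if the request should
   *         be rejected (for example with HTTP 429)
   */
  long tryAdmit()
  {
    long start = System.nanoTime();
    try {
      boolean acquired = permits.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
      return acquired ? System.nanoTime() - start : -1L;
    }
    catch (InterruptedException e) {
      // Preserve the interrupt status and treat the request as rejected.
      Thread.currentThread().interrupt();
      return -1L;
    }
  }

  /** Must be called once for every admitted request. */
  void release()
  {
    permits.release();
  }
}
```

The returned wait time could feed a request-time metric, and the rejection path still lets the server send an explicit 429 rather than dropping the connection.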

|`druid.query.scheduler.laning.strategy`|Query laning strategy to use to assign queries to a lane in order to control capacities for certain classes of queries.|`none`|

##### Laning strategies

###### No laning strategy

In this mode, queries are never assigned a lane, and the concurrent query count will only be limited by `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, if set. This is the default Druid query scheduler operating mode. Enable this strategy explicitly by setting `druid.query.scheduler.laning.strategy` to `none`.

Contributor

Consider adding:

> This strategy can be enabled by setting druid.query.scheduler.laning.type to none.

###### 'High/Low' laning strategy
This laning strategy splits queries with a `priority` below zero into a `low` query lane, automatically. Queries with priority of zero (the default) or above are considered 'interactive'. The limit on `low` queries can be set to some desired percentage of the total capacity (or HTTP thread pool size), reserving capacity for interactive queries. Queries in the `low` lane are _not_ guaranteed their capacity, which may be consumed by interactive queries, but may use up to this limit if total capacity is available.

If the `low` lane is specified in the [query context](../querying/query-context.md) `lane` parameter, this will override the computed lane.

This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=hilo`.
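
As an illustration of the classification rule described above, here is a minimal sketch. It assumes the query context is a plain `Map` and only covers what the text states: an explicit `low` lane is honored, and otherwise a priority below zero maps to `low`. How other explicit lane names are treated is not specified here, so the sketch ignores them. `HiLoLaneSketch` and `computeLane` are hypothetical names, not Druid classes.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the 'High/Low' classification rule; not Druid's implementation. */
final class HiLoLaneSketch
{
  static final String LOW = "low";

  /** Returns the lane for a query context, or empty for an interactive (unlaned) query. */
  static Optional<String> computeLane(Map<String, Object> context)
  {
    // An explicit 'low' lane in the query context overrides the computed lane.
    if (LOW.equals(context.get("lane"))) {
      return Optional.of(LOW);
    }
    // Priority defaults to zero when it is not set.
    Object priority = context.get("priority");
    int value = priority == null ? 0 : Integer.parseInt(priority.toString());
    return value < 0 ? Optional.of(LOW) : Optional.empty();
  }
}
```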

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up.|No default, must be set if using this mode|
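
The arithmetic behind `maxLowPercent` can be sketched as follows. This is a reading of the table above rather than the PR's code: it assumes "rounded up" applies to the resulting thread count, and that a non-positive `schedulerThreads` means `druid.query.scheduler.numThreads` is unset. `LowLaneLimitSketch` and `lowLaneLimit` are hypothetical names.

```java
/** Hypothetical sketch of the low-lane limit arithmetic; not Druid's implementation. */
final class LowLaneLimitSketch
{
  /**
   * @param httpThreads      value of druid.server.http.numThreads
   * @param schedulerThreads value of druid.query.scheduler.numThreads,
   *                         or a non-positive number if unset (assumption)
   * @param maxLowPercent    percentage in the range 1 to 100
   * @return number of threads usable by queries in the low lane
   */
  static int lowLaneLimit(int httpThreads, int schedulerThreads, int maxLowPercent)
  {
    if (maxLowPercent < 1 || maxLowPercent > 100) {
      throw new IllegalArgumentException("maxLowPercent must be in the range 1 to 100");
    }
    // Use the smaller of the two thread counts as the total capacity.
    int total = schedulerThreads > 0 ? Math.min(httpThreads, schedulerThreads) : httpThreads;
    // Integer ceiling of total * percent / 100, avoiding floating point.
    return (total * maxLowPercent + 99) / 100;
  }
}
```

For example, with 10 HTTP threads, no scheduler limit, and `maxLowPercent` of 25, the sketch yields 3 threads (2.5 rounded up).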

##### Server Configuration

-Druid uses Jetty to serve HTTP requests.
+Druid uses Jetty to serve HTTP requests. Each query being processed consumes a single thread from `druid.server.http.numThreads`, so consider defining `druid.query.scheduler.numThreads` to a lower value in order to reserve HTTP threads for responding to health checks, lookup loading, and other non-query, and in most cases comparatively very short lived, HTTP requests.

|Property|Description|Default|
|--------|-----------|-------|
1 change: 1 addition & 0 deletions docs/querying/query-context.md
@@ -29,6 +29,7 @@ The query context is used for various query configuration parameters. The follow
|-----------------|----------------------------------------|----------------------|
|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [Broker configuration](../configuration/index.html#broker) |
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.html#broker) for more details.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache |
|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache |
2 changes: 2 additions & 0 deletions docs/querying/querying.md
@@ -109,6 +109,8 @@ If a query fails, you will get an HTTP 500 response containing a JSON object wit
}
```

If a query request fails due to being limited by the [query scheduler laning configuration](../configuration/index.md#broker), an HTTP 429 response is returned with the same JSON object schema as an error response, but with an `errorMessage` of the form: "Total query capacity exceeded" or "Query capacity exceeded for lane 'low'".
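
A client that wants to tell capacity rejections apart from other failures could check the status code and `errorMessage`, as in the sketch below. It assumes the message matches the two forms quoted above exactly, which the phrase "of the form" does not guarantee, and it takes an already-parsed `errorMessage` string rather than parsing JSON. `CapacityErrorSketch` and its methods are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical client-side helper for recognizing HTTP 429 capacity errors. */
final class CapacityErrorSketch
{
  static final String TOTAL_EXCEEDED = "Total query capacity exceeded";

  // Assumed exact shape of the per-lane message; the lane name is captured.
  private static final Pattern LANE_EXCEEDED =
      Pattern.compile("^Query capacity exceeded for lane '([^']+)'$");

  /** True if the response looks like a query scheduler capacity rejection. */
  static boolean isCapacityError(int status, String errorMessage)
  {
    if (status != 429 || errorMessage == null) {
      return false;
    }
    return TOTAL_EXCEEDED.equals(errorMessage) || LANE_EXCEEDED.matcher(errorMessage).matches();
  }

  /** Returns the lane named in a per-lane rejection message, if any. */
  static Optional<String> rejectedLane(String errorMessage)
  {
    Matcher matcher = LANE_EXCEEDED.matcher(errorMessage == null ? "" : errorMessage);
    return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
  }
}
```

A caller might back off and retry when `isCapacityError` returns true, and treat other error responses as non-retryable.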

The fields in the response are:

|field|description|

@@ -65,7 +65,9 @@
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
@@ -361,7 +363,8 @@ public String getFormatString()
return null;
}
},
-ForkJoinPool.commonPool()
+ForkJoinPool.commonPool(),
+new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
);

ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(

@@ -909,7 +909,7 @@ private TaskToolbox makeToolbox(
new QueryWatcher()
{
@Override
-public void registerQuery(Query query, ListenableFuture future)
+public void registerQueryFuture(Query query, ListenableFuture future)
{
// do nothing
}
22 changes: 22 additions & 0 deletions licenses.yaml
@@ -1869,6 +1869,17 @@ libraries:

---

name: Resilience4j
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.3.1
libraries:
- io.github.resilience4j: resilience4j-core
- io.github.resilience4j: resilience4j-bulkhead

---

name: RoaringBitmap
license_category: binary
module: java-core
@@ -1880,6 +1891,17 @@ libraries:

---

name: vavr
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 0.10.2
libraries:
- io.vavr: vavr
- io.vavr: vavr-match

---

name: Config Magic
license_category: binary
module: java-core
6 changes: 6 additions & 0 deletions pom.xml
@@ -94,6 +94,7 @@
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<log4j.version>2.8.2</log4j.version>
<netty3.version>3.10.6.Final</netty3.version>
<resilience4j.version>1.3.1</resilience4j.version>
<!-- Spark updated in https://github.com/apache/spark/pull/19884 -->
<netty4.version>4.1.45.Final</netty4.version>
<node.version>v10.14.2</node.version>
@@ -1181,6 +1182,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>${resilience4j.version}</version>
</dependency>

<dependency>
<groupId>org.testng</groupId>

@@ -144,7 +144,7 @@ public Iterable<T> call()
)
);

-queryWatcher.registerQuery(query, futures);
+queryWatcher.registerQueryFuture(query, futures);

try {
return new MergeIterable<>(

@@ -178,7 +178,7 @@ private void waitForFutureCompletion(
)
{
try {
-queryWatcher.registerQuery(query, future);
+queryWatcher.registerQueryFuture(query, future);
if (QueryContexts.hasTimeout(query)) {
future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
7 changes: 7 additions & 0 deletions processing/src/main/java/org/apache/druid/query/Query.java
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -146,4 +147,10 @@ default Query<T> optimizeForSegment(PerSegmentQueryOptimizationContext optimizat
{
return this;
}

default Query<T> withLane(String lane)
{
return withOverriddenContext(ImmutableMap.of(QueryContexts.LANE_KEY, lane));
}

}
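
The effect of `withLane` on the query context can be pictured with plain maps. The sketch below is not Druid code: it assumes that overriding a context produces a new map in which the `lane` key replaces any earlier value while other keys are preserved, and `ContextOverrideSketch` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of context-override semantics using plain maps. */
final class ContextOverrideSketch
{
  /** Returns a new read-only context with the lane set; the input is not modified. */
  static Map<String, Object> withLane(Map<String, Object> context, String lane)
  {
    Map<String, Object> merged = new HashMap<>(context);
    // "lane" mirrors the value of QueryContexts.LANE_KEY shown in this PR.
    merged.put("lane", lane);
    return Collections.unmodifiableMap(merged);
  }
}
```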

@@ -35,6 +35,7 @@
public class QueryContexts
{
public static final String PRIORITY_KEY = "priority";
public static final String LANE_KEY = "lane";
Contributor

Was this added to the docs?

Member Author

No, I hadn't documented it since I hadn't decided whether a lane specified in the query context by the user should override the laning strategy, or whether that should be laning strategy specific. See the other comment about this.

public static final String TIMEOUT_KEY = "timeout";
public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes";
Expand Down Expand Up @@ -202,6 +203,11 @@ public static <T> int getPriority(Query<T> query, int defaultValue)
return parseInt(query, PRIORITY_KEY, defaultValue);
}

public static <T> String getLane(Query<T> query)
{
return (String) query.getContextValue(LANE_KEY);
}

public static <T> boolean getEnableParallelMerges(Query<T> query)
{
return parseBoolean(query, BROKER_PARALLEL_MERGE_KEY, DEFAULT_ENABLE_PARALLEL_MERGE);

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

/**
* Base serializable error response
*
* QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen.
*/
public class QueryException extends RuntimeException
{
private final String errorCode;
private final String errorClass;
private final String host;

public QueryException(Throwable cause, String errorCode, String errorClass, String host)
{
super(cause == null ? null : cause.getMessage(), cause);
this.errorCode = errorCode;
this.errorClass = errorClass;
this.host = host;
}

@JsonCreator
public QueryException(
@JsonProperty("error") @Nullable String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") @Nullable String errorClass,
@JsonProperty("host") @Nullable String host
)
{
super(errorMessage);
this.errorCode = errorCode;
this.errorClass = errorClass;
this.host = host;
}

@Nullable
@JsonProperty("error")
public String getErrorCode()
{
return errorCode;
}

@JsonProperty("errorMessage")
@Override
public String getMessage()
{
return super.getMessage();
}

@JsonProperty
public String getErrorClass()
{
return errorClass;
}

@JsonProperty
public String getHost()
{
return host;
}
}

@@ -42,7 +42,7 @@
* The QueryResource is expected to emit the JSON form of this object when errors happen, and the DirectDruidClient
* deserializes and wraps them.
*/
-public class QueryInterruptedException extends RuntimeException
+public class QueryInterruptedException extends QueryException
{
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_TIMEOUT = "Query timeout";
@@ -52,10 +52,6 @@ public class QueryInterruptedException extends RuntimeException
public static final String UNSUPPORTED_OPERATION = "Unsupported operation";
public static final String UNKNOWN_EXCEPTION = "Unknown exception";

-private final String errorCode;
-private final String errorClass;
-private final String host;

@JsonCreator
public QueryInterruptedException(
@JsonProperty("error") @Nullable String errorCode,
@@ -64,10 +60,7 @@ public QueryInterruptedException(
@JsonProperty("host") @Nullable String host
)
{
-super(errorMessage);
-this.errorCode = errorCode;
-this.errorClass = errorClass;
-this.host = host;
+super(errorCode, errorMessage, errorClass, host);
}

/**
@@ -83,36 +76,7 @@ public QueryInterruptedException(Throwable cause)

public QueryInterruptedException(Throwable cause, String host)
{
-super(cause == null ? null : cause.getMessage(), cause);
-this.errorCode = getErrorCodeFromThrowable(cause);
-this.errorClass = getErrorClassFromThrowable(cause);
-this.host = host;
-}
-
-@Nullable
-@JsonProperty("error")
-public String getErrorCode()
-{
-return errorCode;
-}
-
-@JsonProperty("errorMessage")
-@Override
-public String getMessage()
-{
-return super.getMessage();
-}
-
-@JsonProperty
-public String getErrorClass()
-{
-return errorClass;
-}
-
-@JsonProperty
-public String getHost()
-{
-return host;
+super(cause, getErrorCodeFromThrowable(cause), getErrorClassFromThrowable(cause), host);
}

@Override
@@ -121,9 +85,9 @@ public String toString()
return StringUtils.format(
"QueryInterruptedException{msg=%s, code=%s, class=%s, host=%s}",
getMessage(),
-errorCode,
-errorClass,
-host
+getErrorCode(),
+getErrorClass(),
+getHost()
);
}
