Broker backpressure. (#6313)
* Broker backpressure.

Adds a new property "druid.broker.http.maxQueuedBytes" and a new context
parameter "maxQueuedBytes". Both represent a maximum number of bytes queued
per query before exerting backpressure on the channel to the data server.

Fixes #4933.

* Fix query context doc.
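
To illustrate the new property, a minimal `broker/runtime.properties` sketch; the 25 MB value is an arbitrary assumption, not a recommendation:

```properties
# Exert backpressure on a data server's channel once 25 MB of unprocessed
# response data is queued for a single query (0, the default, disables this).
druid.broker.http.maxQueuedBytes=26214400
```

The `maxQueuedBytes` query context parameter overrides this per query.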
gianm committed Sep 10, 2018
1 parent 4669f08 commit d6cbdf8
Showing 28 changed files with 429 additions and 134 deletions.
18 changes: 12 additions & 6 deletions docs/content/configuration/index.md
@@ -1167,7 +1167,7 @@ These Broker configurations can be defined in the `broker/runtime.properties` file
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None|

#### Concurrent Requests
#### Server Configuration

Druid uses Jetty to serve HTTP requests.

@@ -1178,16 +1178,22 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5M|
|`druid.server.http.enableRequestLimit`|If enabled, excess requests are rejected with an "HTTP 429 Too Many Requests" error response instead of being queued by Jetty.|false|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advanced configuration that helps protect against OOMs when the Broker is under heavy load and cannot consume the gathered data quickly enough. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that a large limit is not necessarily bad if the Broker is never under heavy concurrent load, in which case the gathered data is processed quickly, freeing the memory used.|Long.MAX_VALUE|
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. Queries that exceed this limit will fail. This is an advanced configuration that helps protect against OOMs when the Broker is under heavy load and cannot consume the gathered data quickly enough (see the sketch after this table). This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that a large limit is not necessarily bad if the Broker is never under heavy concurrent load, in which case the gathered data is processed quickly, freeing the memory used.|Long.MAX_VALUE|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|`PT0S` (do not wait)|
|`druid.server.http.unannouncePropagationDelay`|How long to wait for zookeeper unannouncements to propagate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0S` (do not wait)|
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
|`druid.broker.http.unusedConnectionTimeout`|The timeout for idle connections in the connection pool. This timeout should be less than `druid.broker.http.readTimeout`; set it to roughly 90% of `druid.broker.http.readTimeout`.|`PT4M`|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for the `timeout` parameter. See [query-context](../querying/query-context.html) for more details about `timeout`. Queries are rejected if the query context `timeout` is greater than this value.|Long.MAX_VALUE|
|`druid.server.http.maxRequestHeaderSize`|Maximum size of a request header in bytes. Larger headers consume more memory and can make a server more vulnerable to denial of service attacks. |8 * 1024|
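
To make the scatter-gather limit above concrete, a hedged `runtime.properties` sketch (the 128 MB figure is an arbitrary assumption):

```properties
# Fail any query that gathers more than 128 MB from data servers, rather than
# risking a Broker OOM. The default, Long.MAX_VALUE, is effectively unlimited.
druid.server.http.maxScatterGatherBytes=134217728
```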

#### Client Configuration

Druid Brokers use an HTTP client to communicate with data servers (historical servers and real-time tasks). This client has the following configuration options.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical servers and real-time tasks.|PT15M|
|`druid.broker.http.unusedConnectionTimeout`|The timeout for idle connections in the connection pool. This timeout should be less than `druid.broker.http.readTimeout`; set it to roughly 90% of `druid.broker.http.readTimeout`.|`PT4M`|
|`druid.broker.http.maxQueuedBytes`|Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `druid.server.http.maxScatterGatherBytes`, except that this limit triggers backpressure rather than query failure. Zero means disabled. Can be overridden by the ["maxQueuedBytes" query context parameter](../querying/query-context.html).|0 (disabled)|

#### Retry Policy

3 changes: 2 additions & 1 deletion docs/content/querying/query-context.md
@@ -10,7 +10,6 @@ The query context is used for various query configuration parameters. The following
|property |default | description |
|-----------------|----------------------------------------|----------------------|
|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](../configuration/index.html#broker) |
|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can be used to further reduce the `maxScatterGatherBytes` limit at query time. See [broker configuration](../configuration/index.html#broker) for more details.|
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
@@ -20,6 +19,8 @@ The query context is used for various query configuration parameters. The following
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can be used to further reduce the `maxScatterGatherBytes` limit at query time. See [broker configuration](../configuration/index.html#broker) for more details.|
|maxQueuedBytes | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except that this limit triggers backpressure rather than query failure. Zero means disabled. See the example query after this table.|
|serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the results returned by the Broker and in the data transfer between the Broker and compute nodes|
|serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transfer between the Broker and compute nodes|
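
For illustration, a hypothetical query using the new `maxQueuedBytes` context parameter; the datasource, interval, and 25 MB value are made-up assumptions:

```json
{
  "queryType": "timeseries",
  "dataSource": "sample_datasource",
  "granularity": "day",
  "aggregations": [{ "type": "count", "name": "rows" }],
  "intervals": ["2018-01-01/2018-09-01"],
  "context": { "maxQueuedBytes": 26214400 }
}
```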

@@ -199,7 +199,7 @@ private static class ResponseHandler implements HttpResponseHandler<StatusResponseHolder, StatusResponseHolder>
protected static final Logger log = new Logger(ResponseHandler.class);

@Override
public ClientResponse<StatusResponseHolder> handleResponse(HttpResponse response)
public ClientResponse<StatusResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
return ClientResponse.unfinished(
new StatusResponseHolder(
@@ -212,7 +212,8 @@ public ClientResponse<StatusResponseHolder> handleResponse(HttpResponse response)
@Override
public ClientResponse<StatusResponseHolder> handleChunk(
ClientResponse<StatusResponseHolder> response,
HttpChunk chunk
HttpChunk chunk,
long chunkNum
)
{
return response;
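
The signature changes above thread two new backpressure primitives through every handler: a `TrafficCop` handed to `handleResponse`, and a per-chunk sequence number passed to `handleChunk`. A minimal sketch of a handler using them follows; it is not code from this commit, and the `TrafficCop.resume(long chunkNum)` call mentioned in the comments is an assumption inferred from this change's design:

```java
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;

// Sketch: counts response bytes and asks the client to stop reading from the
// channel once a byte budget is exceeded. Assumes chunks arrive serially,
// as they do per connection.
class ByteCountingHandler implements HttpResponseHandler<Long, Long>
{
  private final long maxQueuedBytes;
  private long queuedBytes = 0;

  ByteCountingHandler(long maxQueuedBytes)
  {
    this.maxQueuedBytes = maxQueuedBytes;
  }

  @Override
  public ClientResponse<Long> handleResponse(HttpResponse response, TrafficCop trafficCop)
  {
    // A real handler would stash trafficCop so whatever drains the queued data
    // can call trafficCop.resume(chunkNum) (assumed API) to re-open the channel.
    return ClientResponse.unfinished(0L);
  }

  @Override
  public ClientResponse<Long> handleChunk(ClientResponse<Long> response, HttpChunk chunk, long chunkNum)
  {
    final int bytes = chunk.getContent().readableBytes();
    queuedBytes += bytes;
    // continueReading = false signals backpressure: the connection stays open,
    // but no further chunks are read until the handler resumes it.
    return ClientResponse.unfinished(response.getObj() + bytes, queuedBytes < maxQueuedBytes);
  }

  @Override
  public ClientResponse<Long> done(ClientResponse<Long> response)
  {
    return ClientResponse.finished(response.getObj());
  }

  @Override
  public void exceptionCaught(ClientResponse<Long> response, Throwable e)
  {
    // Nothing to clean up in this sketch.
  }
}
```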
@@ -29,7 +29,7 @@
public class BytesFullResponseHandler implements HttpResponseHandler<FullResponseHolder, FullResponseHolder>
{
@Override
public ClientResponse<FullResponseHolder> handleResponse(HttpResponse response)
public ClientResponse<FullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
BytesFullResponseHolder holder = new BytesFullResponseHolder(
response.getStatus(),
@@ -47,7 +47,8 @@ public ClientResponse<FullResponseHolder> handleResponse(HttpResponse response)
@Override
public ClientResponse<FullResponseHolder> handleChunk(
ClientResponse<FullResponseHolder> response,
HttpChunk chunk
HttpChunk chunk,
long chunkNum
)
{
BytesFullResponseHolder holder = (BytesFullResponseHolder) response.getObj();
@@ -21,9 +21,9 @@

import com.google.common.base.Function;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.common.logger.Logger;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -49,7 +49,7 @@ public ResponseCookieHandler(URI uri, CookieManager manager, HttpResponseHandler
}

@Override
public ClientResponse<Intermediate> handleResponse(HttpResponse httpResponse)
public ClientResponse<Intermediate> handleResponse(HttpResponse httpResponse, TrafficCop trafficCop)
{
try {
final HttpHeaders headers = httpResponse.headers();
@@ -66,14 +66,18 @@ public List<String> apply(String input)
log.error(e, "Error while processing Cookies from header");
}
finally {
return delegate.handleResponse(httpResponse);
return delegate.handleResponse(httpResponse, trafficCop);
}
}

@Override
public ClientResponse<Intermediate> handleChunk(ClientResponse<Intermediate> clientResponse, HttpChunk httpChunk)
public ClientResponse<Intermediate> handleChunk(
ClientResponse<Intermediate> clientResponse,
HttpChunk httpChunk,
long chunkNum
)
{
return delegate.handleChunk(clientResponse, httpChunk);
return delegate.handleChunk(clientResponse, httpChunk, chunkNum);
}

@Override
@@ -19,9 +19,9 @@

package org.apache.druid.security.kerberos;

import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.common.logger.Logger;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -39,29 +39,33 @@ public RetryIfUnauthorizedResponseHandler(HttpResponseHandler<Intermediate, Final
}

@Override
public ClientResponse<RetryResponseHolder<Intermediate>> handleResponse(HttpResponse httpResponse)
public ClientResponse<RetryResponseHolder<Intermediate>> handleResponse(
HttpResponse httpResponse,
TrafficCop trafficCop
)
{
log.debug("UnauthorizedResponseHandler - Got response status [%s]", httpResponse.getStatus());
if (httpResponse.getStatus().equals(HttpResponseStatus.UNAUTHORIZED)) {
// Drain the buffer
httpResponse.getContent().toString();
return ClientResponse.unfinished(RetryResponseHolder.retry());
} else {
return wrap(httpResponseHandler.handleResponse(httpResponse));
return wrap(httpResponseHandler.handleResponse(httpResponse, trafficCop));
}
}

@Override
public ClientResponse<RetryResponseHolder<Intermediate>> handleChunk(
ClientResponse<RetryResponseHolder<Intermediate>> clientResponse,
HttpChunk httpChunk
HttpChunk httpChunk,
long chunkNum
)
{
if (clientResponse.getObj().shouldRetry()) {
httpChunk.getContent().toString();
return clientResponse;
} else {
return wrap(httpResponseHandler.handleChunk(unwrap(clientResponse), httpChunk));
return wrap(httpResponseHandler.handleChunk(unwrap(clientResponse), httpChunk, chunkNum));
}
}

@@ -84,9 +88,12 @@ public void exceptionCaught(ClientResponse<RetryResponseHolder<Intermediate>> clientResponse
private <T> ClientResponse<RetryResponseHolder<T>> wrap(ClientResponse<T> response)
{
if (response.isFinished()) {
return ClientResponse.finished(new RetryResponseHolder<T>(false, response.getObj()));
return ClientResponse.finished(new RetryResponseHolder<>(false, response.getObj()));
} else {
return ClientResponse.unfinished(new RetryResponseHolder<T>(false, response.getObj()));
return ClientResponse.unfinished(
new RetryResponseHolder<>(false, response.getObj()),
response.isContinueReading()
);
}
}

@@ -95,7 +102,7 @@ private <T> ClientResponse<T> unwrap(ClientResponse<RetryResponseHolder<T>> response)
if (response.isFinished()) {
return ClientResponse.finished(response.getObj().getObj());
} else {
return ClientResponse.unfinished(response.getObj().getObj());
return ClientResponse.unfinished(response.getObj().getObj(), response.isContinueReading());
}
}
