From d6cbdf86c2e01a2bfbecb2263f7aad62bbbda5e2 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 10 Sep 2018 09:33:29 -0700 Subject: [PATCH] Broker backpressure. (#6313) * Broker backpressure. Adds a new property "druid.broker.http.maxQueuedBytes" and a new context parameter "maxQueuedBytes". Both represent a maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Fixes #4933. * Fix query context doc. --- docs/content/configuration/index.md | 18 ++- docs/content/querying/query-context.md | 3 +- .../security/basic/CommonCacheNotifier.java | 5 +- .../BytesFullResponseHandler.java | 5 +- .../kerberos/ResponseCookieHandler.java | 14 +- .../RetryIfUnauthorizedResponseHandler.java | 23 ++-- .../util/http/client/NettyHttpClient.java | 69 +++++++--- .../http/client/response/ClientResponse.java | 31 ++++- .../client/response/FullResponseHandler.java | 5 +- .../client/response/HttpResponseHandler.java | 51 ++++++- .../response/InputStreamResponseHandler.java | 6 +- .../SequenceInputStreamResponseHandler.java | 6 +- .../response/StatusResponseHandler.java | 5 +- ...equenceInputStreamResponseHandlerTest.java | 19 +-- .../org/apache/druid/query/QueryContexts.java | 6 +- .../org/apache/druid/query/QueryPlus.java | 13 ++ .../druid/client/CachingClusteredClient.java | 43 ++++-- .../druid/client/DirectDruidClient.java | 124 ++++++++++++------ .../druid/client/InputStreamHolder.java | 65 +++++++++ .../guice/http/DruidHttpClientConfig.java | 12 ++ .../coordination/ChangeRequestHttpSyncer.java | 4 +- .../server/coordinator/HttpLoadQueuePeon.java | 4 +- .../cache/LookupCoordinatorManager.java | 4 +- ...chingClusteredClientFunctionalityTest.java | 10 +- .../client/CachingClusteredClientTest.java | 8 ++ .../client/HttpServerInventoryViewTest.java | 4 +- .../ChangeRequestHttpSyncerTest.java | 4 +- .../coordinator/HttpLoadQueuePeonTest.java | 2 +- 28 files changed, 429 insertions(+), 134 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/client/InputStreamHolder.java diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index f60e60b28f38..f430db7e0fcc 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -1167,7 +1167,7 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi |`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`| |`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None| -#### Concurrent Requests +#### Server Configuration Druid uses Jetty to serve HTTP requests. @@ -1178,16 +1178,22 @@ Druid uses Jetty to serve HTTP requests. |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5M| |`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false| |`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| -|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. 
This is an advance configuration that allows to protect in case broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used.|Long.MAX_VALUE|
+|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. Queries that exceed this limit will fail. This is an advanced configuration that guards against OOMs when the Broker is under heavy load and cannot process the gathered data quickly enough. This limit can be further reduced at query time using `maxScatterGatherBytes` in the query context. Note that a large limit is not necessarily harmful if the Broker is never under heavy concurrent load, since in that case gathered data is processed quickly, freeing up the memory used.|Long.MAX_VALUE|
|`druid.server.http.gracefulShutdownTimeout`|The maximum amount of time Jetty waits after receiving shutdown signal. After this timeout the threads will be forcefully shutdown. This allows any queries that are executing to complete.|`PT0S` (do not wait)|
|`druid.server.http.unannouncePropagationDelay`|How long to wait for zookeeper unannouncements to propagate before shutting down Jetty. This is a minimum and `druid.server.http.gracefulShutdownTimeout` does not start counting down until after this period elapses.|`PT0S` (do not wait)|
-|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
-|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
-|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
-|`druid.broker.http.unusedConnectionTimeout`|The timeout for idle connections in connection pool. This timeout should be less than `druid.broker.http.readTimeout`. Set this timeout = ~90% of `druid.broker.http.readTimeout`|`PT4M`|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for `timeout` parameter. See [query-context](../querying/query-context.html) to know more about `timeout`. Query is rejected if the query context `timeout` is greater than this value. |Long.MAX_VALUE|
|`druid.server.http.maxRequestHeaderSize`|Maximum size of a request header in bytes. Larger headers consume more memory and can make a server more vulnerable to denial of service attacks. |8 * 1024|

+#### Client Configuration
+
+Druid Brokers use an HTTP client to communicate with data servers (historical servers and real-time tasks). This
+client has the following configuration options.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
+|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
+|`druid.broker.http.readTimeout`|The timeout for data reads from historical servers and real-time tasks.|PT15M|
+|`druid.broker.http.unusedConnectionTimeout`|The timeout for idle connections in the connection pool. This timeout should be less than `druid.broker.http.readTimeout`; set it to approximately 90% of `druid.broker.http.readTimeout`.|`PT4M`|
+|`druid.broker.http.maxQueuedBytes`|Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `druid.server.http.maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled. Can be overridden by the ["maxQueuedBytes" query context parameter](../querying/query-context.html).|0 (disabled)|
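+
+For illustration, a hypothetical Broker tuned to exert backpressure once roughly 25 MB are queued for a single query
+(an illustrative value, not a tuning recommendation) would set, in `broker/runtime.properties`:
+
+```
+druid.broker.http.maxQueuedBytes=26214400
+```
+
+An individual query can override this by passing `maxQueuedBytes` in its [query context](../querying/query-context.html).
+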
#### Retry Policy

diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md
index 9e3f87f92456..3df148b20b57 100644
--- a/docs/content/querying/query-context.md
+++ b/docs/content/querying/query-context.md
@@ -10,7 +10,6 @@ The query context is used for various query configuration parameters. The follow
|property |default | description |
|-----------------|----------------------------------------|----------------------|
|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](../configuration/index.html#broker) |
-|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [broker configuration](../configuration/index.html#broker) for more details.|
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
@@ -20,6 +19,8 @@ The query context is used for various query configuration parameters. The follow
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries.
The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | +|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [broker configuration](../configuration/index.html#broker) for more details.| +|maxQueuedBytes | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.| |serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by broker and the data transportation between broker and compute node| |serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transportation between broker and compute node| diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java index d38af5e47221..21a009bb2304 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java @@ -199,7 +199,7 @@ private static class ResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { return ClientResponse.unfinished( new StatusResponseHolder( @@ -212,7 +212,8 @@ public ClientResponse handleResponse(HttpResponse response @Override public ClientResponse handleChunk( ClientResponse response, - HttpChunk chunk + HttpChunk chunk, + long chunkNum ) { return response; diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/BytesFullResponseHandler.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/BytesFullResponseHandler.java index 0f75ae2f3726..bb8196a0f892 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/BytesFullResponseHandler.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/BytesFullResponseHandler.java @@ -29,7 +29,7 @@ public class BytesFullResponseHandler implements HttpResponseHandler { @Override - public ClientResponse handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { BytesFullResponseHolder holder = new BytesFullResponseHolder( response.getStatus(), @@ -47,7 +47,8 @@ public ClientResponse handleResponse(HttpResponse response) @Override public ClientResponse handleChunk( ClientResponse response, - HttpChunk chunk + HttpChunk chunk, + long chunkNum ) { BytesFullResponseHolder holder = (BytesFullResponseHolder) response.getObj(); diff --git 
a/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/ResponseCookieHandler.java b/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/ResponseCookieHandler.java index 02212ac91fed..50973447b57d 100644 --- a/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/ResponseCookieHandler.java +++ b/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/ResponseCookieHandler.java @@ -21,9 +21,9 @@ import com.google.common.base.Function; import com.google.common.collect.Maps; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.response.ClientResponse; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; -import org.apache.druid.java.util.common.logger.Logger; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -49,7 +49,7 @@ public ResponseCookieHandler(URI uri, CookieManager manager, HttpResponseHandler } @Override - public ClientResponse handleResponse(HttpResponse httpResponse) + public ClientResponse handleResponse(HttpResponse httpResponse, TrafficCop trafficCop) { try { final HttpHeaders headers = httpResponse.headers(); @@ -66,14 +66,18 @@ public List apply(String input) log.error(e, "Error while processing Cookies from header"); } finally { - return delegate.handleResponse(httpResponse); + return delegate.handleResponse(httpResponse, trafficCop); } } @Override - public ClientResponse handleChunk(ClientResponse clientResponse, HttpChunk httpChunk) + public ClientResponse handleChunk( + ClientResponse clientResponse, + HttpChunk httpChunk, + long chunkNum + ) { - return delegate.handleChunk(clientResponse, httpChunk); + return delegate.handleChunk(clientResponse, httpChunk, chunkNum); } @Override diff --git a/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/RetryIfUnauthorizedResponseHandler.java b/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/RetryIfUnauthorizedResponseHandler.java index 06d765123957..9f3b732fc2c2 100644 --- a/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/RetryIfUnauthorizedResponseHandler.java +++ b/extensions-core/druid-kerberos/src/main/java/org/apache/druid/security/kerberos/RetryIfUnauthorizedResponseHandler.java @@ -19,9 +19,9 @@ package org.apache.druid.security.kerberos; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.response.ClientResponse; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; -import org.apache.druid.java.util.common.logger.Logger; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; @@ -39,7 +39,10 @@ public RetryIfUnauthorizedResponseHandler(HttpResponseHandler> handleResponse(HttpResponse httpResponse) + public ClientResponse> handleResponse( + HttpResponse httpResponse, + TrafficCop trafficCop + ) { log.debug("UnauthorizedResponseHandler - Got response status [%s]", httpResponse.getStatus()); if (httpResponse.getStatus().equals(HttpResponseStatus.UNAUTHORIZED)) { @@ -47,21 +50,22 @@ public ClientResponse> handleResponse(HttpResp httpResponse.getContent().toString(); return ClientResponse.unfinished(RetryResponseHolder.retry()); } else { - return 
wrap(httpResponseHandler.handleResponse(httpResponse)); + return wrap(httpResponseHandler.handleResponse(httpResponse, trafficCop)); } } @Override public ClientResponse> handleChunk( ClientResponse> clientResponse, - HttpChunk httpChunk + HttpChunk httpChunk, + long chunkNum ) { if (clientResponse.getObj().shouldRetry()) { httpChunk.getContent().toString(); return clientResponse; } else { - return wrap(httpResponseHandler.handleChunk(unwrap(clientResponse), httpChunk)); + return wrap(httpResponseHandler.handleChunk(unwrap(clientResponse), httpChunk, chunkNum)); } } @@ -84,9 +88,12 @@ public void exceptionCaught(ClientResponse> cl private ClientResponse> wrap(ClientResponse response) { if (response.isFinished()) { - return ClientResponse.finished(new RetryResponseHolder(false, response.getObj())); + return ClientResponse.finished(new RetryResponseHolder<>(false, response.getObj())); } else { - return ClientResponse.unfinished(new RetryResponseHolder(false, response.getObj())); + return ClientResponse.unfinished( + new RetryResponseHolder<>(false, response.getObj()), + response.isContinueReading() + ); } } @@ -95,7 +102,7 @@ private ClientResponse unwrap(ClientResponse> resp if (response.isFinished()) { return ClientResponse.finished(response.getObj().getObj()); } else { - return ClientResponse.unfinished(response.getObj().getObj()); + return ClientResponse.unfinished(response.getObj().getObj(), response.isContinueReading()); } } diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java index 78439a64aeb9..d15e1e6aa763 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java +++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -72,13 +73,6 @@ public class NettyHttpClient extends AbstractHttpClient private final HttpClientConfig.CompressionCodec compressionCodec; private final Duration defaultReadTimeout; - public NettyHttpClient( - ResourcePool pool - ) - { - this(pool, null, HttpClientConfig.DEFAULT_COMPRESSION_CODEC, null); - } - NettyHttpClient( ResourcePool pool, Duration defaultReadTimeout, @@ -138,6 +132,9 @@ public ListenableFuture go( ); } else { channel = channelFuture.getChannel(); + + // In case we get a channel that never had its readability turned back on. + channel.setReadable(true); } final String urlFile = StringUtils.nullToEmptyNonDruidDataString(url.getFile()); final HttpRequest httpRequest = new DefaultHttpRequest( @@ -183,6 +180,16 @@ public ListenableFuture go( { private volatile ClientResponse response = null; + // Chunk number most recently assigned. + private long currentChunkNum = 0; + + // Suspend and resume watermarks (respectively: last chunk number that triggered a suspend, and that was + // provided to the TrafficCop's resume method). Synchronized access since they are not always accessed + // from an I/O thread. (TrafficCops can be called from any thread.) 
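+      // Reads stay suspended while suspendWatermark exceeds resumeWatermark; the TrafficCop's resume() makes the
+      // channel readable again once it has been called with a chunk number at least as high as the one that
+      // triggered the most recent suspend.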
+ private final Object watermarkLock = new Object(); + private long suspendWatermark = -1; + private long resumeWatermark = -1; + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { @@ -198,11 +205,25 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) log.debug("[%s] Got response: %s", requestDesc, httpResponse.getStatus()); } - response = handler.handleResponse(httpResponse); + HttpResponseHandler.TrafficCop trafficCop = resumeChunkNum -> { + synchronized (watermarkLock) { + resumeWatermark = Math.max(resumeWatermark, resumeChunkNum); + + if (suspendWatermark >= 0 && resumeWatermark >= suspendWatermark) { + suspendWatermark = -1; + channel.setReadable(true); + log.debug("[%s] Resumed reads from channel (chunkNum = %,d).", requestDesc, resumeChunkNum); + } + } + }; + response = handler.handleResponse(httpResponse, trafficCop); if (response.isFinished()) { retVal.set((Final) response.getObj()); } + assert currentChunkNum == 0; + possiblySuspendReads(response); + if (!httpResponse.isChunked()) { finishRequest(); } @@ -220,10 +241,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) if (httpChunk.isLast()) { finishRequest(); } else { - response = handler.handleChunk(response, httpChunk); + response = handler.handleChunk(response, httpChunk, ++currentChunkNum); if (response.isFinished() && !retVal.isDone()) { retVal.set((Final) response.getObj()); } + possiblySuspendReads(response); } } else { throw new IllegalStateException(StringUtils.format("Unknown message type[%s]", msg.getClass())); @@ -242,22 +264,37 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) } } + private void possiblySuspendReads(ClientResponse response) + { + if (!response.isContinueReading()) { + synchronized (watermarkLock) { + suspendWatermark = Math.max(suspendWatermark, currentChunkNum); + if (suspendWatermark > resumeWatermark) { + channel.setReadable(false); + log.debug("[%s] Suspended reads from channel (chunkNum = %,d).", requestDesc, currentChunkNum); + } + } + } + } + private void finishRequest() { ClientResponse finalResponse = handler.done(response); - if (!finalResponse.isFinished()) { - throw new IllegalStateException( - StringUtils.format( - "[%s] Didn't get a completed ClientResponse Object from [%s]", - requestDesc, - handler.getClass() - ) + + if (!finalResponse.isFinished() || !finalResponse.isContinueReading()) { + throw new ISE( + "[%s] Didn't get a completed ClientResponse Object from [%s] (finished = %s, continueReading = %s)", + requestDesc, + handler.getClass(), + finalResponse.isFinished(), + finalResponse.isContinueReading() ); } if (!retVal.isDone()) { retVal.set(finalResponse.getObj()); } removeHandlers(); + channel.setReadable(true); channelResourceContainer.returnResource(); } diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/ClientResponse.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/ClientResponse.java index 9528480a6c46..febfb9884bd8 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/ClientResponse.java +++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/ClientResponse.java @@ -19,29 +19,42 @@ package org.apache.druid.java.util.http.client.response; +import javax.annotation.Nullable; + /** */ public class ClientResponse { private final boolean finished; + private final boolean continueReading; + + @Nullable private final T obj; public static ClientResponse 
finished(T obj) { - return new ClientResponse(true, obj); + return new ClientResponse<>(true, true, obj); + } + + public static ClientResponse finished(T obj, boolean continueReading) + { + return new ClientResponse<>(true, continueReading, obj); } public static ClientResponse unfinished(T obj) { - return new ClientResponse(false, obj); + return new ClientResponse<>(false, true, obj); } - protected ClientResponse( - boolean finished, - T obj - ) + public static ClientResponse unfinished(T obj, boolean continueReading) + { + return new ClientResponse<>(false, continueReading, obj); + } + + public ClientResponse(final boolean finished, final boolean continueReading, @Nullable final T obj) { this.finished = finished; + this.continueReading = continueReading; this.obj = obj; } @@ -50,6 +63,12 @@ public boolean isFinished() return finished; } + public boolean isContinueReading() + { + return continueReading; + } + + @Nullable public T getObj() { return obj; diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHandler.java index 7c11eb7ba003..34a9c3ebd1fc 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHandler.java +++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHandler.java @@ -36,7 +36,7 @@ public FullResponseHandler(Charset charset) } @Override - public ClientResponse handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { return ClientResponse.unfinished( new FullResponseHolder( @@ -50,7 +50,8 @@ public ClientResponse handleResponse(HttpResponse response) @Override public ClientResponse handleChunk( ClientResponse response, - HttpChunk chunk + HttpChunk chunk, + long chunkNum ) { final StringBuilder builder = response.getObj().getBuilder(); diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java index 32fcb6035751..02d6caa6f2f4 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java +++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java @@ -38,17 +38,60 @@ * * Note: if you return a finished ClientResponse object from anything other than the done() method, IntermediateType * must be castable to FinalType + * + * This handler can exert backpressure by returning a response with "continueReading" set to false from handleResponse() + * or handleChunk(). In this case, the HTTP client will stop reading soon thereafter. It may not happen immediately, so + * be prepared for more handleChunk() calls to happen. To resume reads, call resume() on the TrafficCop provided by + * handleResponse() with a chunk number at least as high as the one provided by the handleChunk() call from which you + * returned a suspend-reading response. If you are resuming reads after suspending them from handleResponse(), use 0 + * for the chunk number. */ public interface HttpResponseHandler { /** * Handles the initial HttpResponse object that comes back from Netty. 
   *
-   * @param response - response from Netty
-   * @return
+   * @param response   response from Netty
+   * @param trafficCop flow controller, allows resuming suspended reads
+   *
+   * @return response that may be "finished" or "unfinished".
+   */
+  ClientResponse<IntermediateType> handleResponse(HttpResponse response, TrafficCop trafficCop);
+
+  /**
+   * Called for chunked responses, indicating another HttpChunk has arrived.
+   *
+   * @param clientResponse last response returned by the prior handleResponse() or handleChunk()
+   * @param chunk          the new chunk of data
+   * @param chunkNum       the sequence number of this chunk (increases monotonically)
+   *
+   * @return response that may be "finished" or "unfinished".
+   */
+  ClientResponse<IntermediateType> handleChunk(
+      ClientResponse<IntermediateType> clientResponse,
+      HttpChunk chunk,
+      long chunkNum
+  );
+
+  /**
+   * Called after the final handleResponse() or handleChunk() call, signifying that no more data
+   * will arrive.
+   *
+   * @param clientResponse last response returned by handleResponse() or handleChunk()
+   *
+   * @return response containing an object to hand back to the caller. It must be a "finished" response.
   */
-  ClientResponse<IntermediateType> handleResponse(HttpResponse response);
-  ClientResponse<IntermediateType> handleChunk(ClientResponse<IntermediateType> clientResponse, HttpChunk chunk);
   ClientResponse<FinalType> done(ClientResponse<IntermediateType> clientResponse);
+
   void exceptionCaught(ClientResponse<IntermediateType> clientResponse, Throwable e);
+
+  interface TrafficCop
+  {
+    /**
+     * Call this to resume reading after you have suspended it.
+     *
+     * @param chunkNum chunk number corresponding to the handleChunk() or handleResponse() call from which you
+     *                 returned a suspend-reading response. When resuming reads suspended from handleResponse(),
+     *                 use 0 for the chunk number.
+     */
+    void resume(long chunkNum);
+  }
 }
diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamResponseHandler.java
index e3a17b9ca3e2..66855d20daa0 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamResponseHandler.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamResponseHandler.java
@@ -31,7 +31,7 @@ public class InputStreamResponseHandler implements HttpResponseHandler
 {
   @Override
-  public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
+  public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
   {
     AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
     in.add(getContentBytes(response.getContent()));
@@ -40,7 +40,9 @@ public ClientResponse handleResponse(HttpRespons
   @Override
   public ClientResponse<AppendableByteArrayInputStream> handleChunk(
-      ClientResponse<AppendableByteArrayInputStream> clientResponse, HttpChunk chunk
+      ClientResponse<AppendableByteArrayInputStream> clientResponse,
+      HttpChunk chunk,
+      long chunkNum
   )
   {
     clientResponse.getObj().add(getContentBytes(chunk.getContent()));
diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java
index cf8995fb7030..cd6a0ceea70c 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandler.java
@@ -55,7 +55,7 @@ public class SequenceInputStreamResponseHandler implements HttpResponseHandler
   @Override
-  public ClientResponse<InputStream> handleResponse(HttpResponse response)
+  public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
   {
     try {
       queue.put(new
ChannelBufferInputStream(response.getContent())); @@ -99,7 +99,9 @@ public InputStream nextElement() @Override public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk + ClientResponse clientResponse, + HttpChunk chunk, + long chunkNum ) { final ChannelBuffer channelBuffer = chunk.getContent(); diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java index 603cd7bddd0e..1da8095c9df5 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java +++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java @@ -36,7 +36,7 @@ public StatusResponseHandler(Charset charset) } @Override - public ClientResponse handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { return ClientResponse.unfinished( new StatusResponseHolder( @@ -49,7 +49,8 @@ public ClientResponse handleResponse(HttpResponse response @Override public ClientResponse handleChunk( ClientResponse response, - HttpChunk chunk + HttpChunk chunk, + long chunkNum ) { final StringBuilder builder = response.getObj().getBuilder(); diff --git a/java-util/src/test/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandlerTest.java b/java-util/src/test/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandlerTest.java index 45c626d88074..0fedaacba052 100644 --- a/java-util/src/test/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandlerTest.java +++ b/java-util/src/test/java/org/apache/druid/java/util/http/client/response/SequenceInputStreamResponseHandlerTest.java @@ -84,8 +84,9 @@ public void testExceptionalChunkedStream() throws IOException SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.setChunked(true); - ClientResponse clientResponse = responseHandler.handleResponse(response); + ClientResponse clientResponse = responseHandler.handleResponse(response, null); final int failAt = Math.abs(RANDOM.nextInt()) % allBytes.length; + long chunkNum = 0; while (it.hasNext()) { final DefaultHttpChunk chunk = new DefaultHttpChunk( new BigEndianHeapChannelBuffer(it.next()) @@ -100,7 +101,7 @@ public void getBytes(int index, byte[] dst, int dstIndex, int length) } } ); - clientResponse = responseHandler.handleChunk(clientResponse, chunk); + clientResponse = responseHandler.handleChunk(clientResponse, chunk, ++chunkNum); } clientResponse = responseHandler.done(clientResponse); @@ -132,7 +133,7 @@ public void getBytes(int index, byte[] dst, int dstIndex, int length) } } ); - ClientResponse clientResponse = responseHandler.handleResponse(response); + ClientResponse clientResponse = responseHandler.handleResponse(response, null); clientResponse = responseHandler.done(clientResponse); final InputStream stream = clientResponse.getObj(); @@ -148,10 +149,11 @@ public void simpleMultiStreamTest() throws IOException SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.setChunked(true); - ClientResponse clientResponse = 
responseHandler.handleResponse(response); + ClientResponse clientResponse = responseHandler.handleResponse(response, null); + long chunkNum = 0; while (it.hasNext()) { final DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer(it.next())); - clientResponse = responseHandler.handleChunk(clientResponse, chunk); + clientResponse = responseHandler.handleChunk(clientResponse, chunk, ++chunkNum); } clientResponse = responseHandler.done(clientResponse); @@ -178,10 +180,11 @@ public void alignedMultiStreamTest() throws IOException SequenceInputStreamResponseHandler responseHandler = new SequenceInputStreamResponseHandler(); final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.setChunked(true); - ClientResponse clientResponse = responseHandler.handleResponse(response); + ClientResponse clientResponse = responseHandler.handleResponse(response, null); + long chunkNum = 0; while (it.hasNext()) { final DefaultHttpChunk chunk = new DefaultHttpChunk(new BigEndianHeapChannelBuffer(it.next())); - clientResponse = responseHandler.handleChunk(clientResponse, chunk); + clientResponse = responseHandler.handleChunk(clientResponse, chunk, ++chunkNum); } clientResponse = responseHandler.done(clientResponse); @@ -206,7 +209,7 @@ public void simpleSingleStreamTest() throws IOException final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.setChunked(false); response.setContent(new BigEndianHeapChannelBuffer(allBytes)); - ClientResponse clientResponse = responseHandler.handleResponse(response); + ClientResponse clientResponse = responseHandler.handleResponse(response, null); clientResponse = responseHandler.done(clientResponse); final InputStream stream = clientResponse.getObj(); diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 7fb168a2eaa3..3399aa9b1a26 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -33,6 +33,7 @@ public class QueryContexts public static final String PRIORITY_KEY = "priority"; public static final String TIMEOUT_KEY = "timeout"; public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; + public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes"; public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; @@ -171,7 +172,10 @@ public static Query verifyMaxQueryTimeout(Query query, long maxQueryTi } } - + public static long getMaxQueuedBytes(Query query, long defaultValue) + { + return parseLong(query, MAX_QUEUED_BYTES_KEY, defaultValue); + } public static long getMaxScatterGatherBytes(Query query) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryPlus.java b/processing/src/main/java/org/apache/druid/query/QueryPlus.java index 08983027be2c..caa235053f01 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryPlus.java +++ b/processing/src/main/java/org/apache/druid/query/QueryPlus.java @@ -20,6 +20,7 @@ package org.apache.druid.query; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.spec.QuerySegmentSpec; @@ -132,6 +133,18 @@ public QueryPlus 
withQuerySegmentSpec(QuerySegmentSpec spec) return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics, identity); } + /** + * Equivalent of withQuery(getQuery().withOverriddenContext(ImmutableMap.of(MAX_QUEUED_BYTES_KEY, maxQueuedBytes))). + */ + public QueryPlus withMaxQueuedBytes(long maxQueuedBytes) + { + return new QueryPlus<>( + query.withOverriddenContext(ImmutableMap.of(QueryContexts.MAX_QUEUED_BYTES_KEY, maxQueuedBytes)), + queryMetrics, + identity + ); + } + /** * Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}. */ diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 8fece183cac4..99df5f0a06f9 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -33,12 +33,15 @@ import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.inject.Inject; +import org.apache.commons.codec.binary.Base64; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulator; import org.apache.druid.client.selector.QueryableDruidServer; import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.guice.annotations.Client; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.guice.http.DruidHttpClientConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -70,7 +73,6 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; -import org.apache.commons.codec.binary.Base64; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -97,6 +99,7 @@ public class CachingClusteredClient implements QuerySegmentWalker private final ObjectMapper objectMapper; private final CachePopulator cachePopulator; private final CacheConfig cacheConfig; + private final DruidHttpClientConfig httpClientConfig; @Inject public CachingClusteredClient( @@ -105,7 +108,8 @@ public CachingClusteredClient( Cache cache, @Smile ObjectMapper objectMapper, CachePopulator cachePopulator, - CacheConfig cacheConfig + CacheConfig cacheConfig, + @Client DruidHttpClientConfig httpClientConfig ) { this.warehouse = warehouse; @@ -114,6 +118,7 @@ public CachingClusteredClient( this.objectMapper = objectMapper; this.cachePopulator = cachePopulator; this.cacheConfig = cacheConfig; + this.httpClientConfig = httpClientConfig; if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { log.warn( @@ -543,13 +548,17 @@ private void addSequencesFromServer( final MultipleSpecificSegmentSpec segmentsOfServerSpec = new MultipleSpecificSegmentSpec(segmentsOfServer); - Sequence serverResults; + // Divide user-provided maxQueuedBytes by the number of servers, and limit each server to that much. 
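+        // Note: if maxQueuedBytes is 0 (the default), the per-server value is 0 as well, which DirectDruidClient
+        // treats as "backpressure disabled". Integer division can likewise zero out limits below the server count.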
+ final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, httpClientConfig.getMaxQueuedBytes()); + final long maxQueuedBytesPerServer = maxQueuedBytes / segmentsByServer.size(); + final Sequence serverResults; + if (isBySegment) { - serverResults = getBySegmentServerResults(serverRunner, segmentsOfServerSpec); + serverResults = getBySegmentServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); } else if (!server.segmentReplicatable() || !populateCache) { - serverResults = getSimpleServerResults(serverRunner, segmentsOfServerSpec); + serverResults = getSimpleServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); } else { - serverResults = getAndCacheServerResults(serverRunner, segmentsOfServerSpec); + serverResults = getAndCacheServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); } listOfSequences.add(serverResults); }); @@ -558,11 +567,15 @@ private void addSequencesFromServer( @SuppressWarnings("unchecked") private Sequence getBySegmentServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec + final MultipleSpecificSegmentSpec segmentsOfServerSpec, + long maxQueuedBytesPerServer ) { Sequence>> resultsBySegments = serverRunner - .run(queryPlus.withQuerySegmentSpec(segmentsOfServerSpec), responseContext); + .run( + queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + responseContext + ); // bySegment results need to be de-serialized, see DirectDruidClient.run() return (Sequence) resultsBySegments .map(result -> result.map( @@ -575,22 +588,28 @@ private Sequence getBySegmentServerResults( @SuppressWarnings("unchecked") private Sequence getSimpleServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec + final MultipleSpecificSegmentSpec segmentsOfServerSpec, + long maxQueuedBytesPerServer ) { - return serverRunner.run(queryPlus.withQuerySegmentSpec(segmentsOfServerSpec), responseContext); + return serverRunner.run( + queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + responseContext + ); } private Sequence getAndCacheServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec + final MultipleSpecificSegmentSpec segmentsOfServerSpec, + long maxQueuedBytesPerServer ) { @SuppressWarnings("unchecked") final Sequence>> resultsBySegments = serverRunner.run( queryPlus .withQuery((Query>>) downstreamQuery) - .withQuerySegmentSpec(segmentsOfServerSpec), + .withQuerySegmentSpec(segmentsOfServerSpec) + .withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); final Function cacheFn = strategy.prepareForSegmentLevelCache(); diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 52a28c203393..60dfd7a0c45c 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -27,8 +27,8 @@ import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.io.ByteSource; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import 
com.google.common.util.concurrent.ListenableFuture; @@ -63,7 +63,7 @@ import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -193,17 +193,20 @@ public Sequence run(final QueryPlus queryPlus, final Map c log.debug("Querying queryId[%s] url[%s]", query.getId(), url); final long requestStartTimeNs = System.nanoTime(); - - long timeoutAt = query.getContextValue(QUERY_FAIL_TIME); - long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); - AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); + final long timeoutAt = query.getContextValue(QUERY_FAIL_TIME); + final long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); + final AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); + final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, 0); + final boolean usingBackpressure = maxQueuedBytes > 0; final HttpResponseHandler responseHandler = new HttpResponseHandler() { - private final AtomicLong byteCount = new AtomicLong(0); - private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final AtomicLong totalByteCount = new AtomicLong(0); + private final AtomicLong queuedByteCount = new AtomicLong(0); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference fail = new AtomicReference<>(); + private final AtomicReference trafficCopRef = new AtomicReference<>(); private QueryMetrics> queryMetrics; private long responseStartTimeNs; @@ -217,9 +220,41 @@ private QueryMetrics> acquireResponseMetrics() return queryMetrics; } + /** + * Queue a buffer. Returns true if we should keep reading, false otherwise. + */ + private boolean enqueue(ChannelBuffer buffer, long chunkNum) throws InterruptedException + { + // Increment queuedByteCount before queueing the object, so queuedByteCount is at least as high as + // the actual number of queued bytes at any particular time. + final InputStreamHolder holder = InputStreamHolder.fromChannelBuffer(buffer, chunkNum); + final long currentQueuedByteCount = queuedByteCount.addAndGet(holder.getLength()); + queue.put(holder); + + // True if we should keep reading. 
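+        // Returning false here is only a request to pause: the channel may deliver a few more chunks before it
+        // actually stops reading, so queuedByteCount can briefly overshoot maxQueuedBytes.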
+ return !usingBackpressure || currentQueuedByteCount < maxQueuedBytes; + } + + private InputStream dequeue() throws InterruptedException + { + final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); + if (holder == null) { + throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); + } + + final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength()); + if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) { + Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?") + .resume(holder.getChunkNum()); + } + + return holder.getStream(); + } + @Override - public ClientResponse handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { + trafficCopRef.set(trafficCop); checkQueryTimeout(); checkTotalBytesLimit(response.getContent().readableBytes()); @@ -227,6 +262,7 @@ public ClientResponse handleResponse(HttpResponse response) responseStartTimeNs = System.nanoTime(); acquireResponseMetrics().reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter); + final boolean continueReading; try { final String responseContext = response.headers().get("X-Druid-Response-Context"); // context may be null in case of error or query timeout @@ -237,7 +273,7 @@ public ClientResponse handleResponse(HttpResponse response) ) ); } - queue.put(new ChannelBufferInputStream(response.getContent())); + continueReading = enqueue(response.getContent(), 0L); } catch (final IOException e) { log.error(e, "Error parsing response context from url [%s]", url); @@ -257,7 +293,7 @@ public int read() throws IOException Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(response.getContent().readableBytes()); + totalByteCount.addAndGet(response.getContent().readableBytes()); return ClientResponse.finished( new SequenceInputStream( new Enumeration() @@ -285,12 +321,7 @@ public InputStream nextElement() } try { - InputStream is = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); - if (is != null) { - return is; - } else { - throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); - } + return dequeue(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -298,13 +329,16 @@ public InputStream nextElement() } } } - ) + ), + continueReading ); } @Override public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk + ClientResponse clientResponse, + HttpChunk chunk, + long chunkNum ) { checkQueryTimeout(); @@ -314,18 +348,20 @@ public ClientResponse handleChunk( checkTotalBytesLimit(bytes); + boolean continueReading = true; if (bytes > 0) { try { - queue.put(new ChannelBufferInputStream(channelBuffer)); + continueReading = enqueue(channelBuffer, chunkNum); } catch (InterruptedException e) { log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(bytes); + totalByteCount.addAndGet(bytes); } - return clientResponse; + + return ClientResponse.finished(clientResponse.getObj(), continueReading); } @Override @@ -338,29 +374,26 @@ public ClientResponse done(ClientResponse clientRespon "Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].", query.getId(), url, - byteCount.get(), + totalByteCount.get(), nodeTimeMs, - byteCount.get() / (0.001 * nodeTimeMs) // Floating math; division by zero will 
yield Inf, not exception + // Floating math; division by zero will yield Inf, not exception + totalByteCount.get() / (0.001 * nodeTimeMs) ); QueryMetrics> responseMetrics = acquireResponseMetrics(); responseMetrics.reportNodeTime(nodeTimeNs); - responseMetrics.reportNodeBytes(byteCount.get()); + responseMetrics.reportNodeBytes(totalByteCount.get()); responseMetrics.emit(emitter); synchronized (done) { try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out // after done is set to true, regardless of the rest of the stream's state. - queue.put(ByteSource.empty().openStream()); + queue.put(InputStreamHolder.fromChannelBuffer(ChannelBuffers.EMPTY_BUFFER, Long.MAX_VALUE)); } catch (InterruptedException e) { log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - catch (IOException e) { - // This should never happen - throw Throwables.propagate(e); - } finally { done.set(true); } @@ -384,19 +417,24 @@ private void setupResponseReadFailure(String msg, Throwable th) { fail.set(msg); queue.clear(); - queue.offer(new InputStream() - { - @Override - public int read() throws IOException - { - if (th != null) { - throw new IOException(msg, th); - } else { - throw new IOException(msg); - } - } - }); - + queue.offer( + InputStreamHolder.fromStream( + new InputStream() + { + @Override + public int read() throws IOException + { + if (th != null) { + throw new IOException(msg, th); + } else { + throw new IOException(msg); + } + } + }, + -1, + 0 + ) + ); } // Returns remaining timeout or throws exception if timeout already elapsed. diff --git a/server/src/main/java/org/apache/druid/client/InputStreamHolder.java b/server/src/main/java/org/apache/druid/client/InputStreamHolder.java new file mode 100644 index 000000000000..a0def4ba3f73 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/InputStreamHolder.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.client; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferInputStream; + +import java.io.InputStream; + +public class InputStreamHolder +{ + private final InputStream stream; + private final long chunkNum; + private final long length; + + public InputStreamHolder(final InputStream stream, final long chunkNum, final long length) + { + this.stream = stream; + this.chunkNum = chunkNum; + this.length = length; + } + + public static InputStreamHolder fromStream(final InputStream stream, final long chunkNum, final long length) + { + return new InputStreamHolder(stream, chunkNum, length); + } + + public static InputStreamHolder fromChannelBuffer(final ChannelBuffer buffer, final long chunkNum) + { + final int length = buffer.readableBytes(); + return new InputStreamHolder(new ChannelBufferInputStream(buffer), chunkNum, length); + } + + public InputStream getStream() + { + return stream; + } + + public long getChunkNum() + { + return chunkNum; + } + + public long getLength() + { + return length; + } +} diff --git a/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java b/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java index deadd859b4b0..d0944bf4f04c 100644 --- a/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java +++ b/server/src/main/java/org/apache/druid/guice/http/DruidHttpClientConfig.java @@ -58,6 +58,13 @@ public class DruidHttpClientConfig @JsonProperty private Period unusedConnectionTimeout = new Period("PT4M"); + /** + * Maximum number of bytes queued per query before exerting backpressure. Not always used; currently, it's only + * respected by CachingClusteredClient (broker -> data server communication). + */ + @JsonProperty + private long maxQueuedBytes = 0L; + public int getNumConnections() { return numConnections; @@ -101,4 +108,9 @@ public Duration getUnusedConnectionTimeout() } return unusedConnectionTimeout == null ? 
null : unusedConnectionTimeout.toStandardDuration(); } + + public long getMaxQueuedBytes() + { + return maxQueuedBytes; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java index 408ab4beef7d..1ae2d18397ca 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java @@ -426,11 +426,11 @@ private static class BytesAccumulatingResponseHandler extends InputStreamRespons private String description; @Override - public ClientResponse handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { status = response.getStatus().getCode(); description = response.getStatus().getReasonPhrase(); - return ClientResponse.unfinished(super.handleResponse(response).getObj()); + return ClientResponse.unfinished(super.handleResponse(response, trafficCop).getObj()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java index 87f2e8599817..e6faec51c185 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java @@ -602,11 +602,11 @@ private static class BytesAccumulatingResponseHandler extends InputStreamRespons private String description; @Override - public ClientResponse handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { status = response.getStatus().getCode(); description = response.getStatus().getReasonPhrase(); - return ClientResponse.unfinished(super.handleResponse(response).getObj()); + return ClientResponse.unfinished(super.handleResponse(response, trafficCop).getObj()); } } } diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java index ac71c1d4f2c7..09a0a0969e8c 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -859,11 +859,11 @@ HttpResponseHandler makeResponseHandler( return new SequenceInputStreamResponseHandler() { @Override - public ClientResponse handleResponse(HttpResponse response) + public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { returnCode.set(response.getStatus().getCode()); reasonString.set(response.getStatus().getReasonPhrase()); - return super.handleResponse(response); + return super.handleResponse(response, trafficCop); } }; } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 7ac1596b9131..f02ba430eca6 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import 
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 7ac1596b9131..f02ba430eca6 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulator;
@@ -33,6 +34,7 @@
 import org.apache.druid.client.selector.QueryableDruidServer;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.client.selector.TierSelectorStrategy;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -50,7 +52,6 @@
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
-import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.AfterClass;
@@ -296,6 +297,13 @@ public int getCacheBulkMergeLimit()
           {
             return mergeLimit;
           }
+        },
+        new DruidHttpClientConfig() {
+          @Override
+          public long getMaxQueuedBytes()
+          {
+            return 0L;
+          }
         }
     );
   }
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 4a6dfd0a7ba5..83b4aeebcd57 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -56,6 +56,7 @@
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.data.input.MapBasedRow;
 import org.apache.druid.data.input.Row;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
 import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -2684,6 +2685,13 @@ public int getCacheBulkMergeLimit()
           {
             return mergeLimit;
           }
+        },
+        new DruidHttpClientConfig() {
+          @Override
+          public long getMaxQueuedBytes()
+          {
+            return 0L;
+          }
         }
     );
   }
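Both caching-client tests inject a DruidHttpClientConfig whose getMaxQueuedBytes() returns 0, matching the config's default and leaving backpressure disabled; per the commit message, the limit can also be set per query via the maxQueuedBytes context key. A test wanting to exercise backpressure could supply a nonzero value the same way. A hedged sketch follows; the helper class and method names are illustrative, not part of this patch.

    import org.apache.druid.guice.http.DruidHttpClientConfig;

    class TestHttpClientConfigs
    {
      // Mirrors the anonymous overrides added in the tests above, but parameterized
      // with a nonzero per-query limit (0 disables backpressure; e.g. 25_000_000L enables it).
      static DruidHttpClientConfig withMaxQueuedBytes(final long maxQueuedBytes)
      {
        return new DruidHttpClientConfig()
        {
          @Override
          public long getMaxQueuedBytes()
          {
            return maxQueuedBytes;
          }
        };
      }
    }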
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 0bc1ecb0edea..fbeae25f769c 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -322,13 +322,13 @@ public <Intermediate, Final> ListenableFuture<Final> go(
         //fail scenario where request is sent to server but we got an unexpected response.
         HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
         httpResponse.setContent(ChannelBuffers.buffer(0));
-        httpResponseHandler.handleResponse(httpResponse);
+        httpResponseHandler.handleResponse(httpResponse, null);
         return Futures.immediateFailedFuture(new RuntimeException("server error"));
       }

       HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
       httpResponse.setContent(ChannelBuffers.buffer(0));
-      httpResponseHandler.handleResponse(httpResponse);
+      httpResponseHandler.handleResponse(httpResponse, null);
       try {
         return results.take();
       }
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncerTest.java
index 26531bc3c511..079552185cb7 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncerTest.java
@@ -190,13 +190,13 @@ public <Intermediate, Final> ListenableFuture<Final> go(
             HttpResponseStatus.INTERNAL_SERVER_ERROR
         );
         httpResponse.setContent(ChannelBuffers.buffer(0));
-        httpResponseHandler.handleResponse(httpResponse);
+        httpResponseHandler.handleResponse(httpResponse, null);
         return Futures.immediateFailedFuture(new RuntimeException("server error"));
       }

       HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
       httpResponse.setContent(ChannelBuffers.buffer(0));
-      httpResponseHandler.handleResponse(httpResponse);
+      httpResponseHandler.handleResponse(httpResponse, null);
       try {
         return results.take();
       }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index b65c1ee7d374..6b65fae290bc 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -208,7 +208,7 @@ public <Intermediate, Final> ListenableFuture<Final> go(
       {
         HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
         httpResponse.setContent(ChannelBuffers.buffer(0));
-        httpResponseHandler.handleResponse(httpResponse);
+        httpResponseHandler.handleResponse(httpResponse, null);
         try {
           List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue(
               request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>()