From 3ddf32c22a3e8032731a598e3e11d17f5c4e9159 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Wed, 9 Oct 2019 17:57:43 +0800 Subject: [PATCH] check shard level transport response memory --- .../action/search/SearchTransportService.java | 2 +- .../transport/InboundHandler.java | 21 +++++++++++++++++-- .../transport/TransportService.java | 4 ++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 4b66ed885db20..a8a7e19ba5e53 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -396,7 +396,7 @@ Transport.Connection getConnection(@Nullable String clusterAlias, DiscoveryNode } } - final class ConnectionCountingHandler extends ActionListenerResponseHandler { + public final class ConnectionCountingHandler extends ActionListenerResponseHandler { private final Map clientConnections; private final String nodeId; diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index cb7a14b56970c..8c19af9baa0d7 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -134,7 +135,7 @@ private void messageReceived(BytesReference reference, TcpChannel channel) throw if (message.isError()) { handlerResponseError(message.getStreamInput(), handler); } else { - handleResponse(remoteAddress, message.getStreamInput(), handler); + handleResponse(remoteAddress, message.getStreamInput(), handler, reference.length()); } // Check the entire message has been read final int nextByte = message.getStreamInput().read(); @@ -198,15 +199,31 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i } private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, - final TransportResponseHandler handler) { + final TransportResponseHandler handler, int messageLengthBytes) { final T response; + long bytesNeedToRelease = 0; + CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); try { + if (handler instanceof TransportService.ContextRestoreResponseHandler) { + TransportResponseHandler delegate = ((TransportService.ContextRestoreResponseHandler) handler).getDelegate(); + if (delegate instanceof SearchTransportService.ConnectionCountingHandler && messageLengthBytes > 1024) { + // the main purpose is to check memory before deserialization for large size of response + bytesNeedToRelease = messageLengthBytes; + breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } + } + response = handler.read(stream); response.remoteAddress(new TransportAddress(remoteAddress)); } catch (Exception e) { handleException(handler, new TransportSerializationException( "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e)); return; + } finally { + // release message bytes from request breaker even the real memory has not been released yet + if (bytesNeedToRelease > 0) { + breaker.addWithoutBreaking(-bytesNeedToRelease); + } } threadPool.executor(handler.executor()).execute(new AbstractRunnable() { @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index ddd7a0d4cab19..4fbbff78a95b4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1067,6 +1067,10 @@ public ContextRestoreResponseHandler(Supplier conte this.contextSupplier = contextSupplier; } + public TransportResponseHandler getDelegate() { + return delegate; + } + @Override public T read(StreamInput in) throws IOException { return delegate.read(in);