From 0a453328dd5226e0753080c860a0f557cb6ec00a Mon Sep 17 00:00:00 2001 From: danielhuang Date: Wed, 9 Oct 2019 17:57:43 +0800 Subject: [PATCH 1/4] check shard level transport response memory --- .../action/search/SearchTransportService.java | 2 +- .../transport/InboundHandler.java | 21 +++++++++++++++++-- .../transport/TransportService.java | 4 ++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 4b66ed885db20..a8a7e19ba5e53 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -396,7 +396,7 @@ Transport.Connection getConnection(@Nullable String clusterAlias, DiscoveryNode } } - final class ConnectionCountingHandler extends ActionListenerResponseHandler { + public final class ConnectionCountingHandler extends ActionListenerResponseHandler { private final Map clientConnections; private final String nodeId; diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index cb7a14b56970c..8c19af9baa0d7 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -134,7 +135,7 @@ private void messageReceived(BytesReference reference, TcpChannel channel) throw if (message.isError()) { handlerResponseError(message.getStreamInput(), handler); } else { - handleResponse(remoteAddress, message.getStreamInput(), handler); + handleResponse(remoteAddress, message.getStreamInput(), handler, reference.length()); } // Check the entire message has been read final int nextByte = message.getStreamInput().read(); @@ -198,15 +199,31 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i } private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, - final TransportResponseHandler handler) { + final TransportResponseHandler handler, int messageLengthBytes) { final T response; + long bytesNeedToRelease = 0; + CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); try { + if (handler instanceof TransportService.ContextRestoreResponseHandler) { + TransportResponseHandler delegate = ((TransportService.ContextRestoreResponseHandler) handler).getDelegate(); + if (delegate instanceof SearchTransportService.ConnectionCountingHandler && messageLengthBytes > 1024) { + // the main purpose is to check memory before deserialization for large size of response + bytesNeedToRelease = messageLengthBytes; + breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } + } + response = handler.read(stream); response.remoteAddress(new TransportAddress(remoteAddress)); } catch (Exception e) { handleException(handler, new TransportSerializationException( "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e)); return; + } finally { + // release message bytes from request breaker even the real memory has not been released yet + if (bytesNeedToRelease > 0) { + breaker.addWithoutBreaking(-bytesNeedToRelease); + } } threadPool.executor(handler.executor()).execute(new AbstractRunnable() { @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index ddd7a0d4cab19..4fbbff78a95b4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1067,6 +1067,10 @@ public ContextRestoreResponseHandler(Supplier conte this.contextSupplier = contextSupplier; } + public TransportResponseHandler getDelegate() { + return delegate; + } + @Override public T read(StreamInput in) throws IOException { return delegate.read(in); From adef8ef64570bc302477b2d599d41a2885b9a2d1 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 29 Oct 2019 21:02:59 +0800 Subject: [PATCH 2/4] add canTripCircuitBreaker to filter search response --- .../action/search/SearchTransportService.java | 7 ++++++- .../org/elasticsearch/transport/InboundHandler.java | 12 ++++-------- .../transport/TransportResponseHandler.java | 4 ++++ .../elasticsearch/transport/TransportService.java | 9 +++++---- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index a8a7e19ba5e53..930b432d597ee 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -396,7 +396,7 @@ Transport.Connection getConnection(@Nullable String clusterAlias, DiscoveryNode } } - public final class ConnectionCountingHandler extends ActionListenerResponseHandler { + final class ConnectionCountingHandler extends ActionListenerResponseHandler { private final Map clientConnections; private final String nodeId; @@ -427,6 +427,11 @@ public void handleException(TransportException e) { clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1); } + @Override + public boolean canTripCircuitBreaker() { + return true; + } + private boolean assertNodePresent() { clientConnections.compute(nodeId, (id, conns) -> { assert conns != null : "number of connections for " + id + " is null, but should be an integer"; diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 8c19af9baa0d7..36860a45edb50 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; -import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -204,13 +203,10 @@ private void handleResponse(InetSocketAddress remo long bytesNeedToRelease = 0; CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); try { - if (handler instanceof TransportService.ContextRestoreResponseHandler) { - TransportResponseHandler delegate = ((TransportService.ContextRestoreResponseHandler) handler).getDelegate(); - if (delegate instanceof SearchTransportService.ConnectionCountingHandler && messageLengthBytes > 1024) { - // the main purpose is to check memory before deserialization for large size of response - bytesNeedToRelease = messageLengthBytes; - breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); - } + if (handler.canTripCircuitBreaker() && messageLengthBytes > 1024) { + // the main purpose is to check memory before deserialization for large size of response + bytesNeedToRelease = messageLengthBytes; + breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); } response = handler.read(stream); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index 29720216cf400..4cae5d5d04d17 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -28,4 +28,8 @@ public interface TransportResponseHandler extends W void handleException(TransportException exp); String executor(); + + default boolean canTripCircuitBreaker() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 4fbbff78a95b4..cc251c1d3bd21 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1067,10 +1067,6 @@ public ContextRestoreResponseHandler(Supplier conte this.contextSupplier = contextSupplier; } - public TransportResponseHandler getDelegate() { - return delegate; - } - @Override public T read(StreamInput in) throws IOException { return delegate.read(in); @@ -1106,6 +1102,11 @@ public String toString() { return getClass().getName() + "/" + delegate.toString(); } + @Override + public boolean canTripCircuitBreaker() { + return delegate.canTripCircuitBreaker(); + } + void setTimeoutHandler(TimeoutHandler handler) { this.handler = handler; } From a765a47af6cee4e08be32c0dc81281929469d407 Mon Sep 17 00:00:00 2001 From: danielhuang Date: Tue, 29 Oct 2019 21:58:56 +0800 Subject: [PATCH 3/4] change request to in flight request breaker --- .../main/java/org/elasticsearch/transport/InboundHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 36860a45edb50..9e2e6302ae832 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -201,7 +201,7 @@ private void handleResponse(InetSocketAddress remo final TransportResponseHandler handler, int messageLengthBytes) { final T response; long bytesNeedToRelease = 0; - CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST); + CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); try { if (handler.canTripCircuitBreaker() && messageLengthBytes > 1024) { // the main purpose is to check memory before deserialization for large size of response From 76917bb639cf73708cdab42ba868cffb3c3f36ca Mon Sep 17 00:00:00 2001 From: danielhuang Date: Thu, 14 Nov 2019 16:47:22 +0800 Subject: [PATCH 4/4] handle specific breaker exception --- .../transport/InboundHandler.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 9e2e6302ae832..ee1597e39818d 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.metrics.MeanMetric; @@ -200,13 +201,19 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler, int messageLengthBytes) { final T response; - long bytesNeedToRelease = 0; - CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); try { if (handler.canTripCircuitBreaker() && messageLengthBytes > 1024) { // the main purpose is to check memory before deserialization for large size of response - bytesNeedToRelease = messageLengthBytes; - breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + try { + breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } catch (CircuitBreakingException e) { + handleException(handler, e); + return; + } finally { + // release message bytes from request breaker even the real memory has not been released yet + breaker.addWithoutBreaking(-messageLengthBytes); + } } response = handler.read(stream); @@ -215,11 +222,6 @@ private void handleResponse(InetSocketAddress remo handleException(handler, new TransportSerializationException( "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e)); return; - } finally { - // release message bytes from request breaker even the real memory has not been released yet - if (bytesNeedToRelease > 0) { - breaker.addWithoutBreaking(-bytesNeedToRelease); - } } threadPool.executor(handler.executor()).execute(new AbstractRunnable() { @Override