Skip to content

Commit

Permalink
check shard level transport response memory
Browse files Browse the repository at this point in the history
  • Loading branch information
howardhuanghua committed Oct 9, 2019
1 parent 603c3e6 commit 3ddf32c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ Transport.Connection getConnection(@Nullable String clusterAlias, DiscoveryNode
}
}

final class ConnectionCountingHandler<Response extends TransportResponse> extends ActionListenerResponseHandler<Response> {
public final class ConnectionCountingHandler<Response extends TransportResponse> extends ActionListenerResponseHandler<Response> {
private final Map<String, Long> clientConnections;
private final String nodeId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -134,7 +135,7 @@ private void messageReceived(BytesReference reference, TcpChannel channel) throw
if (message.isError()) {
handlerResponseError(message.getStreamInput(), handler);
} else {
handleResponse(remoteAddress, message.getStreamInput(), handler);
handleResponse(remoteAddress, message.getStreamInput(), handler, reference.length());
}
// Check the entire message has been read
final int nextByte = message.getStreamInput().read();
Expand Down Expand Up @@ -198,15 +199,31 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
}

private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream,
final TransportResponseHandler<T> handler) {
final TransportResponseHandler<T> handler, int messageLengthBytes) {
final T response;
long bytesNeedToRelease = 0;
CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
try {
if (handler instanceof TransportService.ContextRestoreResponseHandler) {
TransportResponseHandler<T> delegate = ((TransportService.ContextRestoreResponseHandler<T>) handler).getDelegate();
if (delegate instanceof SearchTransportService.ConnectionCountingHandler && messageLengthBytes > 1024) {
// the main purpose is to check memory before deserialization for large size of response
bytesNeedToRelease = messageLengthBytes;
breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_response>");
}
}

response = handler.read(stream);
response.remoteAddress(new TransportAddress(remoteAddress));
} catch (Exception e) {
handleException(handler, new TransportSerializationException(
"Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
return;
} finally {
// release message bytes from request breaker even the real memory has not been released yet
if (bytesNeedToRelease > 0) {
breaker.addWithoutBreaking(-bytesNeedToRelease);
}
}
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,10 @@ public ContextRestoreResponseHandler(Supplier<ThreadContext.StoredContext> conte
this.contextSupplier = contextSupplier;
}

public TransportResponseHandler<T> getDelegate() {
return delegate;
}

@Override
public T read(StreamInput in) throws IOException {
return delegate.read(in);
Expand Down

0 comments on commit 3ddf32c

Please sign in to comment.