Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinate node memory checking during accumulating shard result response #47806

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ public void handleException(TransportException e) {
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

@Override
public boolean canTripCircuitBreaker() {
return true;
}

private boolean assertNodePresent() {
clientConnections.compute(nodeId, (id, conns) -> {
assert conns != null : "number of connections for " + id + " is null, but should be an integer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.MeanMetric;
Expand Down Expand Up @@ -134,7 +135,7 @@ private void messageReceived(BytesReference reference, TcpChannel channel) throw
if (message.isError()) {
handlerResponseError(message.getStreamInput(), handler);
} else {
handleResponse(remoteAddress, message.getStreamInput(), handler);
handleResponse(remoteAddress, message.getStreamInput(), handler, reference.length());
}
// Check the entire message has been read
final int nextByte = message.getStreamInput().read();
Expand Down Expand Up @@ -198,9 +199,23 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
}

private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream,
final TransportResponseHandler<T> handler) {
final TransportResponseHandler<T> handler, int messageLengthBytes) {
final T response;
try {
if (handler.canTripCircuitBreaker() && messageLengthBytes > 1024) {
// the main purpose is to check memory before deserialization for large size of response
CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
try {
breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_response>");
} catch (CircuitBreakingException e) {
handleException(handler, e);
return;
} finally {
// release message bytes from request breaker even the real memory has not been released yet
breaker.addWithoutBreaking(-messageLengthBytes);
}
}

response = handler.read(stream);
response.remoteAddress(new TransportAddress(remoteAddress));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ public interface TransportResponseHandler<T extends TransportResponse> extends W
void handleException(TransportException exp);

String executor();

default boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,11 @@ public String toString() {
return getClass().getName() + "/" + delegate.toString();
}

@Override
public boolean canTripCircuitBreaker() {
return delegate.canTripCircuitBreaker();
}

void setTimeoutHandler(TimeoutHandler handler) {
this.handler = handler;
}
Expand Down