Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinate node memory checking during accumulating shard result response #47806

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ public void handleException(TransportException e) {
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

@Override
public boolean canTripCircuitBreaker() {
return true;
}

private boolean assertNodePresent() {
clientConnections.compute(nodeId, (id, conns) -> {
assert conns != null : "number of connections for " + id + " is null, but should be an integer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void messageReceived(BytesReference reference, TcpChannel channel) throw
if (message.isError()) {
handlerResponseError(message.getStreamInput(), handler);
} else {
handleResponse(remoteAddress, message.getStreamInput(), handler);
handleResponse(remoteAddress, message.getStreamInput(), handler, reference.length());
}
// Check the entire message has been read
final int nextByte = message.getStreamInput().read();
Expand Down Expand Up @@ -198,15 +198,28 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
}

private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream,
final TransportResponseHandler<T> handler) {
final TransportResponseHandler<T> handler, int messageLengthBytes) {
final T response;
long bytesNeedToRelease = 0;
CircuitBreaker breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
try {
if (handler.canTripCircuitBreaker() && messageLengthBytes > 1024) {
// the main purpose is to check memory before deserialization for large size of response
bytesNeedToRelease = messageLengthBytes;
breaker.addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_response>");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On this particular approach (the network level approach), you will need to catch the circuit breaking exception and handle it. With this code path it is going to appear as a transport serialization exception.

I also this that we would want to approach to literally surround the execution. I understand you are primarily concern about the increase in size from deserialization, but as a generic approach we would want to wrap the call to the response handler.

}

response = handler.read(stream);
response.remoteAddress(new TransportAddress(remoteAddress));
} catch (Exception e) {
handleException(handler, new TransportSerializationException(
"Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
return;
} finally {
// release message bytes from request breaker even the real memory has not been released yet
if (bytesNeedToRelease > 0) {
breaker.addWithoutBreaking(-bytesNeedToRelease);
}
}
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ public interface TransportResponseHandler<T extends TransportResponse> extends W
void handleException(TransportException exp);

String executor();

default boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,11 @@ public String toString() {
return getClass().getName() + "/" + delegate.toString();
}

@Override
public boolean canTripCircuitBreaker() {
return delegate.canTripCircuitBreaker();
}

void setTimeoutHandler(TimeoutHandler handler) {
this.handler = handler;
}
Expand Down