Skip to content

Commit

Permalink
Improve Slow Inbound Handling to Include Response Type (#80425) (#92290)
Browse files Browse the repository at this point in the history
The slow logging for responses is often relatively uselss when debugging
because it does not contain the type of the response. Tracking down the type
from the message size and response id is not possible in most cases.
This commit adds the handler to the log message which gives us that type information.

Co-authored-by: Armin Braun <me@obrown.io>
  • Loading branch information
pxsalehi and original-brownbear committed Dec 12, 2022
1 parent 88e5311 commit 5abd5e8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,41 +97,43 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
final Header header = message.getHeader();
assert header.needsToReadVariableHeader() == false;

TransportResponseHandler<?> responseHandler = null;
final boolean isRequest = message.getHeader().isRequest();

ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
// Place the context with the headers from the message
threadContext.setHeaders(header.getHeaders());
threadContext.putTransient("_remote_address", remoteAddress);
if (header.isRequest()) {
if (isRequest) {
handleRequest(channel, header, message);
} else {
// Responses do not support short circuiting currently
assert message.isShortCircuit() == false;
final TransportResponseHandler<?> handler;
long requestId = header.getRequestId();
if (header.isHandshake()) {
handler = handshaker.removeHandlerForHandshake(requestId);
responseHandler = handshaker.removeHandlerForHandshake(requestId);
} else {
TransportResponseHandler<? extends TransportResponse> theHandler = responseHandlers.onResponseReceived(
final TransportResponseHandler<? extends TransportResponse> theHandler = responseHandlers.onResponseReceived(
requestId,
messageListener
);
if (theHandler == null && header.isError()) {
handler = handshaker.removeHandlerForHandshake(requestId);
responseHandler = handshaker.removeHandlerForHandshake(requestId);
} else {
handler = theHandler;
responseHandler = theHandler;
}
}
// ignore if its null, the service logs it
if (handler != null) {
if (responseHandler != null) {
final StreamInput streamInput;
if (message.getContentLength() > 0 || header.getVersion().equals(Version.CURRENT) == false) {
streamInput = namedWriteableStream(message.openOrGetStreamInput());
assertRemoteVersion(streamInput, header.getVersion());
if (header.isError()) {
handlerResponseError(streamInput, handler);
handlerResponseError(streamInput, responseHandler);
} else {
handleResponse(remoteAddress, streamInput, handler);
handleResponse(remoteAddress, streamInput, responseHandler);
}
// Check the entire message has been read
final int nextByte = streamInput.read();
Expand All @@ -141,28 +143,38 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
"Message not fully read (response) for requestId ["
+ requestId
+ "], handler ["
+ handler
+ responseHandler
+ "], error ["
+ header.isError()
+ "]; resetting"
);
}
} else {
assert header.isError() == false;
handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler);
handleResponse(remoteAddress, EMPTY_STREAM_INPUT, responseHandler);
}
}
}
} finally {
final long took = threadPool.relativeTimeInMillis() - startTime;
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0 && took > logThreshold) {
logger.warn(
"handling inbound transport message [{}] took [{}ms] which is above the warn threshold of [{}ms]",
message,
took,
logThreshold
);
if (isRequest) {
logger.warn(
"handling request [{}] took [{}ms] which is above the warn threshold of [{}ms]",
message,
took,
logThreshold
);
} else {
logger.warn(
"handling response [{}] on handler [{}] took [{}ms] which is above the warn threshold of [{}ms]",
message,
responseHandler,
took,
logThreshold
);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,28 +275,30 @@ public void testClosesChannelOnErrorInHandshakeWithIncompatibleVersion() throws
public void testLogsSlowInboundProcessing() throws Exception {
final MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"expected message",
InboundHandler.class.getCanonicalName(),
Level.WARN,
"handling inbound transport message "
)
);
final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
Loggers.addAppender(inboundHandlerLogger, mockAppender);

handler.setSlowLogThreshold(TimeValue.timeValueMillis(5L));
try {
final Version remoteVersion = Version.CURRENT;

mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"expected slow request",
InboundHandler.class.getCanonicalName(),
Level.WARN,
"handling request "
)
);

final long requestId = randomNonNegativeLong();
final Header requestHeader = new Header(
between(0, 100),
requestId,
TransportStatus.setRequest(TransportStatus.setHandshake((byte) 0)),
remoteVersion
);
final InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {
final InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.empty(), () -> {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
Expand All @@ -309,6 +311,34 @@ public void testLogsSlowInboundProcessing() throws Exception {
handler.inboundMessage(channel, requestMessage);
assertNotNull(channel.getMessageCaptor().get());
mockAppender.assertAllExpectationsMatched();

mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"expected slow response",
InboundHandler.class.getCanonicalName(),
Level.WARN,
"handling response "
)
);

final long responseId = randomNonNegativeLong();
final Header responseHeader = new Header(between(0, 100), responseId, TransportStatus.setResponse((byte) 0), remoteVersion);
responseHeader.headers = Tuple.tuple(Map.of(), Map.of());
handler.setMessageListener(new TransportMessageListener() {
@Override
@SuppressWarnings("rawtypes")
public void onResponseReceived(long requestId, Transport.ResponseContext context) {
assertEquals(responseId, requestId);
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
});
handler.inboundMessage(channel, new InboundMessage(responseHeader, ReleasableBytesReference.empty(), () -> {}));

mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(inboundHandlerLogger, mockAppender);
mockAppender.stop();
Expand Down

0 comments on commit 5abd5e8

Please sign in to comment.