Skip to content

Commit

Permalink
MINOR: Log client disconnect events at INFO level (#11449)
Browse files Browse the repository at this point in the history
Client disconnects are crucial events for debugging. Because we log them only at DEBUG/TRACE, they are rarely available outside of a testing context. This patch therefore raises the log level of these events to INFO. In practice, the existing backoff configurations should prevent these logs from getting too spammy.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
hachikuji committed Nov 10, 2021
1 parent b8c3a67 commit 79d97bd
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
Expand Up @@ -158,8 +158,7 @@ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {

private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) {
for (NetworkClient.InFlightRequest request : deque) {
long timeSinceSend = Math.max(0, now - request.sendTimeMs);
if (timeSinceSend > request.requestTimeoutMs)
if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)
return true;
}
return false;
Expand Down
41 changes: 29 additions & 12 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Expand Up @@ -316,26 +316,34 @@ boolean canConnect(Node node, long now) {
*/
@Override
public void disconnect(String nodeId) {
// Nothing to tear down when the node is already marked disconnected: log at debug and return.
// NOTE(review): the unbraced guard and the braced guard below test the same condition; this
// looks like the before/after pair of a diff rendering — confirm which one belongs in the file.
if (connectionStates.isDisconnected(nodeId))
if (connectionStates.isDisconnected(nodeId)) {
log.debug("Client requested disconnect from node {}, which is already disconnected", nodeId);
return;
}

// Record the request at info level, close the channel, then take one timestamp that is
// shared by the cancellation and the connection-state update below.
log.info("Client requested disconnect from node {}", nodeId);
selector.close(nodeId);
long now = time.milliseconds();

// Cancel whatever is still in flight for this node, passing abortedSends as the collection argument.
cancelInFlightRequests(nodeId, now, abortedSends);

// Mark the node as disconnected as of the timestamp taken above.
connectionStates.disconnected(nodeId, now);

// Only emitted when trace logging is enabled.
if (log.isTraceEnabled()) {
log.trace("Manually disconnected from {}. Aborted in-flight requests: {}.", nodeId, inFlightRequests);
}
}

private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses) {
Iterable<InFlightRequest> inFlightRequests = this.inFlightRequests.clearAll(nodeId);
for (InFlightRequest request : inFlightRequests) {
log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
if (log.isDebugEnabled()) {
log.debug("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected " +
"(elapsed time since creation: {}ms, elapsed time since send: {}ms, request timeout: {}ms): {}",
request.header.apiKey(), request.header.correlationId(), nodeId,
request.timeElapsedSinceCreateMs(now), request.timeElapsedSinceSendMs(now),
request.requestTimeoutMs, request.request);
} else {
log.info("Cancelled in-flight {} request with correlation id {} due to node {} being disconnected " +
"(elapsed time since creation: {}ms, elapsed time since send: {}ms, request timeout: {}ms)",
request.header.apiKey(), request.header.correlationId(), nodeId,
request.timeElapsedSinceCreateMs(now), request.timeElapsedSinceSendMs(now),
request.requestTimeoutMs);
}

if (!request.isInternalRequest) {
if (responses != null)
Expand All @@ -355,6 +363,7 @@ private void cancelInFlightRequests(String nodeId, long now, Collection<ClientRe
*/
@Override
public void close(String nodeId) {
log.info("Client requested connection close from node {}", nodeId);
selector.close(nodeId);
long now = time.milliseconds();
cancelInFlightRequests(nodeId, now, null);
Expand Down Expand Up @@ -785,7 +794,7 @@ private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
for (String nodeId : nodeIds) {
// close connection to the node
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
log.info("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
}
Expand All @@ -807,7 +816,7 @@ private void handleTimedOutConnections(List<ClientResponse> responses, long now)
List<String> nodes = connectionStates.nodesWithConnectionSetupTimeout(now);
for (String nodeId : nodes) {
this.selector.close(nodeId);
log.debug(
log.info(
"Disconnecting from node {} due to socket connection setup timeout. " +
"The timeout value is {} ms.",
nodeId,
Expand Down Expand Up @@ -923,7 +932,7 @@ private void handleApiVersionsResponse(List<ClientResponse> responses,
private void handleDisconnections(List<ClientResponse> responses, long now) {
// Walk every entry the selector reports through disconnected(); the key is used as the node
// identifier and the value is the ChannelState handed to processDisconnection.
for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
String node = entry.getKey();
// NOTE(review): the same message appears at debug and at info level; this looks like the
// before/after pair of a diff rendering — confirm which line belongs in the file.
log.debug("Node {} disconnected.", node);
log.info("Node {} disconnected.", node);
// responses and now are passed through unchanged.
processDisconnection(responses, node, now, entry.getValue());
}
}
Expand Down Expand Up @@ -1251,6 +1260,14 @@ public InFlightRequest(RequestHeader header,
this.sendTimeMs = sendTimeMs;
}

/**
 * Computes how many milliseconds lie between {@code sendTimeMs} and the supplied time.
 * The result is floored at zero, so a supplied time earlier than {@code sendTimeMs}
 * yields {@code 0} rather than a negative value.
 *
 * @param currentTimeMs the time to measure against, in milliseconds
 * @return the non-negative elapsed time since {@code sendTimeMs}, in milliseconds
 */
public long timeElapsedSinceSendMs(long currentTimeMs) {
long elapsedMs = currentTimeMs - sendTimeMs;
return elapsedMs > 0 ? elapsedMs : 0;
}

/**
 * Computes how many milliseconds lie between {@code createdTimeMs} and the supplied time.
 * The result is floored at zero, so a supplied time earlier than {@code createdTimeMs}
 * yields {@code 0} rather than a negative value.
 *
 * @param currentTimeMs the time to measure against, in milliseconds
 * @return the non-negative elapsed time since {@code createdTimeMs}, in milliseconds
 */
public long timeElapsedSinceCreateMs(long currentTimeMs) {
long elapsedMs = currentTimeMs - createdTimeMs;
return elapsedMs > 0 ? elapsedMs : 0;
}

public ClientResponse completed(AbstractResponse response, long timeMs) {
return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
false, null, null, response);
Expand Down
Expand Up @@ -925,8 +925,10 @@ protected synchronized void markCoordinatorUnknown(boolean isDisconnected, Strin

// Disconnect from the coordinator to ensure that there are no in-flight requests remaining.
// Pending callbacks will be invoked with a DisconnectException on the next call to poll.
if (!isDisconnected)
if (!isDisconnected) {
log.info("Requesting disconnect from last known coordinator {}", oldCoordinator);
client.disconnectAsync(oldCoordinator);
}

lastTimeOfConnectionMs = time.milliseconds();
} else {
Expand Down

0 comments on commit 79d97bd

Please sign in to comment.