Skip to content

Commit

Permalink
Improve visibility into failed message delivery (#3469)
Browse files Browse the repository at this point in the history
Log additional information, such as message type and the receiving
endpoint when message delivery fails.
  • Loading branch information
vjeko committed Dec 23, 2022
1 parent 003d656 commit e3a9c5a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ public void shutdown() {
executor.shutdown();
}

/**
 * Return a human-readable string identifying the remote peer of a channel.
 *
 * <p>Only the host address is returned; the port is not included. The
 * placeholder {@code "unavailable"} is returned whenever the address cannot
 * be determined: the context or its channel is {@code null}, the channel has
 * no remote address, the remote address is not an {@link InetSocketAddress},
 * or the socket address is unresolved.
 *
 * @param ctx ChannelHandlerContext whose channel's remote address is inspected
 * @return the remote host address (IP), or {@code "unavailable"}
 */
protected static String getEndpoint(ChannelHandlerContext ctx) {
    if (ctx == null || ctx.channel() == null) {
        return "unavailable";
    }

    // Typed as Object so that no additional import is needed; instanceof also
    // covers the null case (channel not connected).
    final Object remote = ctx.channel().remoteAddress();
    if (!(remote instanceof InetSocketAddress)) {
        // Not an IP socket address: report it as unavailable instead of
        // failing the caller's log statement with a ClassCastException.
        return "unavailable";
    }

    final InetSocketAddress socketAddress = (InetSocketAddress) remote;
    // getAddress() is null for an unresolved socket address.
    if (socketAddress.getAddress() == null) {
        return "unavailable";
    }
    return socketAddress.getAddress().getHostAddress();
}

/**
* Respond to a ping request.
*
Expand All @@ -65,8 +79,8 @@ public void shutdown() {
*/
@RequestHandler(type = RequestPayloadMsg.PayloadCase.PING_REQUEST)
public void handlePing(RequestMsg req, ChannelHandlerContext ctx, IServerRouter r) {
log.trace("handlePing[{}]: Ping message received from {} {}", req.getHeader().getRequestId(),
req.getHeader().getClientId().getMsb(), req.getHeader().getClientId().getLsb());
log.trace("handlePing[{}]: Ping message received from {}",
req.getHeader().getRequestId(), getEndpoint(ctx));

HeaderMsg responseHeader = getHeaderMsg(req.getHeader(), ClusterIdCheck.CHECK, EpochCheck.IGNORE);
ResponseMsg response = getResponseMsg(responseHeader, getPingResponseMsg());
Expand All @@ -85,22 +99,17 @@ public void handlePing(RequestMsg req, ChannelHandlerContext ctx, IServerRouter
private synchronized void handleSeal(RequestMsg req, ChannelHandlerContext ctx, IServerRouter r) {
try {
final long epoch = req.getPayload().getSealRequest().getEpoch();
String remoteHostAddress;
try {
remoteHostAddress = ((InetSocketAddress)ctx.channel().remoteAddress()).getAddress().getHostAddress();
} catch (NullPointerException ex) {
remoteHostAddress = "unavailable";
}

log.info("handleSeal[{}]: Received SEAL from (clientId={}:{}), moving to new epoch {},",
req.getHeader().getRequestId(), req.getHeader().getClientId(), remoteHostAddress, epoch);
log.info("handleSeal[{}]: Received SEAL from {}, moving to new epoch {},",
req.getHeader().getRequestId(), getEndpoint(ctx), epoch);

serverContext.setServerEpoch(epoch, r);
HeaderMsg responseHeader = getHeaderMsg(req.getHeader(), ClusterIdCheck.CHECK, EpochCheck.IGNORE);
ResponseMsg response = getResponseMsg(responseHeader, getSealResponseMsg());
r.sendResponse(response, ctx);
} catch (WrongEpochException e) {
log.debug("handleSeal[{}]: Rejected SEAL current={}, requested={}", req.getHeader().getRequestId(),
log.debug("handleSeal[{}]: Rejected SEAL from {} current={}, requested={}",
req.getHeader().getRequestId(), getEndpoint(ctx),
e.getCorrectEpoch(), req.getPayload().getSealRequest().getEpoch());

HeaderMsg responseHeader = getHeaderMsg(req.getHeader(), ClusterIdCheck.CHECK, EpochCheck.IGNORE);
Expand All @@ -121,7 +130,7 @@ private synchronized void handleSeal(RequestMsg req, ChannelHandlerContext ctx,
@RequestHandler(type = RequestPayloadMsg.PayloadCase.RESET_REQUEST)
private void handleReset(RequestMsg req, ChannelHandlerContext ctx, IServerRouter r) {
log.warn("handleReset[{}]: Remote reset requested from {}",
req.getHeader().getRequestId(), req.getHeader().getClientId());
req.getHeader().getRequestId(), getEndpoint(ctx));

HeaderMsg responseHeader = getHeaderMsg(req.getHeader(), ClusterIdCheck.CHECK, EpochCheck.IGNORE);
ResponseMsg response = getResponseMsg(responseHeader, getResetResponseMsg());
Expand All @@ -141,7 +150,7 @@ private void handleReset(RequestMsg req, ChannelHandlerContext ctx, IServerRoute
@RequestHandler(type = RequestPayloadMsg.PayloadCase.RESTART_REQUEST)
private void handleRestart(RequestMsg req, ChannelHandlerContext ctx, IServerRouter r) {
log.warn("handleRestart[{}]: Remote restart requested from {}",
req.getHeader().getRequestId(), req.getHeader().getClientId());
req.getHeader().getRequestId(), getEndpoint(ctx));

HeaderMsg responseHeader = getHeaderMsg(req.getHeader(), ClusterIdCheck.CHECK, EpochCheck.IGNORE);
ResponseMsg response = getResponseMsg(responseHeader, getRestartResponseMsg());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import javax.annotation.Nonnull;
import javax.net.ssl.SSLException;

import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -125,7 +127,7 @@ public class NettyClientRouter extends SimpleChannelInboundHandler<Object> imple
/**
* The outstanding requests on this router.
*/
public final Map<Long, CompletableFuture> outstandingRequests;
public final Map<Long, RequestMetadata> outstandingRequests;

/**
* The currently registered channel.
Expand Down Expand Up @@ -170,6 +172,13 @@ public class NettyClientRouter extends SimpleChannelInboundHandler<Object> imple
*/
private final EventLoopGroup eventLoopGroup;

/**
 * Bookkeeping entry kept for a request that has been sent but not yet
 * completed. Instances are the values of the {@code outstandingRequests} map
 * and are created through the Lombok-generated builder
 * ({@code RequestMetadata.builder()}).
 */
@Data
@Builder
public static class RequestMetadata {
// Future handed back to the caller; completed (normally or exceptionally)
// when the request is resolved.
CompletableFuture<?> future;
// Payload type of the request, retained so that failure and timeout logs can
// say which kind of message was affected.
RequestPayloadMsg.PayloadCase payloadCase;
}

/**
* Creates a new NettyClientRouter connected to the specified host and port with the specified tls
* and sasl options. The new {@link this} will attempt connection to the node until {@link
Expand Down Expand Up @@ -229,6 +238,8 @@ public NettyClientRouter(@Nonnull NodeLocator node,
b.handler(getChannelInitializer());
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) timeoutConnect);

log.info("Created a new NettyClientRouter with ID {}", parameters.getClientId());

// Asynchronously connect, retrying until shut down.
// Once connected, connectionFuture will be completed.
connectAsync(b);
Expand Down Expand Up @@ -336,9 +347,9 @@ private void addReconnectionOnCloseFuture(@Nonnull Channel channel,
// Remove the current completion future, forcing clients to wait for reconnection.
connectionFuture = new CompletableFuture<>();
// Exceptionally complete all requests that were waiting for a completion.
outstandingRequests.forEach((reqId, reqCompletableFuture) -> {
reqCompletableFuture.completeExceptionally(
new NetworkException("Disconnected", node));
outstandingRequests.forEach((reqId, requestMetadata) -> {
requestMetadata.future.completeExceptionally(new NetworkException(
String.format("Disconnected (%s)", requestMetadata.payloadCase), node));
// And also remove them.
outstandingRequests.remove(reqId);
});
Expand Down Expand Up @@ -455,7 +466,11 @@ public <T> CompletableFuture<T> sendRequestAndGetCompletable(

// Generate a future and put it in the completion table.
final CompletableFuture<T> cf = new CompletableFuture<>();
outstandingRequests.put(thisRequestId, cf);
outstandingRequests.put(thisRequestId,
RequestMetadata.builder()
.future(cf)
.payloadCase(request.getPayload().getPayloadCase())
.build());

// Write this message out on the channel
channel.writeAndFlush(request, channel.voidPromise());
Expand All @@ -475,7 +490,8 @@ public <T> CompletableFuture<T> sendRequestAndGetCompletable(
if (e.getCause() instanceof TimeoutException) {
outstandingRequests.remove(thisRequestId);
log.debug(
"sendRequestAndGetCompletable: Remove request {} to {} due to timeout! Request:{}",
"sendRequestAndGetCompletable: Remove request (Type: {} ID: {}) to {} due to timeout! Request:{}",
request.getPayload().getPayloadCase(),
thisRequestId, node, TextFormat.shortDebugString(request.getHeader()));
}
return null;
Expand Down Expand Up @@ -530,8 +546,9 @@ public void sendRequest(RequestPayloadMsg payload, long epoch,
* @param <T> The type of the completion.
*/
public <T> void completeRequest(long requestId, T completion) {
CompletableFuture<T> cf;
if ((cf = (CompletableFuture<T>) outstandingRequests.remove(requestId)) != null) {
RequestMetadata requestMetadata = outstandingRequests.remove(requestId);
if (requestMetadata != null) {
CompletableFuture<T> cf = (CompletableFuture<T>) requestMetadata.future;
cf.complete(completion);
} else {
log.warn("Attempted to complete request {}, but request not outstanding!", requestId);
Expand All @@ -541,18 +558,20 @@ public <T> void completeRequest(long requestId, T completion) {
/**
* Exceptionally complete a request with a given cause.
*
* @param requestID The request to complete.
* @param requestId The request to complete.
* @param cause The cause to give for the exceptional completion.
*/
public void completeExceptionally(long requestID, @Nonnull Throwable cause) {
CompletableFuture cf;
if ((cf = outstandingRequests.remove(requestID)) != null) {
cf.completeExceptionally(cause);
log.debug("completeExceptionally: Remove request {} to {} due to {}.", requestID, node,
public void completeExceptionally(long requestId, @Nonnull Throwable cause) {
RequestMetadata requestMetadata = outstandingRequests.remove(requestId);
if (requestMetadata != null) {
requestMetadata.future.completeExceptionally(cause);
log.debug("completeExceptionally: Remove request (Type: {} ID: {}) to {} due to {}.",
requestMetadata.payloadCase,
requestId, node,
cause.getClass().getSimpleName(), cause);
} else {
log.warn("Attempted to exceptionally complete request {}, but request not outstanding!",
requestID);
requestId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private void attemptConsensus(Layout layout)
prepareRank = 1L;
} catch (OutrankedException oe) {
// Update rank since outranked.
log.error("Conflict in updating layout by attemptConsensus: {}", oe);
log.error("Conflict in updating layout by attemptConsensus:", oe);
// Update rank to be able to outrank other competition and complete paxos.
prepareRank = oe.getNewRank() + 1;
throw oe;
Expand Down

0 comments on commit e3a9c5a

Please sign in to comment.