diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index ea16251d40e6d..bafe4a5d44243 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -77,7 +77,14 @@ final protected void handlePing(CommandPing ping) { if (log.isDebugEnabled()) { log.debug("[{}] Replying back to ping message", ctx.channel()); } - ctx.writeAndFlush(Commands.newPong()); + ctx.writeAndFlush(Commands.newPong()) + .addListener(future -> { + if (!future.isSuccess()) { + log.warn("[{}] Forcing connection to close since cannot send a pong message.", + ctx.channel(), future.cause()); + ctx.close(); + } + }); } @Override @@ -104,7 +111,14 @@ private void handleKeepAliveTimeout() { log.debug("[{}] Sending ping message", ctx.channel()); } waitingForPingResponse = true; - ctx.writeAndFlush(Commands.newPing()); + ctx.writeAndFlush(Commands.newPing()) + .addListener(future -> { + if (!future.isSuccess()) { + log.warn("[{}] Forcing connection to close since cannot send a ping message.", + ctx.channel(), future.cause()); + ctx.close(); + } + }); } else { if (log.isDebugEnabled()) { log.debug("[{}] Peer doesn't support keep-alive", ctx.channel());