Skip to content

Commit

Permalink
[improve] [log] Print source client addr when enabled haProxyProtocol…
Browse files Browse the repository at this point in the history
…Enabled (#22686)

(cherry picked from commit d77c5de)
  • Loading branch information
poorbarcode committed May 10, 2024
1 parent 2e37449 commit b178084
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ public KeySharedMeta getKeySharedMeta() {
public String toString() {
if (subscription != null && cnx != null) {
return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId)
.add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
.add("consumerName", consumerName).add("address", this.cnx.toString()).toString();
} else {
return MoreObjects.toStringHelper(this).add("consumerId", consumerId)
.add("consumerName", consumerName).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ public Map<String, String> getMetadata() {

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.clientAddress())
return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.toString())
.add("producerName", producerName).add("producerId", producerId).toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1170,7 +1170,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
remoteAddress, getPrincipal());
}

log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.ctx().channel().toString(),
log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.toString(),
topicName, subscriptionName, consumerId);
try {
Metadata.validateMetadata(metadata,
Expand Down Expand Up @@ -1818,7 +1818,7 @@ protected void handleAck(CommandAck ack) {
if (log.isDebugEnabled()) {
log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard"
+ " this command. consumerId: {}, cnx: {}, messageIdCount: {}", ack.getConsumerId(),
this.ctx().channel().toString(), ack.getMessageIdsCount());
this.toString(), ack.getMessageIdsCount());
}
}
}
Expand Down Expand Up @@ -2176,7 +2176,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", ServerCnx.this.toString(),
topic.getName(), subscriptionName, lastPosition, partitionIndex);
}

Expand Down Expand Up @@ -3255,7 +3255,7 @@ private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
}
} catch (Throwable t) {
log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", topic, producerName,
ctx.channel());
this.toString());
}
}
}
Expand Down Expand Up @@ -3318,6 +3318,31 @@ public SocketAddress getRemoteAddress() {
return remoteAddress;
}

/**
* Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038] [SR:/240.240.0.5:58038].
* L: local Address.
* R: remote address.
* SR: source remote address. It is the source address when enabled "haProxyProtocolEnabled".
*/
@Override
public String toString() {
ChannelHandlerContext ctx = ctx();
// ctx.channel(): 96.
// clientSourceAddress: 5 + 46(ipv6).
// state: 19.
// Len = 166.
StringBuilder buf = new StringBuilder(166);
if (ctx == null) {
buf.append("[ctx: null]");
} else {
buf.append(ctx.channel().toString());
}
String clientSourceAddr = clientSourceAddress();
buf.append(" [SR:").append(clientSourceAddr == null ? "-" : clientSourceAddr)
.append(", state:").append(state).append("]");
return buf.toString();
}

@Override
public BrokerService getBrokerService() {
return service;
Expand Down Expand Up @@ -3455,7 +3480,7 @@ public CompletableFuture<Boolean> checkConnectionLiveness() {
ctx.executor().schedule(() -> {
if (finalConnectionCheckInProgress == connectionCheckInProgress
&& !finalConnectionCheckInProgress.isDone()) {
log.warn("[{}] Connection check timed out. Closing connection.", remoteAddress);
log.warn("[{}] Connection check timed out. Closing connection.", this.toString());
ctx.close();
}
}, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
} else {
msg += "Pattern longer than maximum: " + maxSubscriptionPatternLength;
}
log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), msg, namespaceName);
log.warn("[{}] {} on namespace {}", connection.toString(), msg, namespaceName);
connection.getCommandSender().sendErrorResponse(requestId, ServerError.NotAllowedError, msg);
lookupSemaphore.release();
return;
Expand All @@ -144,14 +144,14 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
TopicListWatcher watcher = existingWatcherFuture.getNow(null);
log.info("[{}] Watcher with the same id is already created:"
+ " watcherId={}, watcher={}",
connection.getRemoteAddress(), watcherId, watcher);
connection.toString(), watcherId, watcher);
watcherFuture = existingWatcherFuture;
} else {
// There was an early request to create a watcher with the same watcherId. This can happen when
// client timeout is lower the broker timeouts. We need to wait until the previous watcher
// creation request either completes or fails.
log.warn("[{}] Watcher with id is already present on the connection,"
+ " consumerId={}", connection.getRemoteAddress(), watcherId);
+ " consumerId={}", connection.toString(), watcherId);
ServerError error;
if (!existingWatcherFuture.isDone()) {
error = ServerError.ServiceNotReady;
Expand Down Expand Up @@ -179,14 +179,14 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
if (log.isDebugEnabled()) {
log.debug(
"[{}] Received WatchTopicList for namespace [//{}] by {}",
connection.getRemoteAddress(), namespaceName, requestId);
connection.toString(), namespaceName, requestId);
}
connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, hash, topicList);
lookupSemaphore.release();
})
.exceptionally(ex -> {
log.warn("[{}] Error WatchTopicList for namespace [//{}] by {}",
connection.getRemoteAddress(), namespaceName, requestId);
connection.toString(), namespaceName, requestId);
connection.getCommandSender().sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(
new BrokerServiceException.ServerMetadataException(ex)), ex.getMessage());
Expand All @@ -213,7 +213,7 @@ public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watc
} else {
if (!watcherFuture.complete(watcher)) {
log.warn("[{}] Watcher future was already completed. Deregistering watcherId={}.",
connection.getRemoteAddress(), watcherId);
connection.toString(), watcherId);
topicResources.deregisterPersistentTopicListener(watcher);
}
}
Expand All @@ -232,7 +232,7 @@ public void deleteTopicListWatcher(Long watcherId) {
CompletableFuture<TopicListWatcher> watcherFuture = watchers.get(watcherId);
if (watcherFuture == null) {
log.info("[{}] TopicListWatcher was not registered on the connection: {}",
watcherId, connection.getRemoteAddress());
watcherId, connection.toString());
return;
}

Expand All @@ -242,22 +242,22 @@ public void deleteTopicListWatcher(Long watcherId) {
// watcher future as failed and we can tell the client the close operation was successful. When the actual
// create operation will complete, the new watcher will be discarded.
log.info("[{}] Closed watcher before its creation was completed. watcherId={}",
connection.getRemoteAddress(), watcherId);
connection.toString(), watcherId);
watchers.remove(watcherId);
return;
}

if (watcherFuture.isCompletedExceptionally()) {
log.info("[{}] Closed watcher that already failed to be created. watcherId={}",
connection.getRemoteAddress(), watcherId);
connection.toString(), watcherId);
watchers.remove(watcherId);
return;
}

// Proceed with normal watcher close
topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null));
watchers.remove(watcherId);
log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId);
log.info("[{}] Closed watcher, watcherId={}", connection.toString(), watcherId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
cmd.parseFrom(buffer, cmdSize);

if (log.isDebugEnabled()) {
log.debug("[{}] Received cmd {}", ctx.channel().remoteAddress(), cmd.getType());
log.debug("[{}] Received cmd {}", ctx.channel(), cmd.getType());
}
messageReceived();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;

if (log.isDebugEnabled()) {
log.debug("[{}] Scheduling keep-alive task every {} s", ctx.channel(), keepAliveIntervalSeconds);
log.debug("[{}] Scheduling keep-alive task every {} s", this.toString(), keepAliveIntervalSeconds);
}
if (keepAliveIntervalSeconds > 0) {
this.keepAliveTask = ctx.executor()
Expand All @@ -85,13 +85,13 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
protected final void handlePing(CommandPing ping) {
// Immediately reply success to ping requests
if (log.isDebugEnabled()) {
log.debug("[{}] Replying back to ping message", ctx.channel());
log.debug("[{}] Replying back to ping message", this.toString());
}
ctx.writeAndFlush(Commands.newPong())
.addListener(future -> {
if (!future.isSuccess()) {
log.warn("[{}] Forcing connection to close since cannot send a pong message.",
ctx.channel(), future.cause());
toString(), future.cause());
ctx.close();
}
});
Expand All @@ -107,24 +107,24 @@ private void handleKeepAliveTimeout() {
}

if (!isHandshakeCompleted()) {
log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", ctx.channel());
log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", this.toString());
ctx.close();
} else if (waitingForPingResponse && ctx.channel().config().isAutoRead()) {
// We were waiting for a response and another keep-alive just completed.
// If auto-read was disabled, it means we stopped reading from the connection, so we might receive the Ping
// response later and thus not enforce the strict timeout here.
log.warn("[{}] Forcing connection to close after keep-alive timeout", ctx.channel());
log.warn("[{}] Forcing connection to close after keep-alive timeout", this.toString());
ctx.close();
} else if (getRemoteEndpointProtocolVersion() >= ProtocolVersion.v1.getValue()) {
// Send keep alive probe to peer only if it supports the ping/pong commands, added in v1
if (log.isDebugEnabled()) {
log.debug("[{}] Sending ping message", ctx.channel());
log.debug("[{}] Sending ping message", this.toString());
}
waitingForPingResponse = true;
sendPing();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Peer doesn't support keep-alive", ctx.channel());
log.debug("[{}] Peer doesn't support keep-alive", this.toString());
}
}
}
Expand All @@ -134,7 +134,7 @@ protected ChannelFuture sendPing() {
.addListener(future -> {
if (!future.isSuccess()) {
log.warn("[{}] Forcing connection to close since cannot send a ping message.",
ctx.channel(), future.cause());
this.toString(), future.cause());
ctx.close();
}
});
Expand All @@ -152,5 +152,20 @@ public void cancelKeepAliveTask() {
*/
protected abstract boolean isHandshakeCompleted();

/**
* Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038].
* L: local Address.
* R: remote address.
*/
@Override
public String toString() {
ChannelHandlerContext ctx = this.ctx;
if (ctx == null) {
return "[ctx: null]";
} else {
return ctx.channel().toString();
}
}

private static final Logger log = LoggerFactory.getLogger(PulsarHandler.class);
}

0 comments on commit b178084

Please sign in to comment.