[improve] [broker] Add consumer-id into the log when doing subscribe. #20568
Conversation
@@ -361,6 +361,12 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
            msgOutCounter.add(totalMessages);
            bytesOutCounter.add(totalBytes);
            chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
        } else {
            log.warn("[{}-{}] Sent messages to client fail by IO exception[{}], these messages(messages count:"
The `{}` placeholders in the log don't seem to align with the parameters: `IO exception[{}]` is the third `{}`, but the corresponding parameter is passed last.
Ah, you are right. Already fixed, thanks.
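The misalignment above is easy to reproduce with a minimal positional formatter. This is a sketch that mimics how SLF4J binds `{}` placeholders to arguments in order (it is not SLF4J itself; the class and method names are illustrative):

```java
public class PlaceholderDemo {
    // Minimal positional {} substitution: each {} consumes the next argument,
    // which is why placeholder order must match argument order exactly.
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIdx = 0;
        int i = 0;
        while (i < pattern.length()) {
            if (i + 1 < pattern.length() && pattern.charAt(i) == '{'
                    && pattern.charAt(i + 1) == '}' && argIdx < args.length) {
                sb.append(args[argIdx++]);
                i += 2;
            } else {
                sb.append(pattern.charAt(i++));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // If the exception message is passed last but its {} comes third,
        // the wrong values land in the wrong slots.
        System.out.println(format("[{}-{}] Sent messages to client fail by IO exception[{}]",
                "my-topic", "my-sub", "Broken pipe"));
    }
}
```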
Is it better to change the log level to debug? It's a message-level log; for a cluster with high consumption throughput, it will introduce too many warning logs. We should be careful about adding message-level logs at info level or above.
@@ -361,6 +361,13 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
            msgOutCounter.add(totalMessages);
            bytesOutCounter.add(totalBytes);
            chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
        } else {
            log.warn("[{}-{}] Sent messages to client fail by IO exception[{}], these messages(messages count:"
Have you considered that connection problems will print tons of logs here? :)
IMO, message-trace logs should be added carefully.
Have you considered that connection problems will print tons of logs here? :)
IMO, message-trace logs should be added carefully.
Added a health check here: if the health check fails, the connection is closed immediately, so this log is only printed while a health check is in progress.
@@ -1783,6 +1784,10 @@ protected void handleAck(CommandAck ack) {
                }
                return null;
            });
        } else {
            log.warn("Consumer future is not complete(not complete or error), but received command ack. so discard"
Same concern as before. Tons of logs?
Already changed the log level to DEBUG.
Codecov Report
@@             Coverage Diff             @@
##             master   #20568       +/-  ##
=============================================
+ Coverage     33.50%   73.01%   +39.50%
- Complexity    12053    31966    +19913
=============================================
  Files          1613     1867      +254
  Lines        126120   138662    +12542
  Branches      13749    15236     +1487
=============================================
+ Hits          42254   101239    +58985
+ Misses        78332    29385    -48947
- Partials       5534     8038     +2504

Flags with carried forward coverage won't be shown.
                    topicName, subscription, status.cause() == null ? "" : status.cause().getMessage(),
                    totalMessages, this.toString(), status.cause());
            // If the health check fail, this connection will be closed.
            cnx.healthCheckManually();
I notice there is recently added code that also checks connection liveness. Do you think we can reuse this method? See:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 3395 to 3418 in d85736c
public CompletableFuture<Boolean> checkConnectionLiveness() {
    if (connectionLivenessCheckTimeoutMillis > 0) {
        return NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
            if (connectionCheckInProgress != null) {
                return connectionCheckInProgress;
            } else {
                final CompletableFuture<Boolean> finalConnectionCheckInProgress = new CompletableFuture<>();
                connectionCheckInProgress = finalConnectionCheckInProgress;
                ctx.executor().schedule(() -> {
                    if (finalConnectionCheckInProgress == connectionCheckInProgress
                            && !finalConnectionCheckInProgress.isDone()) {
                        log.warn("[{}] Connection check timed out. Closing connection.", remoteAddress);
                        ctx.close();
                    }
                }, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS);
                sendPing();
                return finalConnectionCheckInProgress;
            }
        })).thenCompose(java.util.function.Function.identity());
    } else {
        // check is disabled
        return CompletableFuture.completedFuture((Boolean) null);
    }
}
It is a good suggestion. The method `checkConnectionLiveness` is called when a new consumer registers, and it always calls `sendPing`. Since sending messages to the client is a high-frequency event, an inactive connection would result in a large number of tasks being pushed into Netty's task queue. We should wrap `checkConnectionLiveness` in a new method that limits the number of tasks, such as:

public void healthCheckManually() {
    if (canSendPingNow()) {
        checkConnectionLiveness();
    }
}

But the wrapper method does not need to switch to the Netty thread, because it already runs on the Netty thread, so calling `sendPing` directly is better than calling `checkConnectionLiveness`. The current implementation is the result of that optimization.
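The rate-limiting idea described above can be sketched as a self-contained demo. The field, constant, and method names here are illustrative, not the exact Pulsar code; the point is that a `compareAndSet` on a last-check timestamp lets at most one ping through per interval, no matter how many failed sends call the method:

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class ManualPingDemo {
    // Hypothetical names; the real broker uses its own field and interval.
    static final long MIN_INTERVAL_MS = 30_000;
    static final AtomicLongFieldUpdater<ManualPingDemo> LAST_CHECK =
            AtomicLongFieldUpdater.newUpdater(ManualPingDemo.class, "lastCheckTime");

    volatile long lastCheckTime = 0L;
    int pingsSent = 0;

    void sendPing() {
        pingsSent++;
    }

    void healthCheckManually() {
        long last = lastCheckTime;
        long now = System.currentTimeMillis();
        if (now - last < MIN_INTERVAL_MS) {
            return; // a check was sent recently; avoid flooding the task queue
        }
        // compareAndSet ensures only one racing caller actually sends the ping.
        if (LAST_CHECK.compareAndSet(this, last, now)) {
            sendPing();
        }
    }

    public static void main(String[] args) {
        ManualPingDemo cnx = new ManualPingDemo();
        // Simulate many failed sends on a dead connection in quick succession.
        for (int i = 0; i < 1000; i++) {
            cnx.healthCheckManually();
        }
        System.out.println(cnx.pingsSent); // only the first call pings
    }
}
```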
@@ -1783,6 +1789,10 @@ protected void handleAck(CommandAck ack) {
                }
                return null;
            });
        } else {
            log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard"
if (log.isDebugEnabled())
fixed
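The `isDebugEnabled()` guard matters because a hot-path DEBUG log still pays for argument construction (string building, expensive `toString()` calls) even when the message is ultimately discarded. A minimal sketch with a stand-in flag and counter instead of a real SLF4J logger (names are illustrative):

```java
public class DebugGuardDemo {
    // Stand-ins for logger.isDebugEnabled() and an expensive log argument.
    static boolean debugEnabled = false;
    static int expensiveCalls = 0;

    static String expensiveToString() {
        expensiveCalls++; // counts how often the costly argument is built
        return "consumer-state";
    }

    static void logDiscardedAck(long consumerId) {
        // The guard skips argument construction entirely when DEBUG is off,
        // which is what you want on a per-message code path.
        if (debugEnabled) {
            System.out.println("Discarded ack for consumer " + consumerId
                    + ", state=" + expensiveToString());
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            logDiscardedAck(i);
        }
        System.out.println(expensiveCalls); // 0: nothing built while DEBUG is off
    }
}
```

Note that SLF4J's `{}` placeholders already defer string concatenation, so the guard is mainly valuable when the arguments themselves are expensive to compute.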
LGTM
Is it possible to add a test to ensure the connection is closed immediately if writing data to the channel fails?
            return;
        }
        if (LAST_MANUAL_HEARTBEAT_CHECK_TIME_UPDATER.compareAndSet(this, lastCheckTime, System.currentTimeMillis())) {
            sendPing();
I'm not sure how this resolves the problem. If the ping command is sent but there is no pong response from the client side, does the broker need to wait for the keep-alive timeout? The keep-alive task already handles that, right?
Or maybe we should call `handleKeepAliveTimeout()` so that the connection can be closed forcibly without waiting for the response.
Is it better to change the log level to debug? It's a message level log, for a cluster with high consumption throughput, it will introduce too many warning logs. We should be careful to add logs on the message level with >= info level.
Fixed.
Is it possible to add a test to ensure the connection is closed immediately if writing data to the channel fails?
After I wrote a test, I noticed that the logic of "close connection after the write failure" already exists[1], so I removed the logic that tried to do the same thing. Now the modifications in this PR are the following two points:
- (important) Since `cnx.address + consumerId` is the identifier of one consumer, add `consumer-id` into the log when doing subscribe.
- Add a test to confirm that even if an error occurs when sending messages to the client, consumption still works.
I have renamed the title of this PR.
Thanks very much. It is an important review.
Force-pushed from beb8650 to 1937acd.
…#20568)
- Since `cnx.address + consumerId` is the identifier of one consumer; add `consumer-id` into the log when doing subscribe.
- Add a test to confirm that even if the error occurs when sending messages to the client, the consumption is still OK.
- Print debug log if ack-command was discarded due to `ConsumerFuture is not complete.`
- Print debug log if sending a message to the client failed.
(cherry picked from commit a41ac49)
Modifications
- Since `cnx.address + consumerId` is the identifier of one consumer, add `consumer-id` into the log when doing subscribe.
- Print a debug log if an ack command is discarded because `ConsumerFuture is not complete.`
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x