Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debug log for WebSocket. #12458

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ public void onWebSocketText(String message) {

// Check and notify consumer if reached end of topic.
private void handleEndOfTopic() {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received check reach the end of topic request from {} ", consumer.getTopic(),
subscription, getRemote().getInetSocketAddress().toString());
}
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(
new EndOfTopicResponse(consumer.hasReachedEndOfTopic()));
Expand Down Expand Up @@ -259,6 +263,10 @@ public void writeSuccess() {
}

private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received unsubscribe request from {} ", consumer.getTopic(),
subscription, getRemote().getInetSocketAddress().toString());
}
consumer.unsubscribe();
}

Expand All @@ -276,18 +284,30 @@ private void handleAck(ConsumerCommand command) throws IOException {
// We should have received an ack
MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
topic.toString());
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
}
consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
checkResumeReceive();
}

private void handleNack(ConsumerCommand command) throws IOException {
MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
topic.toString());
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received negative ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
}
consumer.negativeAcknowledge(msgId);
checkResumeReceive();
}

private void handlePermit(ConsumerCommand command) throws IOException {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received {} permits request from {} ", consumer.getTopic(),
subscription, command.permitMessages, getRemote().getInetSocketAddress().toString());
}
if (command.permitMessages == null) {
throw new IOException("Missing required permitMessages field for 'permit' command");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ public void close() throws IOException {

@Override
public void onWebSocketText(String message) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received new message from producer {} ", producer.getTopic(),
getRemote().getInetSocketAddress().toString());
}
ProducerMessage sendRequest;
byte[] rawPayload = null;
String requestContext = null;
Expand Down Expand Up @@ -188,6 +192,10 @@ public void onWebSocketText(String message) {
final long now = System.nanoTime();

builder.sendAsync().thenAccept(msgId -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Success fully write the message to broker with returned message ID {} from producer {}",
producer.getTopic(), msgId, getRemote().getInetSocketAddress().toString());
}
updateSentMsgStats(msgSize, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - now));
if (isConnected()) {
String messageId = Base64.getEncoder().encodeToString(msgId.toByteArray());
Expand Down