diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ecf99a426dbd4..a1d41b6c6a000 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -302,7 +302,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { executor.executeOrdered(name, safeRun(() -> { mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { - log.debug("[{}] Opened ledger {}: ", name, id, BKException.getMessage(rc)); + log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc)); } if (rc == BKException.Code.OK) { LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(id) @@ -1986,7 +1986,7 @@ void internalTrimConsumedLedgers(CompletableFuture promise) { overRetentionQuota, currentLedger.getId()); } if (ls.getLedgerId() == currentLedger.getId()) { - log.debug("[{}] ledger id skipped for deletion as it is currently being written to", name, + log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name, ls.getLedgerId()); break; } else if (expired) { @@ -2146,7 +2146,7 @@ public void deleteCursorComplete(Object ctx) { @Override public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to delete cursor {}", name, cursor, exception); + log.warn("[{}] Failed to delete cursor {} : {}", name, cursor, exception); cursorDeleteException.compareAndSet(null, exception); if (cursorsToDelete.decrementAndGet() == 0) { // Trigger callback only once @@ -2185,7 +2185,7 @@ private void asyncDeleteLedger(long ledgerId, long retry) { if (isNoSuchLedgerExistsException(rc)) { log.warn("[{}] Ledger was already deleted {}", name, ledgerId); } else if (rc != BKException.Code.OK) { - log.error("[{}] Error deleting ledger {}", name, ledgerId, BKException.getMessage(rc)); + log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); scheduledExecutor.schedule(safeRun(() -> { asyncDeleteLedger(ledgerId, retry - 1); }), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d74017c4e6a88..92ce81d9c04f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -349,7 +349,7 @@ private void revokePermissions(String topicUri, String role) { if (!policies.auth_policies.destination_auth.containsKey(topicUri) || !policies.auth_policies.destination_auth.get(topicUri).containsKey(role)) { - log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level", + log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(), role, topicUri); throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d4e90be705d99..62876f014561a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -446,8 +446,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle, lookupFuture.complete(Optional.of(new LookupResult(ownerInfo))); } }).exceptionally(exception -> { - LOG.warn("Failed to acquire ownership for namespace bundle {}: ", bundle, exception.getMessage(), - exception); + LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", bundle, exception); lookupFuture.completeExceptionally(new PulsarServerException( "Failed to acquire ownership for namespace bundle " + bundle, exception)); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 9294330c87f42..1037952755d36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -216,7 +217,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { consumer.close(); } catch (BrokerServiceException e) { - log.warn("Consumer {} was already closed: {}", consumer, e.getMessage(), e); + log.warn("Consumer {} was already closed: {}", consumer, e); } }); } @@ -232,14 +233,14 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (state != State.Failed) { // No need to report stack trace for known exceptions that happen in disconnections - log.warn("[{}] Got exception {} : {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), - ClientCnx.isKnownException(cause) ? null : cause); + log.warn("[{}] Got exception {}", remoteAddress, + ClientCnx.isKnownException(cause) ? cause : ExceptionUtils.getStackTrace(cause)); state = State.Failed; } else { // At default info level, suppress all subsequent exceptions that are thrown when the connection has already // failed if (log.isDebugEnabled()) { - log.debug("[{}] Got exception: {}", remoteAddress, cause.getMessage(), cause); + log.debug("[{}] Got exception: {}", remoteAddress, cause); } } ctx.close(); @@ -582,8 +583,7 @@ public void refreshAuthenticationCredentials() { pendingAuthChallengeResponse = true; } catch (AuthenticationException e) { - log.warn("[{}] Failed to refresh authentication: ", - remoteAddress, e.getMessage()); + log.warn("[{}] Failed to refresh authentication: {}", remoteAddress, e); ctx.close(); } })); @@ -1391,7 +1391,7 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { ctx.writeAndFlush(Commands.newSuccess(requestId)); log.info("[{}] Closed consumer {}", remoteAddress, consumer); } catch (BrokerServiceException e) { - log.warn("[{]] Error closing consumer: ", remoteAddress, consumer, e); + log.warn("[{]] Error closing consumer {} : {}", remoteAddress, consumer, e); ctx.writeAndFlush( Commands.newError(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage())); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 9bcd2874dc1bc..1ae209d4a305a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -515,7 +515,7 @@ public void markDeleteComplete(Object ctx) { public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { // TODO: cut consumer connection on markDeleteFailed if (log.isDebugEnabled()) { - log.debug("[{}][{}] Failed to mark delete for position ", topicName, subName, ctx, exception); + log.debug("[{}][{}] Failed to mark delete for position {}: {}", topicName, subName, ctx, exception); } } }; @@ -530,7 +530,7 @@ public void deleteComplete(Object position) { @Override public void deleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}][{}] Failed to delete message at {}", topicName, subName, ctx, exception); + log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception); } }; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index dfa60e8294df5..3205608ca7ad1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -50,6 +50,7 @@ import javax.net.ssl.SSLSession; import lombok.Getter; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.pulsar.PulsarVersion; @@ -261,8 +262,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (state != State.Failed) { // No need to report stack trace for known exceptions that happen in disconnections - log.warn("[{}] Got exception {} : {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), - isKnownException(cause) ? null : cause); + log.warn("[{}] Got exception {}", remoteAddress, + ClientCnx.isKnownException(cause) ? cause : ExceptionUtils.getStackTrace(cause)); state = State.Failed; } else { // At default info level, suppress all subsequent exceptions that are thrown when the connection has already diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 970e1343c3adb..558282ae0108f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1094,12 +1094,12 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa CompletableFuture future = new CompletableFuture<>(); client.getPartitionsForTopic(topicName).thenCompose(list -> { - int oldPartitionNumber = topics.get(topicName.toString()); + int oldPartitionNumber = topics.get(topicName); int currentPartitionNumber = list.size(); if (log.isDebugEnabled()) { log.debug("[{}] partitions number. old: {}, new: {}", - topicName.toString(), oldPartitionNumber, currentPartitionNumber); + topicName, oldPartitionNumber, currentPartitionNumber); } if (oldPartitionNumber == currentPartitionNumber) { @@ -1123,7 +1123,7 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); if (log.isDebugEnabled()) { log.debug("[{}] create consumer {} for partitionName: {}", - topicName.toString(), newConsumer.getTopic(), partitionName); + topicName, newConsumer.getTopic(), partitionName); } return subFuture; }) @@ -1140,14 +1140,14 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa future.complete(null); }) .exceptionally(ex -> { - log.warn("[{}] Failed to subscribe {} partition: {} - {}", - topic, topicName.toString(), oldPartitionNumber, currentPartitionNumber, ex.getMessage()); + log.warn("[{}] Failed to subscribe {} partition: {} - {} : {}", + topic, topicName, oldPartitionNumber, currentPartitionNumber, ex); future.completeExceptionally(ex); return null; }); } else { log.error("[{}] not support shrink topic partitions. old: {}, new: {}", - topicName.toString(), oldPartitionNumber, currentPartitionNumber); + topicName, oldPartitionNumber, currentPartitionNumber); future.completeExceptionally(new NotSupportedException("not support shrink topic partitions")); } return future; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 7a99fe1658691..f8584ab28f598 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -140,7 +140,7 @@ void handleNewTxnResponse(PulsarApi.CommandNewTxnResponse response) { public CompletableFuture addPublishPartitionToTxnAsync(TxnID txnID, List partitions) { if (LOG.isDebugEnabled()) { - LOG.debug("Add publish partition to txn request with txnId, with partitions", txnID, partitions); + LOG.debug("Add publish partition {} to txn {}", partitions, txnID); } CompletableFuture callback = new CompletableFuture<>();