diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c1b475c5c3318..af7488bd652b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1027,6 +1027,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) : emptyKeySharedMeta; + final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH; + final Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( + subscribe.getSubscriptionPropertiesList()); if (log.isDebugEnabled()) { log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName, @@ -1091,14 +1094,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { return null; } - final long consumerEpoch; - if (subscribe.hasConsumerEpoch()) { - consumerEpoch = subscribe.getConsumerEpoch(); - } else { - consumerEpoch = DEFAULT_CONSUMER_EPOCH; - } - Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( - subscribe.getSubscriptionPropertiesList()); service.isAllowAutoTopicCreationAsync(topicName.toString()) .thenApply(isAllowed -> forceTopicCreation && isAllowed) .thenCompose(createTopicIfDoesNotExist -> @@ -1636,6 +1631,7 @@ protected void handleAck(CommandAck ack) { final boolean hasRequestId = ack.hasRequestId(); final long requestId = hasRequestId ? ack.getRequestId() : 0; final long consumerId = ack.getConsumerId(); + final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null; if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); @@ -1644,7 +1640,7 @@ protected void handleAck(CommandAck ack) { ctx.writeAndFlush(Commands.newAckResponse( requestId, null, null, consumerId)); } - getBrokerService().getInterceptor().messageAcked(this, consumer, ack); + getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck); }).exceptionally(e -> { if (hasRequestId) { ctx.writeAndFlush(Commands.newAckResponse(requestId, @@ -2333,7 +2329,7 @@ protected void handleNewTxn(CommandNewTxn command) { if (log.isDebugEnabled()) { log.debug("Send response {} for new txn request {}", tcId.getId(), requestId); } - commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId()); + commandSender.sendNewTxnResponse(requestId, txnID, tcId.getId()); } else { if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) { // if new txn throw ReachMaxActiveTxnException, don't return any response to client,