diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d18e154f3795c..9e04dd9ccdeac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -967,6 +967,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) : emptyKeySharedMeta; + final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH; + final Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( + subscribe.getSubscriptionPropertiesList()); CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( topicName, @@ -1029,14 +1032,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { boolean createTopicIfDoesNotExist = forceTopicCreation && service.isAllowAutoTopicCreation(topicName.toString()); - final long consumerEpoch; - if (subscribe.hasConsumerEpoch()) { - consumerEpoch = subscribe.getConsumerEpoch(); - } else { - consumerEpoch = DEFAULT_CONSUMER_EPOCH; - } - Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( - subscribe.getSubscriptionPropertiesList()); service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { @@ -1552,6 +1547,7 @@ protected void handleAck(CommandAck ack) { final boolean hasRequestId = ack.hasRequestId(); final long requestId = hasRequestId ? ack.getRequestId() : 0; final long consumerId = ack.getConsumerId(); + final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null; if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); @@ -1561,7 +1557,7 @@ protected void handleAck(CommandAck ack) { requestId, null, null, consumerId)); } if (getBrokerService().getInterceptor() != null) { - getBrokerService().getInterceptor().messageAcked(this, consumer, ack); + getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck); } }).exceptionally(e -> { if (hasRequestId) {