From 6b17e9958752c7bffc26d1a8522f7959d035d550 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 19 Dec 2022 23:59:35 -0600 Subject: [PATCH] [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987) In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads. Here is the single `BaseCommand`: https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99 Here is the method call that resets the object: https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114 Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object. This PR copies relevant values or objects into other variables. * Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread. * Move logic to copy certain command fields to earlier in method for `handleSubscribe` * Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured. This is a trivial change that is already covered by tests. - [x] `doc-not-needed` This is an internal change. PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/8 (cherry picked from commit a408e9e392d48dcda7c17cd9b9e85e530c94998d) --- .../apache/pulsar/broker/service/ServerCnx.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d18e154f3795c..9e04dd9ccdeac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -967,6 +967,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) : emptyKeySharedMeta; + final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH; + final Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( + subscribe.getSubscriptionPropertiesList()); CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( topicName, @@ -1029,14 +1032,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { boolean createTopicIfDoesNotExist = forceTopicCreation && service.isAllowAutoTopicCreation(topicName.toString()); - final long consumerEpoch; - if (subscribe.hasConsumerEpoch()) { - consumerEpoch = subscribe.getConsumerEpoch(); - } else { - consumerEpoch = DEFAULT_CONSUMER_EPOCH; - } - Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( - subscribe.getSubscriptionPropertiesList()); service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { @@ -1552,6 +1547,7 @@ protected void handleAck(CommandAck ack) { final boolean hasRequestId = ack.hasRequestId(); final long requestId = hasRequestId ? ack.getRequestId() : 0; final long consumerId = ack.getConsumerId(); + final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null; if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); @@ -1561,7 +1557,7 @@ protected void handleAck(CommandAck ack) { requestId, null, null, consumerId)); } if (getBrokerService().getInterceptor() != null) { - getBrokerService().getInterceptor().messageAcked(this, consumer, ack); + getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck); } }).exceptionally(e -> { if (hasRequestId) {