From a408e9e392d48dcda7c17cd9b9e85e530c94998d Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 19 Dec 2022 23:59:35 -0600 Subject: [PATCH] [fix][broker] Copy proto command fields into final variables in ServerCnx (#18987) ### Motivation In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads. Here is the single `BaseCommand`: https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99 Here is the method call that resets the object: https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114 Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object. This PR copies relevant values or objects into other variables. ### Modifications * Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread. * Move logic to copy certain command fields to earlier in method for `handleSubscribe` * Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured. ### Verifying this change This is a trivial change that is already covered by tests. ### Documentation - [x] `doc-not-needed` This is an internal change. ### Matching PR in forked repository PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/8 --- .../apache/pulsar/broker/service/ServerCnx.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index c1b475c5c3318..af7488bd652b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1027,6 +1027,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) : emptyKeySharedMeta; + final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH; + final Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( + subscribe.getSubscriptionPropertiesList()); if (log.isDebugEnabled()) { log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName, @@ -1091,14 +1094,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { return null; } - final long consumerEpoch; - if (subscribe.hasConsumerEpoch()) { - consumerEpoch = subscribe.getConsumerEpoch(); - } else { - consumerEpoch = DEFAULT_CONSUMER_EPOCH; - } - Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( - subscribe.getSubscriptionPropertiesList()); service.isAllowAutoTopicCreationAsync(topicName.toString()) .thenApply(isAllowed -> forceTopicCreation && isAllowed) .thenCompose(createTopicIfDoesNotExist -> @@ -1636,6 +1631,7 @@ protected void handleAck(CommandAck ack) { final boolean hasRequestId = ack.hasRequestId(); final long requestId = hasRequestId ? ack.getRequestId() : 0; final long consumerId = ack.getConsumerId(); + final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null; if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); @@ -1644,7 +1640,7 @@ protected void handleAck(CommandAck ack) { ctx.writeAndFlush(Commands.newAckResponse( requestId, null, null, consumerId)); } - getBrokerService().getInterceptor().messageAcked(this, consumer, ack); + getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck); }).exceptionally(e -> { if (hasRequestId) { ctx.writeAndFlush(Commands.newAckResponse(requestId, @@ -2333,7 +2329,7 @@ protected void handleNewTxn(CommandNewTxn command) { if (log.isDebugEnabled()) { log.debug("Send response {} for new txn request {}", tcId.getId(), requestId); } - commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId()); + commandSender.sendNewTxnResponse(requestId, txnID, tcId.getId()); } else { if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) { // if new txn throw ReachMaxActiveTxnException, don't return any response to client,