Skip to content

Commit

Permalink
[fix][broker] Copy proto command fields into final variables in Serve…
Browse files Browse the repository at this point in the history
…rCnx (#18987)

In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.

Here is the single `BaseCommand`:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99

Here is the method call that resets the object:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114

Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.

This PR copies relevant values or objects into other variables.

* Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread.
* Move logic to copy certain command fields to earlier in method for `handleSubscribe`
* Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured.

This is a trivial change that is already covered by tests.

- [x] `doc-not-needed`

This is an internal change.

PR in forked repository: michaeljmarshall#8

(cherry picked from commit a408e9e)
  • Loading branch information
michaeljmarshall committed Dec 21, 2022
1 parent 070c356 commit 6b17e99
Showing 1 changed file with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
: emptyKeySharedMeta;
final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());

CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName,
Expand Down Expand Up @@ -1029,14 +1032,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());

final long consumerEpoch;
if (subscribe.hasConsumerEpoch()) {
consumerEpoch = subscribe.getConsumerEpoch();
} else {
consumerEpoch = DEFAULT_CONSUMER_EPOCH;
}
Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
Expand Down Expand Up @@ -1552,6 +1547,7 @@ protected void handleAck(CommandAck ack) {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
Expand All @@ -1561,7 +1557,7 @@ protected void handleAck(CommandAck ack) {
requestId, null, null, consumerId));
}
if (getBrokerService().getInterceptor() != null) {
getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
}
}).exceptionally(e -> {
if (hasRequestId) {
Expand Down

0 comments on commit 6b17e99

Please sign in to comment.