Skip to content

Commit

Permalink
[fix][broker] Copy proto command fields into final variables in Serve…
Browse files Browse the repository at this point in the history
…rCnx (apache#18987)

### Motivation

In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.

Here is the single `BaseCommand`:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99

Here is the method call that resets the object:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114

Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.

This PR copies relevant values or objects into other variables.

### Modifications

* Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread.
* Move logic to copy certain command fields to earlier in method for `handleSubscribe`
* Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured.

### Verifying this change

This is a trivial change that is already covered by tests.

### Documentation

- [x] `doc-not-needed` 

This is an internal change.

### Matching PR in forked repository

PR in forked repository: michaeljmarshall#8
  • Loading branch information
michaeljmarshall committed Dec 20, 2022
1 parent 3177345 commit a408e9e
Showing 1 changed file with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
: emptyKeySharedMeta;
final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());

if (log.isDebugEnabled()) {
log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName,
Expand Down Expand Up @@ -1091,14 +1094,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
return null;
}

final long consumerEpoch;
if (subscribe.hasConsumerEpoch()) {
consumerEpoch = subscribe.getConsumerEpoch();
} else {
consumerEpoch = DEFAULT_CONSUMER_EPOCH;
}
Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());
service.isAllowAutoTopicCreationAsync(topicName.toString())
.thenApply(isAllowed -> forceTopicCreation && isAllowed)
.thenCompose(createTopicIfDoesNotExist ->
Expand Down Expand Up @@ -1636,6 +1631,7 @@ protected void handleAck(CommandAck ack) {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
Expand All @@ -1644,7 +1640,7 @@ protected void handleAck(CommandAck ack) {
ctx.writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
}).exceptionally(e -> {
if (hasRequestId) {
ctx.writeAndFlush(Commands.newAckResponse(requestId,
Expand Down Expand Up @@ -2333,7 +2329,7 @@ protected void handleNewTxn(CommandNewTxn command) {
if (log.isDebugEnabled()) {
log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
}
commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId());
commandSender.sendNewTxnResponse(requestId, txnID, tcId.getId());
} else {
if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) {
// if new txn throw ReachMaxActiveTxnException, don't return any response to client,
Expand Down

0 comments on commit a408e9e

Please sign in to comment.