Skip to content

Commit

Permalink
[fix][broker] Copy proto command fields into final variables in Serve…
Browse files Browse the repository at this point in the history
…rCnx (#18987)

In the `PulsarDecoder`, we use a single `BaseCommand` object and overwrite it for each incoming protocol message. As a result, it is not safe to publish any references to a proto command to other threads.

Here is the single `BaseCommand`:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L99

Here is the method call that resets the object:

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java#L114

Note that the call to `parseFrom` first calls `clear()`, which resets all values on the object.

This PR copies relevant values or objects into other variables.

* Replace `command` with `tcId` since the latter is a final variable meant to be published to another thread.
* Move logic to copy certain command fields to earlier in method for `handleSubscribe`
* Copy `ack` object to new `CommandAck` when there is a broker interceptor. Note that copying this command is likely somewhat costly, so we only do it when there is an interceptor configured.

This is a trivial change that is already covered by tests.

- [x] `doc-not-needed`

This is an internal change.

PR in forked repository: michaeljmarshall#8

(cherry picked from commit a408e9e)
  • Loading branch information
michaeljmarshall committed Dec 21, 2022
1 parent 60a4985 commit 05631bd
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ protected void handleAck(CommandAck ack) {
final boolean hasRequestId = ack.hasRequestId();
final long requestId = hasRequestId ? ack.getRequestId() : 0;
final long consumerId = ack.getConsumerId();
final CommandAck finalAck = getBrokerService().getInterceptor() != null ? new CommandAck().copyFrom(ack) : null;

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
Expand All @@ -1484,7 +1485,7 @@ protected void handleAck(CommandAck ack) {
requestId, null, null, consumerId));
}
if (getBrokerService().getInterceptor() != null) {
getBrokerService().getInterceptor().messageAcked(this, consumer, ack);
getBrokerService().getInterceptor().messageAcked(this, consumer, finalAck);
}
}).exceptionally(e -> {
if (hasRequestId) {
Expand Down

0 comments on commit 05631bd

Please sign in to comment.