Skip to content

Commit

Permalink
[improve][broker] Only get consumer future when interceptor not null (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Dec 22, 2022
1 parent 8990855 commit 74017d5
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3015,19 +3015,17 @@ public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long e
BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount,
ackSet, epoch);
ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
try {
if (brokerInterceptor != null) {
if (brokerInterceptor != null) {
try {
brokerInterceptor.onPulsarCommand(command, this);
}
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
if (brokerInterceptor != null) {
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
brokerInterceptor.messageDispatched(this, consumer, ledgerId, entryId, metadataAndPayload);
}
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
}
} catch (Exception e) {
log.error("Exception occur when intercept messages.", e);
}
return res;
}
Expand Down

0 comments on commit 74017d5

Please sign in to comment.