Skip to content

Commit

Permalink
[fix][client] Fix NPE of MultiTopicsConsumerImpl due to race condition (
Browse files Browse the repository at this point in the history
apache#18287)

(cherry picked from commit 516db51)
(cherry picked from commit cc53bd3)
  • Loading branch information
codelipenghui authored and nicoloboschi committed Nov 9, 2022
1 parent 558cce2 commit 6a7c5a5
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
}
// Stop to process the remaining message after the consumer is closed.
if (getState() == State.Closed) {
return;
}
// Process the message, add to the queue and trigger listener or async callback
messages.forEach(msg -> messageReceived(consumer, msg));

Expand Down Expand Up @@ -584,14 +588,14 @@ public CompletableFuture<Void> unsubscribeAsync() {
.map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());

FutureUtil.waitForAll(futureList)
.thenCompose((r) -> {
.thenComposeAsync((r) -> {
setState(State.Closed);
cleanupMultiConsumer();
log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
topic, subscription, consumerName);
// fail all pending-receive futures to notify application
return failPendingReceive();
})
}, internalPinnedExecutor)
.whenComplete((r, ex) -> {
if (ex == null) {
unsubscribeFuture.complete(null);
Expand Down Expand Up @@ -626,13 +630,13 @@ public CompletableFuture<Void> closeAsync() {
.map(ConsumerImpl::closeAsync).collect(Collectors.toList());

FutureUtil.waitForAll(futureList)
.thenCompose((r) -> {
.thenComposeAsync((r) -> {
setState(State.Closed);
cleanupMultiConsumer();
log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
// fail all pending-receive futures to notify application
return failPendingReceive();
})
}, internalPinnedExecutor)
.whenComplete((r, ex) -> {
if (ex == null) {
closeFuture.complete(null);
Expand Down

0 comments on commit 6a7c5a5

Please sign in to comment.