[fix][client] Clean up unacked messages when unsubscribing a topic with ack timeout backoff#25916
Conversation
…iveryTracker on topic unsubscribe Signed-off-by: Dream95 <zhou_8621@163.com>
…yTracker.removeTopicMessages Signed-off-by: Dream95 <zhou_8621@163.com>
Signed-off-by: Dream95 <zhou_8621@163.com>
void-ptr974
left a comment
There was a problem hiding this comment.
LGTM. Thanks for the fix!
The missing cleanup for UnAckedTopicMessageRedeliveryTracker looks correct, and the iterator fix in removeTopicMessages is important.
One non-blocking note: the current getOwnerTopic().contains(topicName) matching seems a bit loose and may match similar topic names, such as my-topic and my-topic-v2. This is pre-existing behavior, so it shouldn’t block this PR.
Would you be interested in addressing this in a follow-up PR?
Thanks for the review. I'll fix that in a follow-up PR. |
Motivation
When a multi-topic consumer unsubscribes from a single topic, the client should drop that topic's unacked messages from the ack-timeout tracker so they are not redelivered later.
MultiTopicsConsumerImplalready calledremoveTopicMessagesforUnAckedTopicMessageTracker, but not forUnAckedTopicMessageRedeliveryTracker, which is used whenackTimeoutRedeliveryBackoffis configured . Unsubscribed topics could therefore leave stale entries in the redelivery partition map and inackTimeoutMessages.Additionally,
UnAckedTopicMessageRedeliveryTracker.removeTopicMessagesused the partition-map iterator when iteratingackTimeoutMessages(iterator.hasNext()/iterator.remove()instead ofiteratorAckTimeOut), so ack-timeout entries were not removed correctly and internal tracker state could be corrupted.Modifications
removeTopicMessagesonUnAckedTopicMessageRedeliveryTrackerwhen a topic is unsubscribed from a multi-topic consumer.UnAckedTopicMessageRedeliveryTracker.removeTopicMessagesto useiteratorAckTimeOut.UnAckedTopicMessageRedeliveryTrackerTestto verifyremoveTopicMessagesclears both the partition map and ack-timeout map.Verifying this change
./gradlew :pulsar-client-original:test --tests org.apache.pulsar.client.impl.UnAckedTopicMessageRedeliveryTrackerTestDoes this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Matching PR in forked repository
PR in forked repository: Dream95#12