[pulsar-broker] Fix bug that message delivery stops after resetting cursor for failover subscription #5185
…ursor for failover subscription (#5185)

### Motivation

Resetting the cursor for a subscription in Failover mode may cause message delivery to stop. This can be reproduced with the following procedure:

1. Connect multiple consumers to a subscription in Failover mode
2. Reset the subscription cursor to a past position
3. Close some consumers
4. The remaining consumers may not receive new messages from the topic

At this point, the active consumer is one that has already been closed:

```js
"subscriptions" : {
  "sub1" : {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "msgRateRedeliver" : 0.0,
    "msgBacklog" : 57604,
    "blockedSubscriptionOnUnackedMsgs" : false,
    "unackedMessages" : 0,
    "type" : "Failover",
    "activeConsumerName" : "04b6c", // This consumer is already closed!
    "msgRateExpired" : 0.0,
    "consumers" : [ {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "06317b",
      "availablePermits" : 564,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:25.413+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:36968"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "37edc",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:27.77+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38392"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "822f0",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:27.769+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38380"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "b91282",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:25.413+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38408"
    } ]
  }
},
```

This is because `AbstractDispatcherSingleActiveConsumer#closeFuture` is not null, so `pickAndScheduleActiveConsumer()` is not called and the active consumer does not change.

https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L181-L184

`closeFuture` becomes non-null when `disconnectAllConsumers()` is called. Once a value is assigned, it never returns to null.

https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L217-L218

`disconnectAllConsumers()` is called when unloading or deleting a topic, as well as when resetting the cursor.

### Modifications

Added a `resetCloseFuture()` method to the Dispatcher classes that sets `closeFuture` back to null once the cursor reset has completed.

(cherry picked from commit 499069e)
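
The interaction between `closeFuture`, `disconnectAllConsumers()` and `resetCloseFuture()` can be illustrated with a small stand-alone model. This is only a sketch under assumptions, not the Pulsar source: the class `DispatcherSketch`, its string-based consumer list, the `pickActiveConsumer()` helper and the "first connected consumer wins" election rule are all hypothetical simplifications. Only the guard on `closeFuture` and the names `disconnectAllConsumers()` and `resetCloseFuture()` come from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of a single-active-consumer dispatcher.
// It is NOT the Pulsar implementation; it only mirrors the closeFuture guard.
class DispatcherSketch {
    private final List<String> consumers = new ArrayList<>();
    private String activeConsumer;
    private CompletableFuture<Void> closeFuture; // null until disconnectAllConsumers() runs

    void addConsumer(String name) {
        consumers.add(name);
        pickActiveConsumer();
    }

    void removeConsumer(String name) {
        consumers.remove(name);
        // Guard from the description: no re-election while closeFuture is non-null.
        if (closeFuture == null) {
            pickActiveConsumer();
        }
    }

    // In this model, used for topic unload, topic delete and cursor reset alike.
    CompletableFuture<Void> disconnectAllConsumers() {
        closeFuture = new CompletableFuture<>();
        consumers.clear();
        activeConsumer = null;
        closeFuture.complete(null);
        return closeFuture;
    }

    // The fix: clear the marker once the cursor reset has completed.
    void resetCloseFuture() {
        closeFuture = null;
    }

    String getActiveConsumer() {
        return activeConsumer;
    }

    private void pickActiveConsumer() {
        // Assumed election rule: the first connected consumer becomes active.
        activeConsumer = consumers.isEmpty() ? null : consumers.get(0);
    }

    public static void main(String[] args) {
        DispatcherSketch buggy = new DispatcherSketch();
        buggy.addConsumer("a");
        buggy.addConsumer("b");
        buggy.disconnectAllConsumers(); // cursor reset, closeFuture stays set
        buggy.addConsumer("a");         // consumers reconnect
        buggy.addConsumer("b");
        buggy.removeConsumer("a");      // active consumer closes
        System.out.println("without resetCloseFuture: " + buggy.getActiveConsumer()); // a (stale)

        DispatcherSketch fixed = new DispatcherSketch();
        fixed.addConsumer("a");
        fixed.addConsumer("b");
        fixed.disconnectAllConsumers();
        fixed.resetCloseFuture();       // called when the reset completes
        fixed.addConsumer("a");
        fixed.addConsumer("b");
        fixed.removeConsumer("a");
        System.out.println("with resetCloseFuture: " + fixed.getActiveConsumer()); // b
    }
}
```

In the first run the dispatcher keeps reporting the closed consumer as active, which corresponds to the stale `activeConsumerName` in the stats output. In the second run, clearing `closeFuture` lets the election happen again when the active consumer leaves.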