-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[broker] Fix topic policy listener deleted by mistake. #12904
[broker] Fix topic policy listener deleted by mistake. #12904
Conversation
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Outdated
Show resolved
Hide resolved
if (brokerService.pulsar().getConfig().isSystemTopicEnabled() | ||
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { | ||
brokerService.getPulsar().getTopicPoliciesService() | ||
.unregisterListener(TopicName.getPartitionedTopicName(topic), this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In current unregisterListener
implementation, it will make listeners key keep growing even thought the listener unregister called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have optimized the unregister implementation in #12654. See the last line of the unit test,
listMap.get(topicName)
returns null instead of empty map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
good catch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job.
### Motivation Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions. - Register: call TopicPoliciesService.registerListener("A", listener), for all 3 partitions of topic "A". - Clean: call TopicPoliciesService.clean("A-partition-x"), here is the problem it will delete all listeners of all partitions of topic "A", if any partition is closed. This means, if we calls `admin.topics().unload("A-partition-0")`, "A-partition-1" and "A-partition-2" will not be able to receive topic policy update callbacks any more. A detailed case is designed in the new unit test `testListenerCleanupByPartition`. ### Modifications With previous optimization of apache#12654 , now we can use `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener` to do the clean up.
### Motivation `TopicPoliciesService#clean` was introduced to solve listener leaking problem in #10466. And this is already addressed in #12904 using `unregisterListener`. So this method is no use any more, and I don't see any situations that we may use this again. ### Modifications Delete TopicPoliciesService#clean
### Motivation Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions. - Register: call TopicPoliciesService.registerListener("A", listener), for all 3 partitions of topic "A". - Clean: call TopicPoliciesService.clean("A-partition-x"), here is the problem it will delete all listeners of all partitions of topic "A", if any partition is closed. This means, if we calls `admin.topics().unload("A-partition-0")`, "A-partition-1" and "A-partition-2" will not be able to receive topic policy update callbacks any more. A detailed case is designed in the new unit test `testListenerCleanupByPartition`. ### Modifications With previous optimization of #12654 , now we can use `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener` to do the clean up. (cherry picked from commit a0c96a0)
### Motivation Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions. - Register: call TopicPoliciesService.registerListener("A", listener), for all 3 partitions of topic "A". - Clean: call TopicPoliciesService.clean("A-partition-x"), here is the problem it will delete all listeners of all partitions of topic "A", if any partition is closed. This means, if we calls `admin.topics().unload("A-partition-0")`, "A-partition-1" and "A-partition-2" will not be able to receive topic policy update callbacks any more. A detailed case is designed in the new unit test `testListenerCleanupByPartition`. ### Modifications With previous optimization of apache#12654 , now we can use `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener` to do the clean up.
…#12913) ### Motivation `TopicPoliciesService#clean` was introduced to solve listener leaking problem in apache#10466. And this is already addressed in apache#12904 using `unregisterListener`. So this method is no use any more, and I don't see any situations that we may use this again. ### Modifications Delete TopicPoliciesService#clean
### Motivation Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions. - Register: call TopicPoliciesService.registerListener("A", listener), for all 3 partitions of topic "A". - Clean: call TopicPoliciesService.clean("A-partition-x"), here is the problem it will delete all listeners of all partitions of topic "A", if any partition is closed. This means, if we calls `admin.topics().unload("A-partition-0")`, "A-partition-1" and "A-partition-2" will not be able to receive topic policy update callbacks any more. A detailed case is designed in the new unit test `testListenerCleanupByPartition`. ### Modifications With previous optimization of apache#12654 , now we can use `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener` to do the clean up.
…#12913) ### Motivation `TopicPoliciesService#clean` was introduced to solve listener leaking problem in apache#10466. And this is already addressed in apache#12904 using `unregisterListener`. So this method is no use any more, and I don't see any situations that we may use this again. ### Modifications Delete TopicPoliciesService#clean
### Motivation Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions. - Register: call TopicPoliciesService.registerListener("A", listener), for all 3 partitions of topic "A". - Clean: call TopicPoliciesService.clean("A-partition-x"), here is the problem it will delete all listeners of all partitions of topic "A", if any partition is closed. This means, if we calls `admin.topics().unload("A-partition-0")`, "A-partition-1" and "A-partition-2" will not be able to receive topic policy update callbacks any more. A detailed case is designed in the new unit test `testListenerCleanupByPartition`. ### Modifications With previous optimization of #12654 , now we can use `org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener` to do the clean up. (cherry picked from commit a0c96a0)
Motivation
Here is the current way of dealing topic policy listeners in PersistentTopic, for example topic name is "A", with 3 partitions.
This means, if we calls
admin.topics().unload("A-partition-0")
, "A-partition-1" and "A-partition-2" will not be able to receive topic policy update callbacks any more.A detailed case is designed in the new unit test
testListenerCleanupByPartition
.Modifications
With previous optimization of #12654 , now we can use
org.apache.pulsar.broker.service.TopicPoliciesService#unregisterListener
to do the clean up.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
no-need-doc
Bug fix.