diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 0749c3d9f2c86..8cbbde41faa90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -371,5 +371,26 @@ public void unregisterListener(TopicName topicName, TopicPolicyListener Lists.newCopyOnWriteArrayList()).remove(listener); } + @Override + public void clean(TopicName topicName) { + TopicName realTopicName = topicName; + if (topicName.isPartitioned()) { + //change persistent://tenant/namespace/xxx-partition-0 to persistent://tenant/namespace/xxx + realTopicName = TopicName.get(topicName.getPartitionedTopicName()); + } + policiesCache.remove(realTopicName); + listeners.remove(realTopicName); + } + + @VisibleForTesting + protected Map getPoliciesCache() { + return policiesCache; + } + + @VisibleForTesting + protected Map>> getListeners() { + return listeners; + } + private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index c896dc49fd024..dd6bf1b9a85c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -84,6 +84,14 @@ public interface TopicPoliciesService { void unregisterListener(TopicName topicName, TopicPolicyListener listener); + /** + * clean cache and listeners in TopicPolicies and so on. + * @param topicName + */ + default void clean(TopicName topicName) { + throw new UnsupportedOperationException("Clean is not supported by default"); + } + class TopicPoliciesServiceDisabled implements TopicPoliciesService { @Override @@ -132,5 +140,10 @@ public void registerListener(TopicName topicName, TopicPolicyListener listener) { //No-op } + + @Override + public void clean(TopicName topicName) { + //No-op + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 972e1a8de49cc..3594a45aa923c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1057,8 +1057,7 @@ public void deleteLedgerComplete(Object ctx) { subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - brokerService.pulsar().getTopicPoliciesService() - .unregisterListener(TopicName.get(topic), getPersistentTopic()); + brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic)); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); } @@ -1149,8 +1148,7 @@ public void closeComplete(Object ctx) { subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - brokerService.pulsar().getTopicPoliciesService() - .unregisterListener(TopicName.get(topic), getPersistentTopic()); + brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic)); log.info("[{}] Topic closed", topic); closeFuture.complete(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index e2cf04c8ed7af..4d2b8cf3f003f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -28,12 +28,19 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ExecutionException; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.AssertJUnit.assertNull; @Test(groups = "broker") public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest { @@ -174,6 +181,29 @@ public void testGetPolicy() throws ExecutionException, InterruptedException, Top Assert.assertEquals(policies1, policiesGet1); } + @Test + public void testCacheCleanup() throws Exception { + final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID(); + TopicName topicName = TopicName.get(topic); + admin.topics().createPartitionedTopic(topic, 3); + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().untilAsserted(() + -> systemTopicBasedTopicPoliciesService.cacheIsInitialized(topicName)); + admin.topics().setMaxConsumers(topic, 1000); + Awaitility.await().untilAsserted(() -> + assertNotNull(admin.topics().getMaxConsumers(topic))); + Map map = systemTopicBasedTopicPoliciesService.getPoliciesCache(); + Map>> listMap = + systemTopicBasedTopicPoliciesService.getListeners(); + assertNotNull(map.get(topicName)); + assertEquals(map.get(topicName).getMaxConsumerPerTopic().intValue(), 1000); + assertNotNull(listMap.get(topicName).get(0)); + + admin.topics().deletePartitionedTopic(topic, true); + assertNull(map.get(topicName)); + assertNull(listMap.get(topicName)); + } + private void prepareData() throws PulsarAdminException { admin.clusters().createCluster("test", new ClusterData(pulsar.getBrokerServiceUrl())); admin.tenants().createTenant("system-topic",