diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c1e17159eb738..25090989db298 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1303,12 +1303,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t OffloadPoliciesImpl topicLevelOffloadPolicies = null; if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) { - TopicName cloneTopicName = topicName; - if (topicName.isPartitioned()) { - cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); - } try { - TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName); + TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName); if (topicPolicies != null) { persistencePolicies = topicPolicies.getPersistence(); retentionPolicies = topicPolicies.getRetentionPolicies(); @@ -2573,12 +2569,8 @@ public Optional getTopicPolicies(TopicName topicName) { if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) { return Optional.empty(); } - TopicName cloneTopicName = topicName; - if (topicName.isPartitioned()) { - cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); - } try { - return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName)); + return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName)); } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { log.debug("Topic {} policies have not been initialized yet.", topicName.getPartitionedTopicName()); return Optional.empty(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 6a5d0d62a9903..0e0fe790c22f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -91,7 +91,7 @@ public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, Top .domain(topicName.getDomain().toString()) .tenant(topicName.getTenant()) .namespace(topicName.getNamespaceObject().getLocalName()) - .topic(topicName.getLocalName()) + .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()) .policies(policies) .build()) .build()).whenComplete(((messageId, e) -> { @@ -147,7 +147,7 @@ public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesC && !policyCacheInitMap.get(topicName.getNamespaceObject())) { throw new TopicPoliciesCacheNotInitException(); } - return policiesCache.get(topicName); + return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 3934d3b342c4e..d8c0814738b34 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -18,8 +18,23 @@ */ package org.apache.pulsar.broker.admin; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -42,6 +57,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; @@ -61,23 +77,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - @Slf4j @Test(groups = "broker") public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { @@ -2144,6 +2143,30 @@ public void testAutoCreationDisabled() throws Exception { assertNull(admin.topics().getMessageTTL(topic)); } + @Test + public void testSubscriptionTypesWithPartitionedTopic() throws Exception { + final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); + admin.topics().createPartitionedTopic(topic, 1); + pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close(); + Awaitility.await() + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + Set subscriptionTypeSet = new HashSet<>(); + subscriptionTypeSet.add(SubscriptionType.Key_Shared); + admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet); + Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionTypesEnabled(topic))); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopicReference(TopicName.get(topic).getPartition(0).toString()).get(); + Set old = new HashSet<>(pulsar.getConfiguration().getSubscriptionTypesEnabled()); + try { + pulsar.getConfiguration().getSubscriptionTypesEnabled().clear(); + assertTrue(persistentTopic.checkSubscriptionTypesEnable(CommandSubscribe.SubType.Key_Shared)); + } finally { + //restore + pulsar.getConfiguration().getSubscriptionTypesEnabled().addAll(old); + } + } + @Test(timeOut = 30000) public void testSubscriptionTypesEnabled() throws Exception { final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();