diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8c536eaf9b35d..972e1a8de49cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1172,7 +1172,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } - private CompletableFuture checkReplicationAndRetryOnFailure() { + @VisibleForTesting + CompletableFuture checkReplicationAndRetryOnFailure() { CompletableFuture result = new CompletableFuture(); checkReplication().thenAccept(res -> { log.info("[{}] Policies updated successfully", topic); @@ -2183,6 +2184,10 @@ public CompletableFuture onPoliciesUpdate(Policies data) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); } + if (data.deleted) { + log.debug("Ignore the update because it has been deleted : {}", data); + return CompletableFuture.completedFuture(null); + } isEncryptionRequired = data.encryption_required; setSchemaCompatibilityStrategy(data); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 2663f94b79ad0..a89858ecc7a00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -23,15 +23,18 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; -import static org.testng.Assert.assertEquals; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import java.lang.reflect.Field; +import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Sets; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -43,6 +46,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.Policies; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -156,4 +161,30 @@ public void testUnblockStuckSubscription() throws Exception { msg = consumer2.receive(5, TimeUnit.SECONDS); assertNotNull(msg); } + + @Test + public void testDeleteNamespaceInfiniteRetry() throws Exception { + //init namespace + final String myNamespace = "prop/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topic = "persistent://" + myNamespace + "/testDeleteNamespaceInfiniteRetry"; + conf.setForceDeleteNamespaceAllowed(true); + //init topic and policies + pulsarClient.newProducer().topic(topic).create().close(); + admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0); + Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() + -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0); + + PersistentTopic persistentTopic = + spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get()); + + Policies policies = new Policies(); + policies.deleted = true; + persistentTopic.onPoliciesUpdate(policies); + verify(persistentTopic, times(0)).checkReplicationAndRetryOnFailure(); + + policies.deleted = false; + persistentTopic.onPoliciesUpdate(policies); + verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure(); + } }