From 4e1e06a6c9fd3f14d516f27df75c51e967bc8a6a Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sun, 25 Apr 2021 07:56:35 +0800 Subject: [PATCH] Fix CPU 100% when deleting namespace (#10337) ### Motivation When deleting the namespace, the namespace Policies will be marked as deleted. This will trigger topic's `onPoliciesUpdate` However, in onPoliciesUpdate, the data of the Policies node on zk will be read, such as: `checkReplicationAndRetryOnFailure` Due to the deletion of the namespace, the zk node may no longer exist at this time. Failure to read data will trigger infinite retries. https://github.com/apache/pulsar/blob/e970c2947aff9231202ab72bdbad047d85c55633/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1175-L1193 If there are many topics, there will be a short-term CPU spike ![image](https://user-images.githubusercontent.com/9758905/115834541-ebc32480-a447-11eb-887a-95c4a3d1adf1.png) --- .../service/persistent/PersistentTopic.java | 7 +++- .../persistent/PersistentTopicTest.java | 35 +++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8c536eaf9b35d..972e1a8de49cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1172,7 +1172,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } - private CompletableFuture checkReplicationAndRetryOnFailure() { + @VisibleForTesting + CompletableFuture checkReplicationAndRetryOnFailure() { CompletableFuture result = new CompletableFuture(); checkReplication().thenAccept(res -> { log.info("[{}] Policies updated successfully", topic); @@ -2183,6 +2184,10 @@ public CompletableFuture onPoliciesUpdate(Policies data) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); } + if (data.deleted) { + log.debug("Ignore the update because it has been deleted : {}", data); + return CompletableFuture.completedFuture(null); + } isEncryptionRequired = data.encryption_required; setSchemaCompatibilityStrategy(data); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 2663f94b79ad0..a89858ecc7a00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -23,15 +23,18 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; -import static org.testng.Assert.assertEquals; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import java.lang.reflect.Field; +import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Sets; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -43,6 +46,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.Policies; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -156,4 +161,30 @@ public void testUnblockStuckSubscription() throws Exception { msg = consumer2.receive(5, TimeUnit.SECONDS); assertNotNull(msg); } + + @Test + public void testDeleteNamespaceInfiniteRetry() throws Exception { + //init namespace + final String myNamespace = "prop/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topic = "persistent://" + myNamespace + "/testDeleteNamespaceInfiniteRetry"; + conf.setForceDeleteNamespaceAllowed(true); + //init topic and policies + pulsarClient.newProducer().topic(topic).create().close(); + admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0); + Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() + -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0); + + PersistentTopic persistentTopic = + spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get()); + + Policies policies = new Policies(); + policies.deleted = true; + persistentTopic.onPoliciesUpdate(policies); + verify(persistentTopic, times(0)).checkReplicationAndRetryOnFailure(); + + policies.deleted = false; + persistentTopic.onPoliciesUpdate(policies); + verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure(); + } }