Skip to content

Commit

Permalink
Fix CPU 100% when deleting namespace (#10337)
Browse files Browse the repository at this point in the history
### Motivation
When deleting the namespace, the namespace Policies will be marked as deleted.
This will trigger topic's `onPoliciesUpdate`
However, in onPoliciesUpdate, the data of the Policies node on zk will be read, such as: `checkReplicationAndRetryOnFailure`
Due to the deletion of the namespace, the zk node may no longer exist at this time. 
Failure to read data will trigger infinite retries.
https://github.com/apache/pulsar/blob/e970c2947aff9231202ab72bdbad047d85c55633/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1175-L1193

If there are many topics, there will be a short-term CPU spike

![image](https://user-images.githubusercontent.com/9758905/115834541-ebc32480-a447-11eb-887a-95c4a3d1adf1.png)
  • Loading branch information
315157973 committed Apr 24, 2021
1 parent 429b023 commit 4e1e06a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
return closeFuture;
}

private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
@VisibleForTesting
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
Expand Down Expand Up @@ -2183,6 +2184,10 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
data.encryption_required);
}
if (data.deleted) {
log.debug("Ignore the update because it has been deleted : {}", data);
return CompletableFuture.completedFuture(null);
}
isEncryptionRequired = data.encryption_required;

setSchemaCompatibilityStrategy(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.lang.reflect.Field;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.collect.Sets;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
Expand All @@ -43,6 +46,8 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -156,4 +161,30 @@ public void testUnblockStuckSubscription() throws Exception {
msg = consumer2.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
}

@Test
public void testDeleteNamespaceInfiniteRetry() throws Exception {
//init namespace
final String myNamespace = "prop/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testDeleteNamespaceInfiniteRetry";
conf.setForceDeleteNamespaceAllowed(true);
//init topic and policies
pulsarClient.newProducer().topic(topic).create().close();
admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
-> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);

PersistentTopic persistentTopic =
spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get());

Policies policies = new Policies();
policies.deleted = true;
persistentTopic.onPoliciesUpdate(policies);
verify(persistentTopic, times(0)).checkReplicationAndRetryOnFailure();

policies.deleted = false;
persistentTopic.onPoliciesUpdate(policies);
verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure();
}
}

0 comments on commit 4e1e06a

Please sign in to comment.