Skip to content

Commit

Permalink
[ Issue 13479 ] Fixed internal topic effect by InactiveTopicPolicy. (a…
Browse files Browse the repository at this point in the history
…pache#13611)

(cherry picked from commit 5835191)
  • Loading branch information
mattisonchao authored and lhotari committed May 5, 2022
1 parent 60dec95 commit 39d57df
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
public class BrokersBase extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
private int serviceConfigZkVersion = -1;
public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
// log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes
// to prevent excessive logging
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
Expand Down Expand Up @@ -313,7 +314,7 @@ public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception

String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
String topic = String.format("persistent://%s/healthcheck", heartbeatNamespace);
String topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX);

PulsarClient client = pulsar().getClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerServi
super(topic, ledger, brokerService);
}

@Override
public boolean isDeleteWhileInactive() {
return false;
}

@Override
public boolean isBacklogExceeded() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.pulsar.broker.systopic;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.BrokersBase;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;

import java.io.IOException;
Expand Down Expand Up @@ -169,7 +172,23 @@ interface Reader {
}

static boolean isSystemTopic(TopicName topicName) {
return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
if (topicName.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
return true;
}

TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());

// event topic
if (EventsTopicNames.checkTopicIsEventsNames(nonePartitionedTopicName)) {
return true;
}

String localName = nonePartitionedTopicName.getLocalName();
// health check topic
if (StringUtils.endsWith(localName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX)){
return true;
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.admin.impl.BrokersBase;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -543,4 +543,51 @@ public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Excepti
-> Assert.assertFalse(admin.topics().getList(namespace).contains(topic3)));
Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
}

@Test(timeOut = 30000)
public void testInternalTopicInactiveNotClean() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
// init topic
final String healthCheckTopic = "persistent://prop/ns-abc/" + BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX;
final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions";

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();

Producer<byte[]> heathCheckProducer = pulsarClient.newProducer()
.topic(healthCheckTopic)
.create();
Consumer<byte[]> heathCheckConsumer = pulsarClient.newConsumer()
.topic(healthCheckTopic)
.subscriptionName("healthCheck")
.subscribe();

consumer.close();
producer.close();
heathCheckConsumer.close();
heathCheckProducer.close();

Awaitility.await().untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
.contains(topic)));
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(admin.topics().getList("prop/ns-abc").contains(healthCheckTopic));
});

admin.topics().deleteSubscription(topic, "sub");
admin.topics().deleteSubscription(healthCheckTopic, "healthCheck");

Awaitility.await().untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc")
.contains(topic)));
Awaitility.await().pollDelay(2, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
.contains(healthCheckTopic)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.events;

import org.apache.pulsar.common.naming.TopicName;

/**
* System topic name for the event type.
*/
Expand All @@ -29,4 +31,7 @@ public class EventsTopicNames {
*/
public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";

public static boolean checkTopicIsEventsNames(TopicName topicName) {
return NAMESPACE_EVENTS_LOCAL_NAME.equals(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
}
}

0 comments on commit 39d57df

Please sign in to comment.