Skip to content

Commit

Permalink
[improve][broker] checkTopicExists supports checking partitioned topi…
Browse files Browse the repository at this point in the history
…c without index

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Dec 9, 2023
1 parent 2e31e71 commit 4c02fbe
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1351,42 +1351,26 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
}

public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
if (topic.isPersistent()) {
if (topic.isPartitioned()) {
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
.thenCompose(metadata -> {
// Allow creating the non-partitioned persistent topic that name includes `-partition-`
if (metadata.partitions == 0
|| topic.getPartitionIndex() < metadata.partitions) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
}
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return CompletableFuture.completedFuture(true);
}

if (topic.isPersistent()) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
// The non-partitioned non-persistent topic only exist in the broker topics.
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
pulsar.getBrokerService().getTopics().get(topic.toString());
if (nonPersistentTopicFuture == null) {
return CompletableFuture.completedFuture(false);
});
} else {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
}
} else {
if (topic.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topic.getPartitionedTopicName());
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(partitionedTopicName)
.thenApply((metadata) -> topic.getPartitionIndex() < metadata.partitions);
} else {
// only checks and don't do any topic creating and loading.
CompletableFuture<Optional<Topic>> topicFuture =
pulsar.getBrokerService().getTopics().get(topic.toString());
if (topicFuture == null) {
return CompletableFuture.completedFuture(false);
} else {
return topicFuture.thenApply(Optional::isPresent).exceptionally(throwable -> {
LOG.warn("[{}] topicFuture completed with exception when checkTopicExists, {}",
topic, throwable.getMessage());
return false;
});
}
}
}
} else {
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
}
}
});
}

public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -64,6 +65,7 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -73,6 +75,7 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
Expand All @@ -95,6 +98,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "flaky")
Expand Down Expand Up @@ -799,6 +803,30 @@ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
assertFalse(getResult.isPresent());
}

@DataProvider(name = "topicDomain")
public Object[] topicDomain() {
return new Object[]{
TopicDomain.persistent.value(),
TopicDomain.non_persistent.value()
};
}

@Test(dataProvider = "topicDomain")
public void testCheckTopicExists(String topicDomain) throws Exception {
String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get());
});

String partitionedTopic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
admin.topics().createPartitionedTopic(partitionedTopic, 5);
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get());
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic + "-partition-2")).get());
});
}

/**
* 1. Manually trigger "LoadReportUpdaterTask"
* 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".
Expand Down

0 comments on commit 4c02fbe

Please sign in to comment.