Skip to content

Commit

Permalink
[improve][broker] checkTopicExists supports checking partitioned topi…
Browse files Browse the repository at this point in the history
…c without index (apache#21701)

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 8b8048c)
  • Loading branch information
nodece authored and nikhil-ctds committed Jun 4, 2024
1 parent 29b4706 commit 2051bad
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1313,42 +1313,40 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
}

public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
if (topic.isPersistent()) {
if (topic.isPartitioned()) {
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
.thenCompose(metadata -> {
// Allow creating the non-partitioned persistent topic that name includes `-partition-`
if (metadata.partitions == 0
|| topic.getPartitionIndex() < metadata.partitions) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
}
return CompletableFuture.completedFuture(false);
});
} else {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
}
CompletableFuture<Boolean> future;
// If the topic is persistent and the name includes `-partition-`, find the topic from the managed/ledger.
if (topic.isPersistent() && topic.isPartitioned()) {
future = pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
if (topic.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topic.getPartitionedTopicName());
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(partitionedTopicName)
.thenApply((metadata) -> topic.getPartitionIndex() < metadata.partitions);
} else {
// only checks and don't do any topic creating and loading.
CompletableFuture<Optional<Topic>> topicFuture =
pulsar.getBrokerService().getTopics().get(topic.toString());
if (topicFuture == null) {
return CompletableFuture.completedFuture(false);
} else {
return topicFuture.thenApply(Optional::isPresent).exceptionally(throwable -> {
LOG.warn("[{}] topicFuture completed with exception when checkTopicExists, {}",
topic, throwable.getMessage());
return false;
});
}
}
future = CompletableFuture.completedFuture(false);
}

return future.thenCompose(found -> {
if (found != null && found) {
return CompletableFuture.completedFuture(true);
}

return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return CompletableFuture.completedFuture(true);
}

if (topic.isPersistent()) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
// The non-partitioned non-persistent topic only exist in the broker topics.
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
pulsar.getBrokerService().getTopics().get(topic.toString());
if (nonPersistentTopicFuture == null) {
return CompletableFuture.completedFuture(false);
} else {
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
}
}
});
});
}

public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -72,6 +73,7 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
Expand All @@ -94,6 +96,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "flaky")
Expand Down Expand Up @@ -800,6 +803,30 @@ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
assertFalse(getResult.isPresent());
}

@DataProvider(name = "topicDomain")
public Object[] topicDomain() {
return new Object[]{
TopicDomain.persistent.value(),
TopicDomain.non_persistent.value()
};
}

@Test(dataProvider = "topicDomain")
public void testCheckTopicExists(String topicDomain) throws Exception {
String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get());
});

String partitionedTopic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
admin.topics().createPartitionedTopic(partitionedTopic, 5);
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get());
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic + "-partition-2")).get());
});
}

/**
* 1. Manually trigger "LoadReportUpdaterTask"
* 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".
Expand Down

0 comments on commit 2051bad

Please sign in to comment.