Skip to content

Commit 758e135

Browse files
nodecesrinath-ctds
authored andcommitted
[fix][broker] Directly query single topic existence when the topic is partitioned (apache#24154)
Signed-off-by: Zixuan Liu <nodeces@gmail.com> (cherry picked from commit 0d6c6f4) (cherry picked from commit b619f9c)
1 parent 38145e8 commit 758e135

File tree

7 files changed

+107
-32
lines changed

7 files changed

+107
-32
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,18 +1416,22 @@ public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) {
14161416
return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
14171417
topic.isPartitioned() ? TopicName.get(topic.getPartitionedTopicName()) : topic)
14181418
.thenCompose(metadata -> {
1419+
// When the topic has metadata:
1420+
// - The topic name is non-partitioned, which means that the topic exists.
1421+
// - The topic name is partitioned, please check the specific partition.
14191422
if (metadata.partitions > 0) {
14201423
if (!topic.isPartitioned()) {
14211424
return CompletableFuture.completedFuture(
14221425
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
1423-
} else {
1424-
if (topic.getPartitionIndex() < metadata.partitions) {
1425-
return CompletableFuture.completedFuture(
1426-
TopicExistsInfo.newNonPartitionedTopicExists());
1427-
}
1426+
}
1427+
if (!topic.isPersistent()) {
1428+
// A non-persistent partitioned topic contains only metadata.
1429+
// Since no actual partitions are created, there's no need to check under /managed-ledgers.
1430+
return CompletableFuture.completedFuture(topic.getPartitionIndex() < metadata.partitions
1431+
? TopicExistsInfo.newNonPartitionedTopicExists()
1432+
: TopicExistsInfo.newTopicNotExists());
14281433
}
14291434
}
1430-
// Direct query the single topic.
14311435
return checkNonPartitionedTopicExists(topic).thenApply(
14321436
b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() :
14331437
TopicExistsInfo.newTopicNotExists());

pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java

Lines changed: 90 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.namespace;
2020

2121
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
22+
import static org.assertj.core.api.Assertions.assertThat;
2223
import static org.mockito.ArgumentMatchers.any;
2324
import static org.mockito.Mockito.doAnswer;
2425
import static org.mockito.Mockito.doReturn;
@@ -86,6 +87,7 @@
8687
import org.apache.pulsar.common.policies.data.Policies;
8788
import org.apache.pulsar.common.policies.data.TenantInfo;
8889
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
90+
import org.apache.pulsar.common.policies.data.TopicType;
8991
import org.apache.pulsar.common.util.ObjectMapperFactory;
9092
import org.apache.pulsar.metadata.api.GetResult;
9193
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -816,23 +818,6 @@ public Object[] topicDomain() {
816818
};
817819
}
818820

819-
@Test(dataProvider = "topicDomain")
820-
public void testCheckTopicExists(String topicDomain) throws Exception {
821-
String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
822-
admin.topics().createNonPartitionedTopic(topic);
823-
Awaitility.await().untilAsserted(() -> {
824-
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(topic)).get().isExists());
825-
});
826-
827-
String partitionedTopic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
828-
admin.topics().createPartitionedTopic(partitionedTopic, 5);
829-
Awaitility.await().untilAsserted(() -> {
830-
assertTrue(pulsar.getNamespaceService().checkTopicExists(TopicName.get(partitionedTopic)).get().isExists());
831-
assertTrue(pulsar.getNamespaceService()
832-
.checkTopicExists(TopicName.get(partitionedTopic + "-partition-2")).get().isExists());
833-
});
834-
}
835-
836821
@Test
837822
public void testAllowedClustersAtNamespaceLevelShouldBeIncludedInAllowedClustersAtTenantLevel() throws Exception {
838823
// 1. Setup
@@ -954,6 +939,94 @@ public void testNewAllowedClusterAdminAPIAndItsImpactOnReplicationClusterAPI() t
954939
pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
955940
}
956941

942+
943+
@Test(dataProvider = "topicDomain")
944+
public void checkTopicExistsForNonPartitionedTopic(String topicDomain) throws Exception {
945+
TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID());
946+
admin.topics().createNonPartitionedTopic(topicName.toString());
947+
CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName);
948+
assertThat(result)
949+
.succeedsWithin(3, TimeUnit.SECONDS)
950+
.satisfies(n -> {
951+
assertTrue(n.isExists());
952+
assertEquals(n.getPartitions(), 0);
953+
assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED);
954+
n.recycle();
955+
});
956+
}
957+
958+
@Test(dataProvider = "topicDomain")
959+
public void checkTopicExistsForPartitionedTopic(String topicDomain) throws Exception {
960+
TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID());
961+
admin.topics().createPartitionedTopic(topicName.toString(), 3);
962+
963+
// Check the topic exists by the partitions.
964+
CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName);
965+
assertThat(result)
966+
.succeedsWithin(3, TimeUnit.SECONDS)
967+
.satisfies(n -> {
968+
assertTrue(n.isExists());
969+
assertEquals(n.getPartitions(), 3);
970+
assertEquals(n.getTopicType(), TopicType.PARTITIONED);
971+
n.recycle();
972+
});
973+
974+
// Check the specific partition.
975+
result = pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(2));
976+
assertThat(result)
977+
.succeedsWithin(3, TimeUnit.SECONDS)
978+
.satisfies(n -> {
979+
assertTrue(n.isExists());
980+
assertEquals(n.getPartitions(), 0);
981+
assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED);
982+
n.recycle();
983+
});
984+
985+
// Partition index is out of range.
986+
result = pulsar.getNamespaceService().checkTopicExists(topicName.getPartition(10));
987+
assertThat(result)
988+
.succeedsWithin(3, TimeUnit.SECONDS)
989+
.satisfies(n -> {
990+
assertFalse(n.isExists());
991+
assertEquals(n.getPartitions(), 0);
992+
assertEquals(n.getTopicType(), TopicType.NON_PARTITIONED);
993+
n.recycle();
994+
});
995+
}
996+
997+
@Test(dataProvider = "topicDomain")
998+
public void checkTopicExistsForNonExistentNonPartitionedTopic(String topicDomain) {
999+
TopicName topicName = TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID());
1000+
CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName);
1001+
assertThat(result)
1002+
.succeedsWithin(3, TimeUnit.SECONDS)
1003+
.satisfies(n -> {
1004+
// when using the pulsar client to check non_persistent topic, always return true, so ignore to
1005+
// check that.
1006+
if (topicDomain.equals(TopicDomain.persistent)) {
1007+
assertFalse(n.isExists());
1008+
}
1009+
n.recycle();
1010+
});
1011+
}
1012+
1013+
@Test(dataProvider = "topicDomain")
1014+
public void checkTopicExistsForNonExistentPartitionTopic(String topicDomain) {
1015+
TopicName topicName =
1016+
TopicName.get(topicDomain, "prop", "ns-abc", "topic-" + UUID.randomUUID() + "-partition-10");
1017+
CompletableFuture<TopicExistsInfo> result = pulsar.getNamespaceService().checkTopicExists(topicName);
1018+
assertThat(result)
1019+
.succeedsWithin(3, TimeUnit.SECONDS)
1020+
.satisfies(n -> {
1021+
// when using the pulsar client to check non_persistent topic, always return true, so ignore to
1022+
// check that.
1023+
if (topicDomain.equals(TopicDomain.persistent)) {
1024+
assertFalse(n.isExists());
1025+
}
1026+
n.recycle();
1027+
});
1028+
}
1029+
9571030
/**
9581031
* 1. Manually trigger "LoadReportUpdaterTask"
9591032
* 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@
3737
import org.testng.annotations.Test;
3838

3939
@Slf4j
40-
// TODO: This test is in flaky group until CI is fixed.
41-
// To be addressed as part of https://github.com/apache/pulsar/pull/24154
42-
@Test(groups = "flaky")
40+
@Test(groups = "broker")
4341
public class BrokerServiceChaosTest extends CanReconnectZKClientPulsarServiceBaseTest {
4442

4543
@Override

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@
4141
import org.testng.annotations.Test;
4242

4343
@Slf4j
44-
// TODO: This test is in flaky group until CI is fixed.
45-
// To be addressed as part of https://github.com/apache/pulsar/pull/24154
46-
@Test(groups = "flaky")
44+
@Test(groups = "broker")
4745
public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicatorTest {
4846

4947
@Override

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ private void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain domain,
117117
Consumer<byte[]> ignored =
118118
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
119119
} else {
120-
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
120+
assertThrows(PulsarClientException.NotFoundException.class, () -> {
121121
@Cleanup
122122
Consumer<byte[]> ignored =
123123
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ private void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain domain,
258258
@Cleanup
259259
Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create();
260260
} else {
261-
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
261+
assertThrows(PulsarClientException.NotFoundException.class, () -> {
262262
@Cleanup
263263
Producer<byte[]> ignored = pulsarClient.newProducer().topic(partitionedTopic).create();
264264
});

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -961,7 +961,9 @@ public TransactionHasOperationFailedException(String msg) {
961961
public static Throwable wrap(Throwable t, String msg) {
962962
msg += "\n" + t.getMessage();
963963
// wrap an exception with new message info
964-
if (t instanceof TopicDoesNotExistException) {
964+
if (t instanceof NotFoundException) {
965+
return new NotFoundException(msg);
966+
} else if (t instanceof TopicDoesNotExistException) {
965967
return new TopicDoesNotExistException(msg);
966968
} else if (t instanceof TimeoutException) {
967969
return new TimeoutException(msg);

0 commit comments

Comments
 (0)