Skip to content

Commit

Permalink
First find the persistent partitioned topic
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Dec 11, 2023
1 parent 4c02fbe commit 4229726
Showing 1 changed file with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1350,27 +1350,52 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
});
}

public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
private CompletableFuture<Boolean> internalCheckTopicExits(TopicName topic) {
return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return CompletableFuture.completedFuture(true);
}
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
});
}

if (topic.isPersistent()) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
// The non-partitioned non-persistent topic only exist in the broker topics.
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
pulsar.getBrokerService().getTopics().get(topic.toString());
if (nonPersistentTopicFuture == null) {
return CompletableFuture.completedFuture(false);
public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
CompletableFuture<Boolean> future;
// If the topic is persistent and the name includes `-partition-`, find the topic from the managed/ledger.
if (topic.isPersistent() && topic.isPartitioned()) {
future = pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
future = CompletableFuture.completedFuture(false);
}

return future.thenCompose(found -> {
if (found != null && found) {
return CompletableFuture.completedFuture(true);
}

return pulsar.getBrokerService()
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return CompletableFuture.completedFuture(true);
}

if (topic.isPersistent()) {
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
} else {
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
// The non-partitioned non-persistent topic only exist in the broker topics.
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
pulsar.getBrokerService().getTopics().get(topic.toString());
if (nonPersistentTopicFuture == null) {
return CompletableFuture.completedFuture(false);
} else {
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
}
}
}
});
});
});
}

public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
Expand Down

0 comments on commit 4229726

Please sign in to comment.