Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(Discover
try {
checkAuthorization(service, topicName, role, authenticationData);
final String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName());
topicName.getNamespaceObject().toString(), topicName.getDomain().value(), topicName.getEncodedLocalName());
// gets the number of partitions from the zk cache
pulsarResources.getNamespaceResources().getPartitionedTopicResources().getAsync(path)
.thenAccept(metadata -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,11 @@ protected void simulateStoreError(String string, Code sessionexpired) {
&& path.equals("/admin/partitioned-topics/test/local/ns/persistent/my-topic-2");
});
}

protected void simulateStoreErrorForNonPersistentTopic(String string, Code sessionexpired) {
mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ public void testGetPartitionsMetadata() throws Exception {
}
}

@Test
public void testGetPartitionsMetadataForNonPersistentTopic() throws Exception {
TopicName topic1 = TopicName.get("non-persistent://test/local/ns/my-topic-1");

PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null)
.get();
assertEquals(m.partitions, 0);

// Simulate ZK error
simulateStoreErrorForNonPersistentTopic("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2", Code.SESSIONEXPIRED);
TopicName topic2 = TopicName.get("non-persistent://test/local/ns/my-topic-2");
CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
.getPartitionedTopicMetadata(service, topic2, "role", null);
try {
future.get();
fail("Partition metadata lookup should have failed");
} catch (ExecutionException e) {
assertEquals(e.getCause().getClass(), MetadataStoreException.class);
}
}

/**
* It verifies: client connects to Discovery-service and receives discovery response successfully.
*
Expand Down