Skip to content

Commit

Permalink
[fix] [log] fix the vague response if topic not found (apache#20932)
Browse files Browse the repository at this point in the history
### Motivation

When I did this test below and got the error "Topic not found".

```java
pulsarAdmin.topics().createNonPartitionedTopic("persistent://public/default/tp_1");
Consumer consumer = null;
Consumer consumer = pulsarClient.newConsumer()
          .topic("persistent://public/default/tp_1")
          .subscriptionName("s1")
          .enableRetry(true)
          .subscribe();
```

I do create the topic `persistent://public/default/tp_1` first but got a response "Topic not found", it is confusing. 

The root cause is the retry letter topic `persistent://public/default/tp_1-sub1-RETRY` was not created. 

### Modifications

clear the vague response if the topic is not founded.
  • Loading branch information
poorbarcode committed Nov 28, 2023
1 parent e1d06b5 commit 1a024bc
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,13 @@ protected CompletableFuture<PersistentTopic> getExistingPersistentTopicAsync(boo
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(topicName.toString());
if (topicFuture == null) {
return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
return FutureUtil.failedFuture(new RestException(NOT_FOUND,
String.format("Topic not found %s", topicName.toString())));
}
return topicFuture.thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent()) {
return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
return FutureUtil.failedFuture(new RestException(NOT_FOUND,
String.format("Topic not found %s", topicName.toString())));
}
return CompletableFuture.completedFuture((PersistentTopic) optionalTopic.get());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,12 @@ public List<String> getListFromBundle(@PathParam("property") String property, @P
}
}

private Topic getTopicReference(TopicName topicName) {
private Topic getTopicReference(final TopicName topicName) {
try {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
String.format("Topic not found %s", topicName.toString())));
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException | TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public void getPartitionedStats(
getPartitionedTopicMetadataAsync(topicName,
authoritative, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
asyncResponse.resume(new RestException(Status.NOT_FOUND,
String.format("Partitioned topic not found %s", topicName.toString())));
return;
}
NonPersistentPartitionedTopicStatsImpl stats =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TopicLookupBase extends PulsarWebResource {
private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";

protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean authoritative,
protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative,
String listenerName) {
if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topicName);
Expand All @@ -79,7 +79,8 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topic
})
.thenCompose(exist -> {
if (!exist) {
throw new RestException(Response.Status.NOT_FOUND, "Topic not found.");
throw new RestException(Response.Status.NOT_FOUND,
String.format("Topic not found %s", topicName.toString()));
}
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
cmdProducer.hasInitialSubscriptionName() ? cmdProducer.getInitialSubscriptionName() : null;
final boolean supportsPartialProducer = supportsPartialProducer();

TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
final TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
if (topicName == null) {
return;
}
Expand Down Expand Up @@ -1607,7 +1607,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {

// Do not print stack traces for expected exceptions
if (cause instanceof NoSuchElementException) {
cause = new TopicNotFoundException("Topic Not Found.");
cause = new TopicNotFoundException(String.format("Topic not found %s", topicName.toString()));
log.warn("[{}] Failed to load topic {}, producerId={}: Topic not found", remoteAddress, topicName,
producerId);
} else if (!Exceptions.areExceptionsPresentInChain(cause,
Expand Down Expand Up @@ -2439,7 +2439,7 @@ remoteAddress, new String(commandGetSchema.getSchemaVersion()),
schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
if (schemaAndMetadata == null) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
"Topic not found or no-schema");
String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
} else {
commandSender.sendGetSchemaResponse(requestId,
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
Expand All @@ -2457,7 +2457,7 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea
log.debug("Received CommandGetOrCreateSchema call from {}", remoteAddress);
}
long requestId = commandGetOrCreateSchema.getRequestId();
String topicName = commandGetOrCreateSchema.getTopic();
final String topicName = commandGetOrCreateSchema.getTopic();
SchemaData schemaData = getSchema(commandGetOrCreateSchema.getSchema());
SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
Expand All @@ -2477,7 +2477,7 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea
});
} else {
commandSender.sendGetOrCreateSchemaErrorResponse(requestId, ServerError.TopicNotFound,
"Topic not found");
String.format("Topic not found %s", topicName));
}
}).exceptionally(ex -> {
ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... c
testNamespace, "my-topic", true);
} catch (Exception e) {
//System.out.println(e.getMessage());
Assert.assertEquals("Topic not found", e.getMessage());
Assert.assertTrue(e.getMessage().contains("Topic not found"));
}

String key = "legendtkl";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void testGetTransactionInBufferStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
Expand All @@ -173,7 +173,7 @@ public void testGetTransactionInBufferStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testGetTransactionInPendingAckStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
Expand All @@ -219,7 +219,7 @@ public void testGetTransactionInPendingAckStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Expand Down Expand Up @@ -334,7 +334,7 @@ public void testGetTransactionBufferStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
Expand All @@ -344,7 +344,7 @@ public void testGetTransactionBufferStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
Expand Down Expand Up @@ -392,7 +392,7 @@ public void testGetPendingAckStats(String ackType) throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
Expand All @@ -402,7 +402,7 @@ public void testGetPendingAckStats(String ackType) throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);

Expand Down Expand Up @@ -541,7 +541,7 @@ public void testGetPendingAckInternalStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
try {
pulsar.getBrokerService().getTopic(topic, false);
Expand All @@ -551,7 +551,7 @@ public void testGetPendingAckInternalStats() throws Exception {
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
assertEquals(cause.getMessage(), "Topic not found");
assertTrue(cause.getMessage().contains("Topic not found"));
}
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-api")
public class SimpleProducerConsumerDisallowAutoCreateTopicTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setAllowAutoTopicCreation(false);
}

@Test
public void testClearErrorIfRetryTopicNotExists() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
final String subName = "sub";
final String retryTopicName = topicName + "-" + subName + RETRY_GROUP_TOPIC_SUFFIX;
admin.topics().createNonPartitionedTopic(topicName);
Consumer consumer = null;
try {
consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.enableRetry(true)
.subscribe();
fail("");
} catch (Exception ex) {
log.info("got an expected error", ex);
assertTrue(ex.getMessage().contains("Not found:"));
assertTrue(ex.getMessage().contains(retryTopicName));
} finally {
// cleanup.
if (consumer != null) {
consumer.close();
}
admin.topics().delete(topicName);
}
}
}

0 comments on commit 1a024bc

Please sign in to comment.