Skip to content

Commit eb3fc87

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[fix][proxy] Fix incorrect client error when calling get topic metadata (apache#24181)
(cherry picked from commit 39f3d9b) (cherry picked from commit 2a52ad2)
1 parent 32392a2 commit eb3fc87

File tree

3 files changed

+85
-1
lines changed

3 files changed

+85
-1
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,47 @@ public static PulsarClientException getPulsarClientException(ServerError error,
13751375
}
13761376
}
13771377

1378+
public static ServerError revertClientExToErrorCode(PulsarClientException ex) {
1379+
if (ex instanceof PulsarClientException.AuthenticationException) {
1380+
return ServerError.AuthenticationError;
1381+
} else if (ex instanceof PulsarClientException.AuthorizationException) {
1382+
return ServerError.AuthorizationError;
1383+
} else if (ex instanceof PulsarClientException.ProducerBusyException) {
1384+
return ServerError.ProducerBusy;
1385+
} else if (ex instanceof PulsarClientException.ConsumerBusyException) {
1386+
return ServerError.ConsumerBusy;
1387+
} else if (ex instanceof PulsarClientException.BrokerMetadataException) {
1388+
return ServerError.MetadataError;
1389+
} else if (ex instanceof PulsarClientException.BrokerPersistenceException) {
1390+
return ServerError.PersistenceError;
1391+
} else if (ex instanceof PulsarClientException.TooManyRequestsException) {
1392+
return ServerError.TooManyRequests;
1393+
} else if (ex instanceof PulsarClientException.LookupException) {
1394+
return ServerError.ServiceNotReady;
1395+
} else if (ex instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
1396+
return ServerError.ProducerBlockedQuotaExceededError;
1397+
} else if (ex instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
1398+
return ServerError.ProducerBlockedQuotaExceededException;
1399+
} else if (ex instanceof PulsarClientException.TopicTerminatedException) {
1400+
return ServerError.TopicTerminatedError;
1401+
} else if (ex instanceof PulsarClientException.IncompatibleSchemaException) {
1402+
return ServerError.IncompatibleSchema;
1403+
} else if (ex instanceof PulsarClientException.TopicDoesNotExistException) {
1404+
return ServerError.TopicNotFound;
1405+
} else if (ex instanceof PulsarClientException.SubscriptionNotFoundException) {
1406+
return ServerError.SubscriptionNotFound;
1407+
} else if (ex instanceof PulsarClientException.ConsumerAssignException) {
1408+
return ServerError.ConsumerAssignError;
1409+
} else if (ex instanceof PulsarClientException.NotAllowedException) {
1410+
return ServerError.NotAllowedError;
1411+
} else if (ex instanceof PulsarClientException.TransactionConflictException) {
1412+
return ServerError.TransactionConflict;
1413+
} else if (ex instanceof PulsarClientException.ProducerFencedException) {
1414+
return ServerError.ProducerFenced;
1415+
}
1416+
return ServerError.UnknownError;
1417+
}
1418+
13781419
public void close() {
13791420
if (ctx != null) {
13801421
ctx.close();

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DefaultLookupProxyHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,8 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
256256
if (t != null) {
257257
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
258258
t.getMessage(), t);
259-
writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
259+
PulsarClientException pce = PulsarClientException.unwrap(t);
260+
writeAndFlush(Commands.newLookupErrorResponse(clientCnx.revertClientExToErrorCode(pce),
260261
t.getMessage(), clientRequestId));
261262
} else {
262263
writeAndFlush(

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import static org.mockito.Mockito.doReturn;
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertNotNull;
27+
import static org.testng.Assert.assertTrue;
28+
import static org.testng.Assert.fail;
2729
import io.netty.buffer.ByteBuf;
2830
import io.netty.channel.EventLoopGroup;
2931
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -52,6 +54,7 @@
5254
import org.apache.pulsar.client.api.MessageRoutingMode;
5355
import org.apache.pulsar.client.api.Producer;
5456
import org.apache.pulsar.client.api.PulsarClient;
57+
import org.apache.pulsar.client.api.PulsarClientException;
5558
import org.apache.pulsar.client.api.Schema;
5659
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
5760
import org.apache.pulsar.client.api.SubscriptionType;
@@ -67,13 +70,15 @@
6770
import org.apache.pulsar.common.api.proto.ProtocolVersion;
6871
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
6972
import org.apache.pulsar.common.naming.TopicName;
73+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
7074
import org.apache.pulsar.common.policies.data.ClusterData;
7175
import org.apache.pulsar.common.policies.data.RetentionPolicies;
7276
import org.apache.pulsar.common.policies.data.TenantInfo;
7377
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
7478
import org.apache.pulsar.common.policies.data.TopicType;
7579
import org.apache.pulsar.common.protocol.Commands;
7680
import org.apache.pulsar.common.schema.SchemaInfo;
81+
import org.apache.pulsar.common.util.FutureUtil;
7782
import org.apache.pulsar.common.util.netty.EventLoopUtil;
7883
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
7984
import org.mockito.Mockito;
@@ -139,6 +144,13 @@ protected void initializeProxyConfig() throws Exception {
139144
proxyClientAuthentication.start();
140145
}
141146

147+
@Override
148+
protected void doInitConf() throws Exception {
149+
super.doInitConf();
150+
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
151+
conf.setDefaultNumPartitions(1);
152+
}
153+
142154
@Override
143155
@AfterClass(alwaysRun = true)
144156
protected void cleanup() throws Exception {
@@ -414,6 +426,36 @@ public void testProtocolVersionAdvertisement() throws Exception {
414426
}
415427
}
416428

429+
@Test
430+
public void testGetPartitionedMetadataErrorCode() throws Exception {
431+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
432+
// Trigger partitioned metadata creation.
433+
PulsarClientImpl brokerClient = (PulsarClientImpl) pulsarClient;
434+
PartitionedTopicMetadata brokerMetadata =
435+
brokerClient.getPartitionedTopicMetadata(topic, true, true).get();
436+
assertEquals(brokerMetadata.partitions, 1);
437+
assertEquals(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
438+
.getPartitionedTopicMetadataAsync(TopicName.get(topic)).get().get().partitions, 1);
439+
// Verify: Proxy never rewrite error code.
440+
ClientConfigurationData proxyClientConf = new ClientConfigurationData();
441+
proxyClientConf.setServiceUrl(proxyService.getServiceUrl());
442+
PulsarClientImpl proxyClient =
443+
(PulsarClientImpl) getClientActiveConsumerChangeNotSupported(proxyClientConf);
444+
PartitionedTopicMetadata proxyMetadata =
445+
proxyClient.getPartitionedTopicMetadata(topic, false, false).get();
446+
assertEquals(proxyMetadata.partitions, 1);
447+
try {
448+
proxyClient.getPartitionedTopicMetadata(topic + "-partition-0", false, false).get();
449+
fail("expected a TopicDoesNotExistException");
450+
} catch (Exception ex) {
451+
assertTrue(FutureUtil.unwrapCompletionException(ex)
452+
instanceof PulsarClientException.TopicDoesNotExistException);
453+
}
454+
// cleanup.
455+
proxyClient.close();
456+
admin.topics().deletePartitionedTopic(topic);
457+
}
458+
417459
@Test
418460
public void testGetClientVersion() throws Exception {
419461
@Cleanup

0 commit comments

Comments
 (0)