diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f8b8cd79e6748..cebad32f9edb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.isBlank; import java.time.Instant; import java.time.ZoneId; @@ -349,6 +350,14 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri final CompletableFuture future = new CompletableFuture<>(); + if (isBlank(subscriptionName)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Empty subscription name", topic); + } + future.completeExceptionally(new NamingException("Empty subscription name")); + return future; + } + if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { if (log.isDebugEnabled()) { log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 8d77102037b09..9744dd992b3ad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -353,6 +353,25 @@ public void testAddRemoveProducer() throws Exception { topic.removeProducer(producer); /* noop */ } + @Test + public void testSubscribeFail() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + + // Empty subscription name + CommandSubscribe cmd = CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName) + .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build(); + + Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), + 0, cmd.getConsumerName(), cmd.getDurable(), null); + try { + f1.get(); + fail("should fail with exception"); + } catch (ExecutionException ee) { + // Expected + assertTrue(ee.getCause() instanceof BrokerServiceException.NamingException); + } + } + @Test public void testSubscribeUnsubscribe() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 5b854d74bd31e..628f2ce5fb991 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -448,6 +448,14 @@ public void testSillyUser() { Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException); } + try { + Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7", "", + consumerConf); + Assert.fail("Should fail"); + } catch (PulsarClientException e) { + Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException); + } + try { Consumer consumer = pulsarClient.subscribe("invalid://topic7", "my-subscriber-name", consumerConf); Assert.fail("Should fail"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 6ad92b0cebc8d..6a7f1fb5a0c6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.websocket.stats.ProxyTopicStat.ConsumerStats; import org.apache.pulsar.websocket.stats.ProxyTopicStat.ProducerStats; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.UpgradeException; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.glassfish.jersey.client.ClientConfig; @@ -169,6 +170,44 @@ public void socketTest() throws Exception { } } + @Test(timeOut = 10000) + public void badConsumerTest() throws Exception { + + // Empty subcription name + String consumerUri = "ws://localhost:" + port + + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive"; + URI consumeUri = URI.create(consumerUri); + + WebSocketClient consumeClient1 = new WebSocketClient(); + SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket(); + + try { + consumeClient1.start(); + ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest(); + Future consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1); + consumerFuture1.get(); + Assert.fail("should fail: empty subscription"); + } catch (Exception e) { + // Expected + Assert.assertTrue(e.getCause() instanceof UpgradeException); + } finally { + ExecutorService executor = newFixedThreadPool(1); + try { + executor.submit(() -> { + try { + consumeClient1.stop(); + log.info("proxy clients are stopped successfully"); + } catch (Exception e) { + log.error(e.getMessage()); + } + }).get(2, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("failed to close clients ", e); + } + executor.shutdownNow(); + } + } + /** * It verifies proxy topic-stats and proxy-metrics api * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 74a7cff22508c..c58c878d53455 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -56,6 +56,8 @@ import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import static org.apache.commons.lang3.StringUtils.isBlank; + public class PulsarClientImpl implements PulsarClient { private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class); @@ -233,9 +235,9 @@ public CompletableFuture subscribeAsync(final String topic, final Stri if (!DestinationName.isValid(topic)) { return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name")); } - if (subscription == null) { + if (isBlank(subscription)) { return FutureUtil - .failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid subscription name")); + .failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name")); } if (conf == null) { return FutureUtil.failedFuture( diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 603fa9da2642b..d748fb15cc494 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -267,6 +267,7 @@ private static String extractSubscription(HttpServletRequest request) { checkArgument(parts.size() == 9, "Invalid topic name format"); checkArgument(parts.get(1).equals("ws")); checkArgument(parts.get(3).equals("persistent")); + checkArgument(parts.get(8).length() > 0, "Empty subscription name"); return parts.get(8); }