Skip to content

Commit

Permalink
Add empty check for subscription name (#559)
Browse files Browse the repository at this point in the history
  • Loading branch information
nkurihar authored and Yuki Shiga committed Jul 15, 2017
1 parent ae6ca6b commit 3047d1c
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 2 deletions.
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isBlank;

import java.time.Instant;
import java.time.ZoneId;
Expand Down Expand Up @@ -349,6 +350,14 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

final CompletableFuture<Consumer> future = new CompletableFuture<>();

if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
}
future.completeExceptionally(new NamingException("Empty subscription name"));
return future;
}

if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
Expand Down
Expand Up @@ -353,6 +353,25 @@ public void testAddRemoveProducer() throws Exception {
topic.removeProducer(producer); /* noop */
}

@Test
public void testSubscribeFail() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);

// Empty subscription name
CommandSubscribe cmd = CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName)
.setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();

Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
0, cmd.getConsumerName(), cmd.getDurable(), null);
try {
f1.get();
fail("should fail with exception");
} catch (ExecutionException ee) {
// Expected
assertTrue(ee.getCause() instanceof BrokerServiceException.NamingException);
}
}

@Test
public void testSubscribeUnsubscribe() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
Expand Down
Expand Up @@ -448,6 +448,14 @@ public void testSillyUser() {
Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
}

try {
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7", "",
consumerConf);
Assert.fail("Should fail");
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
}

try {
Consumer consumer = pulsarClient.subscribe("invalid://topic7", "my-subscriber-name", consumerConf);
Assert.fail("Should fail");
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ConsumerStats;
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ProducerStats;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.glassfish.jersey.client.ClientConfig;
Expand Down Expand Up @@ -169,6 +170,44 @@ public void socketTest() throws Exception {
}
}

@Test(timeOut = 10000)
public void badConsumerTest() throws Exception {

// Empty subcription name
String consumerUri = "ws://localhost:" + port
+ "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive";
URI consumeUri = URI.create(consumerUri);

WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();

try {
consumeClient1.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
consumerFuture1.get();
Assert.fail("should fail: empty subscription");
} catch (Exception e) {
// Expected
Assert.assertTrue(e.getCause() instanceof UpgradeException);
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
executor.submit(() -> {
try {
consumeClient1.stop();
log.info("proxy clients are stopped successfully");
} catch (Exception e) {
log.error(e.getMessage());
}
}).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("failed to close clients ", e);
}
executor.shutdownNow();
}
}

/**
* It verifies proxy topic-stats and proxy-metrics api
*
Expand Down
Expand Up @@ -56,6 +56,8 @@
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;

import static org.apache.commons.lang3.StringUtils.isBlank;

public class PulsarClientImpl implements PulsarClient {

private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
Expand Down Expand Up @@ -233,9 +235,9 @@ public CompletableFuture<Consumer> subscribeAsync(final String topic, final Stri
if (!DestinationName.isValid(topic)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
if (subscription == null) {
if (isBlank(subscription)) {
return FutureUtil
.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid subscription name"));
.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
}
if (conf == null) {
return FutureUtil.failedFuture(
Expand Down
Expand Up @@ -267,6 +267,7 @@ private static String extractSubscription(HttpServletRequest request) {
checkArgument(parts.size() == 9, "Invalid topic name format");
checkArgument(parts.get(1).equals("ws"));
checkArgument(parts.get(3).equals("persistent"));
checkArgument(parts.get(8).length() > 0, "Empty subscription name");

return parts.get(8);
}
Expand Down

0 comments on commit 3047d1c

Please sign in to comment.