From 305a35a62ae0dc014235821687642ad883a63e24 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Thu, 17 Sep 2020 10:22:47 +0800 Subject: [PATCH 1/3] support non-durable --- .../client/cli/PulsarClientToolTest.java | 108 +++++++++++ .../client/cli/PulsarClientToolWsTest.java | 170 ++++++++++++++++++ .../apache/pulsar/client/cli/CmdConsume.java | 9 +- .../pulsar/websocket/ConsumerHandler.java | 13 ++ .../websocket/stats/ProxyTopicStat.java | 3 + 5 files changed, 301 insertions(+), 2 deletions(-) create mode 100644 pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java index a859380a96702..5290031d3e5d2 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java @@ -101,4 +101,112 @@ public void testInitialzation() throws InterruptedException, ExecutionException, future.get(); executor.shutdown(); } + + @Test(timeOut = 20000) + public void testNonDurableSubscribe() throws Exception { + + Properties properties = new Properties(); + properties.setProperty("serviceUrl", brokerUrl.toString()); + properties.setProperty("useTls", "false"); + + final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString(); + + int numberOfMessages = 10; + ExecutorService executor = Executors.newSingleThreadExecutor(); + CompletableFuture future = new CompletableFuture<>(); + executor.execute(() -> { + try { + PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); + String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", + Integer.toString(numberOfMessages), "--hex", "-m", "NonDurable", "-r", "30", topicName}; + Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); + future.complete(null); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + // Make sure subscription has been created + while (true) { + try { + List subscriptions = admin.topics().getSubscriptions(topicName); + if (subscriptions.size() == 1) { + break; + } + } catch (Exception e) { + } + Thread.sleep(200); + } + + PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); + + String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", + "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; + Assert.assertEquals(pulsarClientToolProducer.run(args), 0); + Assert.assertFalse(future.isCompletedExceptionally()); + future.get(); + executor.shutdown(); + + while (true) { + try { + List subscriptions = admin.topics().getSubscriptions(topicName); + if (subscriptions.size() == 0) { + break; + } + } catch (Exception e) { + } + Thread.sleep(200); + } + } + + @Test(timeOut = 20000) + public void testDurableSubscribe() throws Exception { + + Properties properties = new Properties(); + properties.setProperty("serviceUrl", brokerUrl.toString()); + properties.setProperty("useTls", "false"); + + final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString(); + + int numberOfMessages = 10; + ExecutorService executor = Executors.newSingleThreadExecutor(); + CompletableFuture future = new CompletableFuture<>(); + executor.execute(() -> { + try { + PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); + String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", + Integer.toString(numberOfMessages), "--hex", "-m", "Durable", "-r", "30", topicName}; + Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); + future.complete(null); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + // Make sure subscription has been created + while (true) { + try { + List subscriptions = admin.topics().getSubscriptions(topicName); + if (subscriptions.size() == 1) { + break; + } + } catch (Exception e) { + } + Thread.sleep(200); + } + + PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); + + String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", + "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; + Assert.assertEquals(pulsarClientToolProducer.run(args), 0); + Assert.assertFalse(future.isCompletedExceptionally()); + future.get(); + executor.shutdown(); + //wait for close + Thread.sleep(2000); + List subscriptions = admin.topics().getSubscriptions(topicName); + Assert.assertNotNull(subscriptions); + Assert.assertEquals(subscriptions.size(), 1); + } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java new file mode 100644 index 0000000000000..c2e9d976d3479 --- /dev/null +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.cli; + +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.websocket.WebSocketService; +import org.apache.pulsar.websocket.service.ProxyServer; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + + +public class PulsarClientToolWsTest extends BrokerTestBase { + private ProxyServer proxyServer; + private WebSocketService service; + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + + /*WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); + config.setWebServicePort(Optional.of(0)); + config.setClusterName("test"); + config.setConfigurationStoreServers("dummy-zk-servers"); + service = spy(new WebSocketService(config)); + doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); + proxyServer = new ProxyServer(config); + WebSocketServiceStarter.start(proxyServer, service);*/ + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.resetConfig(); + super.internalCleanup(); + //service.close(); + //proxyServer.stop(); + } + + @Test(timeOut = 20000) + public void testWebSocketNonDurableSubscriptionMode() throws Exception { + Properties properties = new Properties(); + properties.setProperty("serviceUrl", brokerUrl.toString()); + properties.setProperty("useTls", "false"); + + final String topicName = "persistent://my-property/my-ns/test/topic-" + UUID.randomUUID(); + + int numberOfMessages = 10; + ExecutorService executor = Executors.newSingleThreadExecutor(); + CompletableFuture future = new CompletableFuture<>(); + executor.execute(() -> { + try { + PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); + String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", + Integer.toString(numberOfMessages), "--hex", "-m", "NonDurable", "-r", "30", topicName}; + Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); + future.complete(null); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + // Make sure subscription has been created + while (true) { + try { + List subscriptions = admin.topics().getSubscriptions(topicName); + if (subscriptions.size() == 1) { + break; + } + } catch (Exception e) { + } + Thread.sleep(200); + } + + PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); + + String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", + "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; + Assert.assertEquals(pulsarClientToolProducer.run(args), 0); + Assert.assertFalse(future.isCompletedExceptionally()); + future.get(); + executor.shutdown(); + while (true) { + try { + List subscriptions = admin.topics().getSubscriptions(topicName); + if (subscriptions.size() == 0) { + break; + } + } catch (Exception e) { + } + Thread.sleep(200); + } + } + + @Test(timeOut = 20000) + public void testWebSocketDurableSubscriptionMode() throws Exception { + Properties properties = new Properties(); + properties.setProperty("serviceUrl", brokerUrl.toString()); + properties.setProperty("useTls", "false"); + + final String topicName = "persistent://my-property/my-ns/test/topic-" + UUID.randomUUID(); + + int numberOfMessages = 10; + ExecutorService executor = Executors.newSingleThreadExecutor(); + CompletableFuture future = new CompletableFuture<>(); + executor.execute(() -> { + try { + PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); + String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", + Integer.toString(numberOfMessages), "--hex", "-m", "Durable", "-r", "30", topicName}; + Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); + future.complete(null); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + // Make sure subscription has been created + while (true) { + try { + List subscriptions = admin.topics().getSubscriptions(topicName); + if (subscriptions.size() == 1) { + break; + } + } catch (Exception e) { + } + Thread.sleep(200); + } + + PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); + + String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", + "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; + Assert.assertEquals(pulsarClientToolProducer.run(args), 0); + Assert.assertFalse(future.isCompletedExceptionally()); + future.get(); + executor.shutdown(); + + //wait for close + Thread.sleep(2000); + List subscriptions = admin.topics().getSubscriptions(topicName); + Assert.assertNotNull(subscriptions); + Assert.assertEquals(subscriptions.size(), 1); + } +} \ No newline at end of file diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index 5bca4d4de4b53..f6d5cb7e49a44 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; @@ -82,6 +83,9 @@ public class CmdConsume { @Parameter(names = { "-t", "--subscription-type" }, description = "Subscription type.") private SubscriptionType subscriptionType = SubscriptionType.Exclusive; + @Parameter(names = { "-m", "--subscription-mode" }, description = "Subscription mode.") + private SubscriptionMode subscriptionMode = SubscriptionMode.Durable; + @Parameter(names = { "-p", "--subscription-position" }, description = "Subscription position.") private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; @@ -197,6 +201,7 @@ private int consume(String topic) { ConsumerBuilder builder = client.newConsumer() .subscriptionName(this.subscriptionName) .subscriptionType(subscriptionType) + .subscriptionMode(subscriptionMode) .subscriptionInitialPosition(subscriptionInitialPosition); if (isRegex) { @@ -255,9 +260,9 @@ private int consumeFromWebSocket(String topic) { String wsTopic = String.format( "%s/%s/" + (StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/") - + "%s/%s/%s?subscriptionType=%s", + + "%s/%s/%s?subscriptionType=%s&subscriptionMode=%s", topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName(), - subscriptionName, subscriptionType.toString()); + subscriptionName, subscriptionType.toString(), subscriptionMode.toString()); String consumerBaseUri = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/consumer/" + wsTopic; URI consumerUri = URI.create(consumerBaseUri); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 0645a9a0000f6..b270afa9ae6c1 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.common.util.DateFormatter; @@ -70,6 +71,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler { private String subscription = null; private SubscriptionType subscriptionType; + private SubscriptionMode subscriptionMode; private Consumer consumer; private int maxPendingMessages = 0; @@ -103,6 +105,7 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser : builder.getConf().getReceiverQueueSize(); } this.subscriptionType = builder.getConf().getSubscriptionType(); + this.subscriptionMode = builder.getConf().getSubscriptionMode(); if (!checkAuth(response)) { return; @@ -284,6 +287,10 @@ public SubscriptionType getSubscriptionType() { return subscriptionType; } + public SubscriptionMode getSubscriptionMode() { + return subscriptionMode; + } + public long getAndResetNumMsgsDelivered() { return numMsgsDelivered.sumThenReset(); } @@ -319,6 +326,12 @@ private ConsumerBuilder getConsumerConfiguration(PulsarClient client) { builder.subscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType"))); } + if (queryParams.containsKey("subscriptionMode")) { + checkArgument(Enums.getIfPresent(SubscriptionMode.class, queryParams.get("subscriptionMode")).isPresent(), + "Invalid subscriptionMode %s", queryParams.get("subscriptionMode")); + builder.subscriptionMode(SubscriptionMode.valueOf(queryParams.get("subscriptionMode"))); + } + if (queryParams.containsKey("receiverQueueSize")) { builder.receiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")), 1000)); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java index ce6bd91643c65..9cbd9ae6749fa 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java @@ -20,6 +20,7 @@ import java.util.Set; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.websocket.ConsumerHandler; import org.apache.pulsar.websocket.ProducerHandler; @@ -62,6 +63,7 @@ public ConsumerStats() { public ConsumerStats(ConsumerHandler handler) { this.subscriptionName = handler.getSubscription(); this.subscriptionType = handler.getSubscriptionType(); + this.subscriptionMode = handler.getSubscriptionMode(); this.remoteConnection = handler.getRemote().getInetSocketAddress().toString(); this.numberOfMsgDelivered = handler.getMsgDeliveredCounter(); } @@ -76,6 +78,7 @@ public ConsumerStats(ReaderHandler handler) { public String remoteConnection; public String subscriptionName; public SubscriptionType subscriptionType; + public SubscriptionMode subscriptionMode; public long numberOfMsgDelivered; } From 66894690152ca342667e4defa3f2f4de535c1d99 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Tue, 22 Sep 2020 13:41:00 +0800 Subject: [PATCH 2/3] remove code --- .../pulsar/client/cli/PulsarClientToolWsTest.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java index c2e9d976d3479..ca2eba117708c 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java @@ -42,15 +42,6 @@ public class PulsarClientToolWsTest extends BrokerTestBase { @Override protected void setup() throws Exception { super.internalSetup(); - - /*WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); - config.setWebServicePort(Optional.of(0)); - config.setClusterName("test"); - config.setConfigurationStoreServers("dummy-zk-servers"); - service = spy(new WebSocketService(config)); - doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); - proxyServer = new ProxyServer(config); - WebSocketServiceStarter.start(proxyServer, service);*/ } @AfterMethod @@ -58,8 +49,6 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { super.resetConfig(); super.internalCleanup(); - //service.close(); - //proxyServer.stop(); } @Test(timeOut = 20000) From 161fe1640e11eb619f5ca5039a2175934711bdd1 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Wed, 30 Sep 2020 10:40:33 +0800 Subject: [PATCH 3/3] support clean inactive non-persistent subscriptions --- .../NonPersistentSubscription.java | 13 +++++ .../nonpersistent/NonPersistentTopic.java | 28 ++++++++++- .../api/NonDurableSubscriptionTest.java | 50 ++++++++++++++++++- 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 8675703ad31f4..954d4c95c03c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -64,12 +64,16 @@ public class NonPersistentSubscription implements Subscription { @SuppressWarnings("unused") private volatile int isFenced = FALSE; + // Timestamp of when this subscription was last seen active + private volatile long lastActive; + public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) { this.topic = topic; this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); IS_FENCED_UPDATER.set(this, FALSE); + this.lastActive = System.currentTimeMillis(); } @Override @@ -89,6 +93,7 @@ public boolean isReplicated() { @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + updateLastActive(); if (IS_FENCED_UPDATER.get(this) == TRUE) { log.warn("Attempting to add consumer {} on a fenced subscription", consumer); throw new SubscriptionFencedException("Subscription is fenced"); @@ -173,6 +178,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce @Override public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException { + updateLastActive(); if (dispatcher != null) { dispatcher.removeConsumer(consumer); } @@ -483,4 +489,11 @@ public CompletableFuture endTxn(long txnidMostBits, long txnidLeastBits, i private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class); + public long getLastActive() { + return lastActive; + } + + public void updateLastActive() { + this.lastActive = System.currentTimeMillis(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 5d64869e5d639..e506edfec0d53 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -881,7 +881,33 @@ public void checkGC() { @Override public void checkInactiveSubscriptions() { - // no-op + TopicName name = TopicName.get(topic); + try { + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path(POLICIES, name.getNamespace())) + .orElseThrow(KeeperException.NoNodeException::new); + final int defaultExpirationTime = brokerService.pulsar().getConfiguration() + .getSubscriptionExpirationTimeMinutes(); + final long expirationTimeMillis = TimeUnit.MINUTES + .toMillis((policies.subscription_expiration_time_minutes <= 0 && defaultExpirationTime > 0) + ? defaultExpirationTime + : policies.subscription_expiration_time_minutes); + if (expirationTimeMillis > 0) { + subscriptions.forEach((subName, sub) -> { + if (sub.getDispatcher() != null && sub.getDispatcher().isConsumerConnected() || sub.isReplicated()) { + return; + } + if (System.currentTimeMillis() - sub.getLastActive() > expirationTimeMillis) { + sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration", + topic, subName)); + } + }); + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[{}] Error getting policies", topic); + } + } } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index 5eb8085b263c0..db3689ec143ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -18,20 +18,30 @@ */ package org.apache.pulsar.client.api; +import java.lang.reflect.Field; +import java.util.UUID; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.client.impl.ConsumerImpl; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.testng.Assert.assertNotNull; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertTrue; + public class NonDurableSubscriptionTest extends ProducerConsumerBase { @BeforeMethod @Override protected void setup() throws Exception { + conf.setSubscriptionExpirationTimeMinutes(1); super.internalSetup(); super.producerBaseSetup(); } @@ -65,7 +75,7 @@ public void testNonDurableSubscription() throws Exception { // 3 receive the first 5 messages for (int i = 0; i < 5; i++) { Message message = consumer.receive(1, TimeUnit.SECONDS); - Assert.assertNotNull(message); + assertNotNull(message); Assert.assertEquals(message.getValue(), "message" + i); consumer.acknowledge(message); } @@ -74,9 +84,45 @@ public void testNonDurableSubscription() throws Exception { // 5 for non-durable we are going to restart from the next entry for (int i = 5; i < messageNum; i++) { Message message = consumer.receive(3, TimeUnit.SECONDS); - Assert.assertNotNull(message); + assertNotNull(message); Assert.assertEquals(message.getValue(), "message" + i); } } + + @Test(timeOut = 10000) + public void testDeleteInactiveNonPersistentSubscription() throws Exception { + final String topic = "non-persistent://my-property/my-ns/topic-" + UUID.randomUUID(); + final String subName = "my-subscriber"; + admin.topics().createNonPartitionedTopic(topic); + // 1 setup consumer + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName(subName).subscribe(); + // 3 due to the existence of consumers, subscriptions will not be cleaned up + NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + NonPersistentSubscription nonPersistentSubscription = (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName); + assertNotNull(nonPersistentSubscription); + assertNotNull(nonPersistentSubscription.getDispatcher()); + assertTrue(nonPersistentSubscription.getDispatcher().isConsumerConnected()); + assertFalse(nonPersistentSubscription.isReplicated()); + + nonPersistentTopic.checkInactiveSubscriptions(); + Thread.sleep(500); + nonPersistentSubscription = (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName); + assertNotNull(nonPersistentSubscription); + // remove consumer and wait for cleanup + consumer.close(); + Thread.sleep(500); + + //change last active time to 5 minutes ago + Field f = NonPersistentSubscription.class.getDeclaredField("lastActive"); + f.setAccessible(true); + f.set(nonPersistentTopic.getSubscription(subName), System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5)); + //without consumers and last active time is 5 minutes ago, subscription should be cleaned up + nonPersistentTopic.checkInactiveSubscriptions(); + Thread.sleep(500); + nonPersistentSubscription = (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName); + assertNull(nonPersistentSubscription); + + } }