Skip to content

Commit

Permalink
[ISSUE apache#5759] Clean inactive non-persistent subscriptions (apac…
Browse files Browse the repository at this point in the history
…he#8166)

Fixes apache#5759

### Motivation
unused subscriptions will never be cleared on non-persistent topics

### Modifications
Add the `lastActive` attribute for non-persistent subscriptions. In the `checkInactiveSubscriptions` method of `NonPersistentTopic`, eligible subscriptions will be cleaned up

### Verifying this change
org.apache.pulsar.client.api.NonDurableSubscriptionTest#testDeleteInactiveNonPersistentSubscription
  • Loading branch information
315157973 authored and huangdx0726 committed Nov 13, 2020
1 parent 49696d6 commit cdbab5e
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,16 @@ public class NonPersistentSubscription implements Subscription {
@SuppressWarnings("unused")
private volatile int isFenced = FALSE;

// Timestamp of when this subscription was last seen active
private volatile long lastActive;

public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
this.topic = topic;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
IS_FENCED_UPDATER.set(this, FALSE);
this.lastActive = System.currentTimeMillis();
}

@Override
Expand All @@ -89,6 +93,7 @@ public boolean isReplicated() {

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
updateLastActive();
if (IS_FENCED_UPDATER.get(this) == TRUE) {
log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
throw new SubscriptionFencedException("Subscription is fenced");
Expand Down Expand Up @@ -173,6 +178,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

@Override
public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
updateLastActive();
if (dispatcher != null) {
dispatcher.removeConsumer(consumer);
}
Expand Down Expand Up @@ -483,4 +489,11 @@ public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, i

private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);

public long getLastActive() {
return lastActive;
}

public void updateLastActive() {
this.lastActive = System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,33 @@ public void checkGC() {

@Override
public void checkInactiveSubscriptions() {
// no-op
TopicName name = TopicName.get(topic);
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(KeeperException.NoNodeException::new);
final int defaultExpirationTime = brokerService.pulsar().getConfiguration()
.getSubscriptionExpirationTimeMinutes();
final long expirationTimeMillis = TimeUnit.MINUTES
.toMillis((policies.subscription_expiration_time_minutes <= 0 && defaultExpirationTime > 0)
? defaultExpirationTime
: policies.subscription_expiration_time_minutes);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
if (sub.getDispatcher() != null && sub.getDispatcher().isConsumerConnected() || sub.isReplicated()) {
return;
}
if (System.currentTimeMillis() - sub.getLastActive() > expirationTimeMillis) {
sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration",
topic, subName));
}
});
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,30 @@
*/
package org.apache.pulsar.client.api;

import java.lang.reflect.Field;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;

public class NonDurableSubscriptionTest extends ProducerConsumerBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setSubscriptionExpirationTimeMinutes(1);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -65,7 +75,7 @@ public void testNonDurableSubscription() throws Exception {
// 3 receive the first 5 messages
for (int i = 0; i < 5; i++) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(message);
assertNotNull(message);
Assert.assertEquals(message.getValue(), "message" + i);
consumer.acknowledge(message);
}
Expand All @@ -74,9 +84,45 @@ public void testNonDurableSubscription() throws Exception {
// 5 for non-durable we are going to restart from the next entry
for (int i = 5; i < messageNum; i++) {
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNotNull(message);
assertNotNull(message);
Assert.assertEquals(message.getValue(), "message" + i);
}

}

@Test(timeOut = 10000)
public void testDeleteInactiveNonPersistentSubscription() throws Exception {
final String topic = "non-persistent://my-property/my-ns/topic-" + UUID.randomUUID();
final String subName = "my-subscriber";
admin.topics().createNonPartitionedTopic(topic);
// 1 setup consumer
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionName(subName).subscribe();
// 3 due to the existence of consumers, subscriptions will not be cleaned up
NonPersistentTopic nonPersistentTopic = (NonPersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
NonPersistentSubscription nonPersistentSubscription = (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
assertNotNull(nonPersistentSubscription);
assertNotNull(nonPersistentSubscription.getDispatcher());
assertTrue(nonPersistentSubscription.getDispatcher().isConsumerConnected());
assertFalse(nonPersistentSubscription.isReplicated());

nonPersistentTopic.checkInactiveSubscriptions();
Thread.sleep(500);
nonPersistentSubscription = (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
assertNotNull(nonPersistentSubscription);
// remove consumer and wait for cleanup
consumer.close();
Thread.sleep(500);

//change last active time to 5 minutes ago
Field f = NonPersistentSubscription.class.getDeclaredField("lastActive");
f.setAccessible(true);
f.set(nonPersistentTopic.getSubscription(subName), System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));
//without consumers and last active time is 5 minutes ago, subscription should be cleaned up
nonPersistentTopic.checkInactiveSubscriptions();
Thread.sleep(500);
nonPersistentSubscription = (NonPersistentSubscription) nonPersistentTopic.getSubscription(subName);
assertNull(nonPersistentSubscription);

}
}

0 comments on commit cdbab5e

Please sign in to comment.