diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a515c883569ca..27e3f1b034530 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -382,11 +382,7 @@ private void initializeRateLimiterIfNeeded() { this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC)); } - if (SubscribeRateLimiter.isSubscribeRateEnabled(getSubscribeRate())) { - this.subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this))); - } else { - this.subscribeRateLimiter = Optional.empty(); - } + updateSubscribeRateLimiter(); // dispatch rate limiter for each subscription subscriptions.forEach((name, subscription) -> { @@ -446,13 +442,21 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont } public void updateSubscribeRateLimiter() { - SubscribeRate subscribeRate = this.getSubscribeRate(); - if (isSubscribeRateEnabled(subscribeRate)) { - subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this))); - } else { - subscribeRateLimiter = Optional.empty(); + SubscribeRate subscribeRate = getSubscribeRate(); + synchronized (subscribeRateLimiter) { + if (isSubscribeRateEnabled(subscribeRate)) { + if (subscribeRateLimiter.isPresent()) { + this.subscribeRateLimiter.get().onSubscribeRateUpdate(subscribeRate); + } else { + this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this)); + } + } else { + if (subscribeRateLimiter.isPresent()) { + subscribeRateLimiter.get().close(); + subscribeRateLimiter = Optional.empty(); + } + } } - subscribeRateLimiter.ifPresent(limiter -> limiter.onSubscribeRateUpdate(subscribeRate)); } private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) { @@ -3090,6 +3094,7 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this)); } else if (!policies.get().isSubscribeRateSet() || policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) { + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); this.subscribeRateLimiter = Optional.empty(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index 771e34a4fd647..89af6f6be882f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -140,6 +140,9 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif } public void onSubscribeRateUpdate(SubscribeRate subscribeRate) { + if (this.subscribeRate.equals(subscribeRate)) { + return; + } this.subscribeRate = subscribeRate; stopResetTask(); for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java index 76399f32f7d99..547fbe354f1fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java @@ -18,31 +18,39 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.policies.data.SubscribeRate; import org.awaitility.Awaitility; import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker") public class SubscribeRateTest extends BrokerTestBase { - + @BeforeMethod @Override protected void setup() throws Exception { - //No-op + super.baseSetup(); + conf.setTopicLevelPoliciesEnabled(true); + conf.setSystemTopicEnabled(true); + conf.setMaxPendingPublishRequestsPerConnection(0); } + @AfterMethod(alwaysRun = true) @Override protected void cleanup() throws Exception { - //No-op + super.internalCleanup(); } @Test public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception { - conf.setTopicLevelPoliciesEnabled(true); - conf.setSystemTopicEnabled(true); - conf.setMaxPendingPublishRequestsPerConnection(0); - super.baseSetup(); final String topic = "persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate"; Producer producer = pulsarClient.newProducer() .topic(topic) @@ -77,4 +85,47 @@ public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception { producer.close(); } + + @Test + public void testUpdateSubscribeRateLimiter() throws Exception { + + final String topic = "persistent://prop/ns-abc/testUpdateSubscribeRateLimiter"; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .producerName("producer-name") + .create(); + + Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); + Assert.assertNotNull(topicRef); + Assert.assertTrue(topicRef instanceof PersistentTopic); + Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent()); + + // init + PersistentTopic persistentTopic = spy(((PersistentTopic) topicRef)); + when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(10, 60)); + persistentTopic.updateSubscribeRateLimiter(); + + Optional limiter1 = persistentTopic.getSubscribeRateLimiter(); + Assert.assertTrue(limiter1.isPresent()); + Assert.assertEquals(limiter1.get().getSubscribeRate(), new SubscribeRate(10, 60)); + + // update + when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(20, 120)); + persistentTopic.updateSubscribeRateLimiter(); + + Optional limiter2 = persistentTopic.getSubscribeRateLimiter(); + Assert.assertTrue(limiter2.isPresent()); + Assert.assertEquals(limiter2.get().getSubscribeRate(), new SubscribeRate(20, 120)); + + Assert.assertSame(limiter1, limiter2); + + // disable + when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(0, 0)); + persistentTopic.updateSubscribeRateLimiter(); + + Optional limiter3 = persistentTopic.getSubscribeRateLimiter(); + Assert.assertFalse(limiter3.isPresent()); + } }