Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,7 @@ private void initializeRateLimiterIfNeeded() {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}

if (SubscribeRateLimiter.isSubscribeRateEnabled(getSubscribeRate())) {
this.subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
} else {
this.subscribeRateLimiter = Optional.empty();
}
updateSubscribeRateLimiter();

// dispatch rate limiter for each subscription
subscriptions.forEach((name, subscription) -> {
Expand Down Expand Up @@ -446,13 +442,21 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
}

public void updateSubscribeRateLimiter() {
SubscribeRate subscribeRate = this.getSubscribeRate();
if (isSubscribeRateEnabled(subscribeRate)) {
subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
} else {
subscribeRateLimiter = Optional.empty();
SubscribeRate subscribeRate = getSubscribeRate();
synchronized (subscribeRateLimiter) {
if (isSubscribeRateEnabled(subscribeRate)) {
if (subscribeRateLimiter.isPresent()) {
this.subscribeRateLimiter.get().onSubscribeRateUpdate(subscribeRate);
} else {
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
}
} else {
if (subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().close();
subscribeRateLimiter = Optional.empty();
}
}
}
subscribeRateLimiter.ifPresent(limiter -> limiter.onSubscribeRateUpdate(subscribeRate));
}

private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
Expand Down Expand Up @@ -3090,6 +3094,7 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
} else if (!policies.get().isSubscribeRateSet()
|| policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
this.subscribeRateLimiter = Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
}

public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
if (this.subscribeRate.equals(subscribeRate)) {
return;
}
this.subscribeRate = subscribeRate;
stopResetTask();
for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,39 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.util.Optional;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class SubscribeRateTest extends BrokerTestBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
//No-op
super.baseSetup();
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
conf.setMaxPendingPublishRequestsPerConnection(0);
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
//No-op
super.internalCleanup();
}

@Test
public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
conf.setMaxPendingPublishRequestsPerConnection(0);
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
Expand Down Expand Up @@ -77,4 +85,47 @@ public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {

producer.close();
}

@Test
public void testUpdateSubscribeRateLimiter() throws Exception {

final String topic = "persistent://prop/ns-abc/testUpdateSubscribeRateLimiter";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();

Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
Assert.assertTrue(topicRef instanceof PersistentTopic);
Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());

// init
PersistentTopic persistentTopic = spy(((PersistentTopic) topicRef));
when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(10, 60));
persistentTopic.updateSubscribeRateLimiter();

Optional<SubscribeRateLimiter> limiter1 = persistentTopic.getSubscribeRateLimiter();
Assert.assertTrue(limiter1.isPresent());
Assert.assertEquals(limiter1.get().getSubscribeRate(), new SubscribeRate(10, 60));

// update
when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(20, 120));
persistentTopic.updateSubscribeRateLimiter();

Optional<SubscribeRateLimiter> limiter2 = persistentTopic.getSubscribeRateLimiter();
Assert.assertTrue(limiter2.isPresent());
Assert.assertEquals(limiter2.get().getSubscribeRate(), new SubscribeRate(20, 120));

Assert.assertSame(limiter1, limiter2);

// disable
when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(0, 0));
persistentTopic.updateSubscribeRateLimiter();

Optional<SubscribeRateLimiter> limiter3 = persistentTopic.getSubscribeRateLimiter();
Assert.assertFalse(limiter3.isPresent());
}
}