Skip to content

Commit

Permalink
Fix subscription dispatch rate does not work after the topic unload w…
Browse files Browse the repository at this point in the history
…ithout dispatch rate limit. (#8947)

### Motivation

Fix subscription dispatch rate does not work after the topic unload without dispatch rate limit.
Currently, if the subscription dispatch rate is present then update the rate limit, the subscription can be affected by the new policy. But if the subscription dispatch rate is absent, update the rate limit, the subscription cannot be affected.

### Verifying this change

New unitest added.
  • Loading branch information
codelipenghui committed Dec 14, 2020
1 parent 13e57bd commit 37e02fa
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;

public interface Dispatcher {
Expand Down Expand Up @@ -91,6 +92,10 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
return Optional.empty();
}

default void updateRateLimiter(DispatchRate dispatchRate) {

}

default void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
//No-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
Expand Down Expand Up @@ -766,6 +767,20 @@ public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
}

@Override
public void updateRateLimiter(DispatchRate dispatchRate) {
if (!this.dispatchRateLimiter.isPresent() && dispatchRate != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
}
this.dispatchRateLimiter.ifPresent(limiter -> {
if (dispatchRate != null) {
this.dispatchRateLimiter.get().updateDispatchRate(dispatchRate);
} else {
this.dispatchRateLimiter.get().updateDispatchRate();
}
});
}

@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
Expand Down Expand Up @@ -570,6 +571,20 @@ public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
}

@Override
public void updateRateLimiter(DispatchRate dispatchRate) {
if (!this.dispatchRateLimiter.isPresent() && dispatchRate != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
}
this.dispatchRateLimiter.ifPresent(limiter -> {
if (dispatchRate != null) {
this.dispatchRateLimiter.get().updateDispatchRate(dispatchRate);
} else {
this.dispatchRateLimiter.get().updateDispatchRate();
}
});
}

@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2647,13 +2647,7 @@ public void onUpdate(TopicPolicies policies) {
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
if (policies.isSubscriptionDispatchRateSet()) {
dispatcher.getRateLimiter().ifPresent(rateLimiter ->
rateLimiter.updateDispatchRate(policies.getSubscriptionDispatchRate()));
} else {
dispatcher.getRateLimiter().ifPresent(rateLimiter ->
rateLimiter.updateDispatchRate());
}
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
});

if (policies.getPublishRate() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,37 @@ public void testGetSetSubscriptionDispatchRate() throws Exception {
admin.topics().delete(topic, true);
}

@Test
public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);

Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

DispatchRate dispatchRate = new DispatchRate(1000,
1024 * 1024, 1);
log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);

String subscriptionName = "test_subscription_rate";
Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();

admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
log.info("Subscription dispatch rate set success on topic: {}", topic);

Awaitility.await().atMost(3, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), dispatchRate));

DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertNotNull(dispatchRateLimiter);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);

consumer.close();
admin.topics().delete(topic, true);
}

@Test
public void testRemoveSubscriptionDispatchRate() throws Exception {
final String topic = testTopic + UUID.randomUUID();
Expand Down

0 comments on commit 37e02fa

Please sign in to comment.