Skip to content

Commit 9ab3221

Browse files
dao-junsrinath-ctds
authored andcommitted
[fix][broker] Fix UnsupportedOperationException while setting subscription level dispatch rate policy (apache#24048)
(cherry picked from commit 9f38a5c) (cherry picked from commit 98c205b)
1 parent 6dd61f1 commit 9ab3221

File tree

2 files changed

+46
-3
lines changed

2 files changed

+46
-3
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,11 @@
8686
import org.apache.pulsar.common.policies.data.PublishRate;
8787
import org.apache.pulsar.common.policies.data.RetentionPolicies;
8888
import org.apache.pulsar.common.policies.data.SubscribeRate;
89+
import org.apache.pulsar.common.policies.data.SubscriptionPolicies;
8990
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
9091
import org.apache.pulsar.common.policies.data.TopicPolicies;
9192
import org.apache.pulsar.common.policies.data.TopicStats;
93+
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
9294
import org.assertj.core.api.Assertions;
9395
import org.awaitility.Awaitility;
9496
import org.mockito.Mockito;
@@ -3215,7 +3217,7 @@ public void testDelayedDeliveryPolicy() throws Exception {
32153217

32163218
admin.topics().delete(topic, true);
32173219
}
3218-
3220+
32193221
@Test
32203222
public void testUpdateRetentionWithPartialFailure() throws Exception {
32213223
String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
@@ -3259,4 +3261,40 @@ public void testUpdateRetentionWithPartialFailure() throws Exception {
32593261
admin.namespaces().removeRetention(myNamespace);
32603262
admin.topics().delete(tpName, false);
32613263
}
3264+
3265+
@Test
3266+
public void testTopicPoliciesGetSubscriptionPolicies() throws Exception {
3267+
TopicPolicies topicPolicies = TopicPolicies.builder()
3268+
.maxProducerPerTopic(10).subscriptionPolicies(null).build();
3269+
Assert.assertNotNull(topicPolicies.getSubscriptionPolicies());
3270+
Assert.assertEquals(topicPolicies.getMaxProducerPerTopic(), 10);
3271+
Assert.assertTrue(topicPolicies.getSubscriptionPolicies().isEmpty());
3272+
topicPolicies.getSubscriptionPolicies().computeIfAbsent("sub", k ->
3273+
new SubscriptionPolicies()).setDispatchRate(new DispatchRateImpl());
3274+
Assert.assertEquals(topicPolicies.getSubscriptionPolicies().get("sub").getDispatchRate()
3275+
.getDispatchThrottlingRateInByte(), 0);
3276+
}
3277+
3278+
@Test
3279+
public void testSetSubRateWithNoSub() throws Exception {
3280+
String topic = "persistent://" + myNamespace + "/testSetSubRateWithNoSub";
3281+
admin.topics().createNonPartitionedTopic(topic);
3282+
admin.topicPolicies().setSubscriptionDispatchRate(topic, DispatchRate.builder()
3283+
.dispatchThrottlingRateInMsg(10)
3284+
.dispatchThrottlingRateInByte(10)
3285+
.ratePeriodInSecond(10)
3286+
.build());
3287+
}
3288+
3289+
@Test
3290+
public void testSetSubRateWithSub() throws Exception {
3291+
String topic = "persistent://" + myNamespace + "/testSetSubRateWithSub";
3292+
admin.topics().createNonPartitionedTopic(topic);
3293+
admin.topics().createSubscription(topic, "sub1", MessageId.earliest);
3294+
admin.topicPolicies().setSubscriptionDispatchRate(topic, "sub1", DispatchRate.builder()
3295+
.dispatchThrottlingRateInMsg(10)
3296+
.dispatchThrottlingRateInByte(10)
3297+
.ratePeriodInSecond(10)
3298+
.build());
3299+
}
32623300
}

pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
package org.apache.pulsar.common.policies.data;
2020

2121
import com.google.common.collect.Sets;
22+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2223
import java.util.ArrayList;
23-
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.Set;
28+
import java.util.concurrent.ConcurrentHashMap;
2829
import lombok.AllArgsConstructor;
2930
import lombok.Builder;
3031
import lombok.Data;
@@ -42,6 +43,7 @@
4243
@Builder
4344
@NoArgsConstructor
4445
@AllArgsConstructor
46+
@SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
4547
public class TopicPolicies {
4648

4749
@Builder.Default
@@ -193,6 +195,9 @@ public Set<String> getReplicationClustersSet() {
193195
}
194196

195197
public Map<String, SubscriptionPolicies> getSubscriptionPolicies() {
196-
return subscriptionPolicies == null ? Collections.emptyMap() : subscriptionPolicies;
198+
if (subscriptionPolicies == null) {
199+
subscriptionPolicies = new ConcurrentHashMap<>();
200+
}
201+
return subscriptionPolicies;
197202
}
198203
}

0 commit comments

Comments
 (0)