Skip to content

Commit

Permalink
[fix][broker] Fix NPE when set AutoTopicCreationOverride (apache#15653
Browse files Browse the repository at this point in the history
)

(cherry picked from commit e2afcf0)
  • Loading branch information
mattisonchao authored and codelipenghui committed May 20, 2022
1 parent 6415b31 commit 56ef04c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
Expand Up @@ -94,6 +94,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.ValidateResult;
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
Expand Down Expand Up @@ -836,9 +837,11 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
"Invalid configuration for autoTopicCreationOverride. the detail is "
+ validateResult.getErrorInfo());
}
if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
if (Objects.equals(autoTopicCreationOverride.getTopicType(), TopicType.PARTITIONED.toString())) {
if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}
}
}
// Force to read the data s.t. the watch to the cache content is setup.
Expand Down
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -70,7 +70,8 @@ public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
String webServiceUrl = getBrokerWebServiceUrl(s);
String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
new PulsarResourceDescription(),
Collections.singletonMap(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
});
}

Expand Down
Expand Up @@ -45,6 +45,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
Expand Down Expand Up @@ -1705,6 +1707,48 @@ public void testMaxNamespacesPerTenant() throws Exception {
}
}

@Test
public void testAutoTopicCreationOverrideWithMaxNumPartitionsLimit() throws Exception{
super.internalCleanup();
conf.setMaxNumPartitionsPerPartitionedTopic(10);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(
Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
// test non-partitioned
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
AutoTopicCreationOverride overridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("non-partitioned")
.build();
admin.namespaces().setAutoTopicCreation("testTenant/ns1", overridePolicy);
AutoTopicCreationOverride newOverridePolicy =
admin.namespaces().getAutoTopicCreation("testTenant/ns1");
assertEquals(overridePolicy, newOverridePolicy);
// test partitioned
AutoTopicCreationOverride partitionedOverridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(10)
.build();
admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedOverridePolicy);
AutoTopicCreationOverride partitionedNewOverridePolicy =
admin.namespaces().getAutoTopicCreation("testTenant/ns1");
assertEquals(partitionedOverridePolicy, partitionedNewOverridePolicy);
// test partitioned with error
AutoTopicCreationOverride partitionedWrongOverridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(123)
.build();
try {
admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedWrongOverridePolicy);
fail();
} catch (Exception ex) {
assertTrue(ex.getCause() instanceof NotAcceptableException);
}
}
@Test
public void testMaxTopicsPerNamespace() throws Exception {
super.internalCleanup();
Expand Down

0 comments on commit 56ef04c

Please sign in to comment.