Skip to content

Commit

Permalink
[fix][broker] Fix NPE when set AutoTopicCreationOverride (apache#15653
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mattisonchao committed May 20, 2022
1 parent d1f4e19 commit e2afcf0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
Expand Up @@ -97,6 +97,7 @@
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.ValidateResult;
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
Expand Down Expand Up @@ -836,9 +837,11 @@ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
"Invalid configuration for autoTopicCreationOverride. the detail is "
+ validateResult.getErrorInfo());
}
if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
if (Objects.equals(autoTopicCreationOverride.getTopicType(), TopicType.PARTITIONED.toString())) {
if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}
}
}
// Force to read the data s.t. the watch to the cache content is setup.
Expand Down
Expand Up @@ -45,6 +45,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
Expand Down Expand Up @@ -1708,6 +1710,48 @@ public void testMaxNamespacesPerTenant() throws Exception {
}
}

@Test
public void testAutoTopicCreationOverrideWithMaxNumPartitionsLimit() throws Exception{
super.internalCleanup();
conf.setMaxNumPartitionsPerPartitionedTopic(10);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(
Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("testTenant", tenantInfo);
// test non-partitioned
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
AutoTopicCreationOverride overridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("non-partitioned")
.build();
admin.namespaces().setAutoTopicCreation("testTenant/ns1", overridePolicy);
AutoTopicCreationOverride newOverridePolicy =
admin.namespaces().getAutoTopicCreation("testTenant/ns1");
assertEquals(overridePolicy, newOverridePolicy);
// test partitioned
AutoTopicCreationOverride partitionedOverridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(10)
.build();
admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedOverridePolicy);
AutoTopicCreationOverride partitionedNewOverridePolicy =
admin.namespaces().getAutoTopicCreation("testTenant/ns1");
assertEquals(partitionedOverridePolicy, partitionedNewOverridePolicy);
// test partitioned with error
AutoTopicCreationOverride partitionedWrongOverridePolicy = AutoTopicCreationOverride
.builder().allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(123)
.build();
try {
admin.namespaces().setAutoTopicCreation("testTenant/ns1", partitionedWrongOverridePolicy);
fail();
} catch (Exception ex) {
assertTrue(ex.getCause() instanceof NotAcceptableException);
}
}
@Test
public void testMaxTopicsPerNamespace() throws Exception {
super.internalCleanup();
Expand Down

0 comments on commit e2afcf0

Please sign in to comment.