Skip to content

Commit

Permalink
[fix][broker] new load balancer system topic should not be auto-creat…
Browse files Browse the repository at this point in the history
…ed now
  • Loading branch information
heesung-sn committed Jun 13, 2023
1 parent f7c4537 commit ff3e3fa
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
Expand Down Expand Up @@ -3304,10 +3305,19 @@ private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName
topicName.getNamespaceObject());
return CompletableFuture.completedFuture(false);
}
//System topic can always be created automatically

// ServiceUnitStateChannelImpl.TOPIC expects to be a non-partitioned-topic now.
// We don't allow the auto-creation here.
// ServiceUnitStateChannelImpl.start() is responsible to create the topic.
if (ServiceUnitStateChannelImpl.TOPIC.equals(topicName.toString())) {
return CompletableFuture.completedFuture(false);
}

//Other system topics can be created automatically
if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
return CompletableFuture.completedFuture(true);
}

final boolean allowed;
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -1579,7 +1580,6 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
});
}


// this test is disabled since it is flaky
@Test(enabled = false)
public void testBrokerStatsTopicLoadFailed() throws Exception {
Expand Down Expand Up @@ -1657,4 +1657,13 @@ public void testBrokerStatsTopicLoadFailed() throws Exception {
return flag.get();
});
}

@Test
public void testIsSystemTopicAllowAutoTopicCreationAsync() throws Exception {
BrokerService brokerService = pulsar.getBrokerService();
assertFalse(brokerService.isAllowAutoTopicCreationAsync(
ServiceUnitStateChannelImpl.TOPIC).get());
assertTrue(brokerService.isAllowAutoTopicCreationAsync(
"persistent://pulsar/system/my-system-topic").get());
}
}

0 comments on commit ff3e3fa

Please sign in to comment.