Skip to content

Commit

Permalink
[Issue 6394] Add configuration to disable auto creation of subscripti…
Browse files Browse the repository at this point in the history
…ons (#6456)

### Motivation

Fixes #6394

### Modifications

- provide a flag `allowAutoSubscriptionCreation` in `ServiceConfiguration`, defaults to `true`
- when `allowAutoSubscriptionCreation` is disabled and the specified subscription (`Durable`) on the topic does not exist when trying to subscribe via a consumer, the server should reject the request directly by `handleSubscribe` in `ServerCnx`
- create the subscription on the coordination topic if it does not exist when init `WorkerService`
  • Loading branch information
sijie committed Mar 5, 2020
1 parent 65cc303 commit c3292a6
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 2 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
allowAutoSubscriptionCreation=true

# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,9 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
allowAutoSubscriptionCreation=true

# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
)
private String allowAutoTopicCreationType = "non-partitioned";
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Allow automated creation of subscriptions if set to true (default value)."
)
private boolean allowAutoSubscriptionCreation = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "The number of partitioned topics that is allowed to be automatically created"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ public TopicNotFoundException(String msg) {
}
}

public static class SubscriptionNotFoundException extends BrokerServiceException {
public SubscriptionNotFoundException(String msg) {
super(msg);
}
}

public static class SubscriptionBusyException extends BrokerServiceException {
public SubscriptionBusyException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
Expand Down Expand Up @@ -725,7 +726,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
: null;
final String subscription = subscribe.getSubscription();
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.getReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
Expand All @@ -751,7 +751,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (service.isAuthorizationEnabled()) {
authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
subscription);
subscriptionName);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -814,6 +814,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {

Topic topic = optTopic.get();

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.pulsar().getConfig().isAllowAutoSubscriptionCreation()
&& !topic.getSubscriptions().containsKey(subscriptionName);

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(new SubscriptionNotFoundException("Subscription does not exist"));
}

if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -104,6 +105,48 @@ public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() thro
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
}

@Test
public void testAutoSubscriptionCreationDisable() throws Exception{
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "persistent://prop/ns-abc/test-subtopic";
final String subscriptionName = "test-subtopic-sub";

admin.topics().createNonPartitionedTopic(topicName);

try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));

// Reset to default
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
}

@Test
public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "persistent://prop/ns-abc/test-subtopic";
final String subscriptionName = "test-subtopic-sub";

admin.topics().createNonPartitionedTopic(topicName);
assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName));

// Create the subscription by PulsarAdmin
admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));

// Subscribe operation should be successful
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();

// Reset to default
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
}

/**
* CheckAllowAutoCreation's default value is false.
* So using getPartitionedTopicMetadata() directly will not produce partitioned topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

Expand Down Expand Up @@ -162,6 +163,10 @@ public void start(URI dlogUri,
this.connectorsManager = new ConnectorsManager(workerConfig);

//create membership manager
String coordinationTopic = workerConfig.getClusterCoordinationTopic();
if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) {
brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest);
}
this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin);

// create function runtime manager
Expand Down
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit | -1 |
|allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true|
|allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned|
|allowAutoSubscriptionCreation| Enable subscription auto creation if a new consumer connected |true|
|defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if `allowAutoTopicCreationType` is partitioned |1|
|brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true|
|brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60|
Expand Down

0 comments on commit c3292a6

Please sign in to comment.