From c3292a611c9cbf2b17c96c5317d8f20247eb1f41 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 4 Mar 2020 17:21:53 -0800 Subject: [PATCH] [Issue 6394] Add configuration to disable auto creation of subscriptions (#6456) ### Motivation Fixes #6394 ### Modifications - provide a flag `allowAutoSubscriptionCreation` in `ServiceConfiguration`, defaults to `true` - when `allowAutoSubscriptionCreation` is disabled and the specified subscription (`Durable`) on the topic does not exist when trying to subscribe via a consumer, the server should reject the request directly by `handleSubscribe` in `ServerCnx` - create the subscription on the coordination topic if it does not exist when init `WorkerService` --- conf/broker.conf | 3 ++ conf/standalone.conf | 3 ++ .../pulsar/broker/ServiceConfiguration.java | 5 +++ .../service/BrokerServiceException.java | 6 +++ .../pulsar/broker/service/ServerCnx.java | 13 +++++- .../BrokerServiceAutoTopicCreationTest.java | 43 +++++++++++++++++++ .../functions/worker/WorkerService.java | 5 +++ site2/docs/reference-configuration.md | 1 + 8 files changed, 77 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index bfb7f7ceee12d..59898800773c9 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -91,6 +91,9 @@ allowAutoTopicCreation=true # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) allowAutoTopicCreationType=non-partitioned +# Enable subscription auto creation if new consumer connected (disable auto creation with value false) +allowAutoSubscriptionCreation=true + # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 diff --git a/conf/standalone.conf b/conf/standalone.conf index 7dff0c3c913ab..f351d8d2bff5f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -632,6 +632,9 @@ allowAutoTopicCreation=true # The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) allowAutoTopicCreationType=non-partitioned +# Enable subscription auto creation if new consumer connected (disable auto creation with value false) +allowAutoSubscriptionCreation=true + # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 86da1027f34d6..0d554f0c5dc74 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -998,6 +998,11 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)" ) private String allowAutoTopicCreationType = "non-partitioned"; + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Allow automated creation of subscriptions if set to true (default value)." + ) + private boolean allowAutoSubscriptionCreation = true; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "The number of partitioned topics that is allowed to be automatically created" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index e794718c5fb1b..b4bfed518ad3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -123,6 +123,12 @@ public TopicNotFoundException(String msg) { } } + public static class SubscriptionNotFoundException extends BrokerServiceException { + public SubscriptionNotFoundException(String msg) { + super(msg); + } + } + public static class SubscriptionBusyException extends BrokerServiceException { public SubscriptionBusyException(String msg) { super(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4497c977036fd..9294330c87f42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; @@ -725,7 +726,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) : null; - final String subscription = subscribe.getSubscription(); final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; final boolean readCompacted = subscribe.getReadCompacted(); final Map metadata = CommandUtils.metadataFromCommand(subscribe); @@ -751,7 +751,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { if (service.isAuthorizationEnabled()) { authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName, originalPrincipal != null ? originalPrincipal : authRole, authenticationData, - subscription); + subscriptionName); } else { authorizationFuture = CompletableFuture.completedFuture(true); } @@ -814,6 +814,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { Topic topic = optTopic.get(); + boolean rejectSubscriptionIfDoesNotExist = isDurable + && !service.pulsar().getConfig().isAllowAutoSubscriptionCreation() + && !topic.getSubscriptions().containsKey(subscriptionName); + + if (rejectSubscriptionIfDoesNotExist) { + return FutureUtil + .failedFuture(new SubscriptionNotFoundException("Subscription does not exist")); + } + if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) .thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index 929f9c24133fc..d595a64648cd6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -104,6 +105,48 @@ public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() thro assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName)); } + @Test + public void testAutoSubscriptionCreationDisable() throws Exception{ + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + final String topicName = "persistent://prop/ns-abc/test-subtopic"; + final String subscriptionName = "test-subtopic-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + + try { + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + fail("Subscribe operation should have failed"); + } catch (Exception e) { + assertTrue(e instanceof PulsarClientException); + } + assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + + // Reset to default + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); + } + + @Test + public void testSubscriptionCreationWithAutoCreationDisable() throws Exception{ + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + final String topicName = "persistent://prop/ns-abc/test-subtopic"; + final String subscriptionName = "test-subtopic-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + assertFalse(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + + // Create the subscription by PulsarAdmin + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName)); + + // Subscribe operation should be successful + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + + // Reset to default + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); + } + /** * CheckAllowAutoCreation's default value is false. * So using getPartitionedTopicMetadata() directly will not produce partitioned topic diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index fdfb7dec9b626..b23c707cc5c3a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -162,6 +163,10 @@ public void start(URI dlogUri, this.connectorsManager = new ConnectorsManager(workerConfig); //create membership manager + String coordinationTopic = workerConfig.getClusterCoordinationTopic(); + if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) { + brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, MessageId.earliest); + } this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin); // create function runtime manager diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index cdfe0d419a70d..edc3f996451b3 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -131,6 +131,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit | -1 | |allowAutoTopicCreation| Enable topic auto creation if a new producer or consumer connected |true| |allowAutoTopicCreationType| The topic type (partitioned or non-partitioned) that is allowed to be automatically created. |Partitioned| +|allowAutoSubscriptionCreation| Enable subscription auto creation if a new consumer connected |true| |defaultNumPartitions| The number of partitioned topics that is allowed to be automatically created if `allowAutoTopicCreationType` is partitioned |1| |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true| |brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60|