diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties index b4df521e60..5a0493ee2b 100644 --- a/config/kraft/controller.properties +++ b/config/kraft/controller.properties @@ -125,9 +125,6 @@ log.retention.check.interval.ms=300000 # Whether to enable store data in elastic stream layer elasticstream.enable=true -# The topic creation policy for AutoMQ for Kafka -create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy - # The endpoint for S3 service # see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3 # For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead. diff --git a/config/kraft/server.properties b/config/kraft/server.properties index 506e4321e5..a39857279d 100644 --- a/config/kraft/server.properties +++ b/config/kraft/server.properties @@ -135,9 +135,6 @@ log.retention.check.interval.ms=300000 # Whether to enable store data in elastic stream layer elasticstream.enable=true -# The topic creation policy for AutoMQ for Kafka -create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy - # The endpoint for S3 service # see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3 # For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead. diff --git a/kshell-sdk/src/main/resources/template/controller.properties b/kshell-sdk/src/main/resources/template/controller.properties index b4df521e60..5a0493ee2b 100644 --- a/kshell-sdk/src/main/resources/template/controller.properties +++ b/kshell-sdk/src/main/resources/template/controller.properties @@ -125,9 +125,6 @@ log.retention.check.interval.ms=300000 # Whether to enable store data in elastic stream layer elasticstream.enable=true -# The topic creation policy for AutoMQ for Kafka -create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy - # The endpoint for S3 service # see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3 # For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead. diff --git a/kshell-sdk/src/main/resources/template/server.properties b/kshell-sdk/src/main/resources/template/server.properties index efe1ad0a82..c2417521c4 100644 --- a/kshell-sdk/src/main/resources/template/server.properties +++ b/kshell-sdk/src/main/resources/template/server.properties @@ -135,9 +135,6 @@ log.retention.check.interval.ms=300000 # Whether to enable store data in elastic stream layer elasticstream.enable=true -# The topic creation policy for AutoMQ for Kafka -create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy - # The endpoint for S3 service # see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3 # For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead. diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index aa37fafcd4..cdc3ae44ea 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -82,6 +82,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.es.AutoMQCreateTopicPolicy; import org.apache.kafka.controller.es.CreatePartitionPolicy; import org.apache.kafka.controller.es.ElasticCreatePartitionPolicy; import org.apache.kafka.controller.es.PartitionLeaderSelector; @@ -328,6 +329,11 @@ static Map translateCreationConfigs(CreateableTopicConfigCollect */ private final Optional createTopicPolicy; + /** + * The policy that must be validated before creating a topic. + */ + private final CreateTopicPolicy autoMQCreateTopicPolicy = new AutoMQCreateTopicPolicy(); + /** * The policy to use to validate that partition assignments are valid, if one is present. */ @@ -812,13 +818,15 @@ private ApiError createTopic(CreatableTopic topic, } private ApiError maybeCheckCreateTopicPolicy(Supplier supplier) { - if (createTopicPolicy.isPresent()) { - try { - createTopicPolicy.get().validate(supplier.get()); - } catch (PolicyViolationException e) { - return new ApiError(Errors.POLICY_VIOLATION, e.getMessage()); - } + // AutoMQ for Kafka inject start + try { + autoMQCreateTopicPolicy.validate(supplier.get()); + createTopicPolicy.ifPresent(topicPolicy -> topicPolicy.validate(supplier.get())); + } catch (PolicyViolationException e) { + return new ApiError(Errors.POLICY_VIOLATION, e.getMessage()); } + // AutoMQ for Kafka inject end + return ApiError.NONE; } diff --git a/core/src/main/scala/kafka/server/es/ElasticCreateTopicPolicy.java b/metadata/src/main/java/org/apache/kafka/controller/es/AutoMQCreateTopicPolicy.java similarity index 94% rename from core/src/main/scala/kafka/server/es/ElasticCreateTopicPolicy.java rename to metadata/src/main/java/org/apache/kafka/controller/es/AutoMQCreateTopicPolicy.java index eda3c697c7..369ded3d69 100644 --- a/core/src/main/scala/kafka/server/es/ElasticCreateTopicPolicy.java +++ b/metadata/src/main/java/org/apache/kafka/controller/es/AutoMQCreateTopicPolicy.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.server.es; +package org.apache.kafka.controller.es; import java.util.Map; import org.apache.kafka.common.errors.PolicyViolationException; @@ -23,7 +23,7 @@ /** *

A policy on create topic requests. */ -public class ElasticCreateTopicPolicy implements CreateTopicPolicy { +public class AutoMQCreateTopicPolicy implements CreateTopicPolicy { @Override public void validate(RequestMetadata requestMetadata) throws PolicyViolationException { if (requestMetadata.replicationFactor() != null && requestMetadata.replicationFactor() != 1) { diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 37685415fa..c4053bba8e 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -136,7 +136,6 @@ group.initial.rebalance.delay.ms=100 log.cleaner.dedupe.buffer.size=33554432 ############################# Settings for es ############################# -#create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy # enable store data in object storage elasticstream.enable=true elasticstream.endpoint=s3://