Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions config/kraft/controller.properties
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ log.retention.check.interval.ms=300000
# Whether to enable store data in elastic stream layer
elasticstream.enable=true

# The topic creation policy for AutoMQ for Kafka
create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy

# The endpoint for S3 service
# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
Expand Down
3 changes: 0 additions & 3 deletions config/kraft/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,6 @@ log.retention.check.interval.ms=300000
# Whether to enable store data in elastic stream layer
elasticstream.enable=true

# The topic creation policy for AutoMQ for Kafka
create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy

# The endpoint for S3 service
# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
Expand Down
3 changes: 0 additions & 3 deletions kshell-sdk/src/main/resources/template/controller.properties
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ log.retention.check.interval.ms=300000
# Whether to enable store data in elastic stream layer
elasticstream.enable=true

# The topic creation policy for AutoMQ for Kafka
create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy

# The endpoint for S3 service
# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
Expand Down
3 changes: 0 additions & 3 deletions kshell-sdk/src/main/resources/template/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,6 @@ log.retention.check.interval.ms=300000
# Whether to enable store data in elastic stream layer
elasticstream.enable=true

# The topic creation policy for AutoMQ for Kafka
create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy

# The endpoint for S3 service
# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.es.AutoMQCreateTopicPolicy;
import org.apache.kafka.controller.es.CreatePartitionPolicy;
import org.apache.kafka.controller.es.ElasticCreatePartitionPolicy;
import org.apache.kafka.controller.es.PartitionLeaderSelector;
Expand Down Expand Up @@ -328,6 +329,11 @@ static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollect
*/
private final Optional<CreateTopicPolicy> createTopicPolicy;

/**
* The policy that must be validated before creating a topic.
*/
private final CreateTopicPolicy autoMQCreateTopicPolicy = new AutoMQCreateTopicPolicy();

/**
* The policy to use to validate that partition assignments are valid, if one is present.
*/
Expand Down Expand Up @@ -812,13 +818,15 @@ private ApiError createTopic(CreatableTopic topic,
}

private ApiError maybeCheckCreateTopicPolicy(Supplier<CreateTopicPolicy.RequestMetadata> supplier) {
if (createTopicPolicy.isPresent()) {
try {
createTopicPolicy.get().validate(supplier.get());
} catch (PolicyViolationException e) {
return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
}
// AutoMQ for Kafka inject start
try {
autoMQCreateTopicPolicy.validate(supplier.get());
createTopicPolicy.ifPresent(topicPolicy -> topicPolicy.validate(supplier.get()));
} catch (PolicyViolationException e) {
return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
}
// AutoMQ for Kafka inject end

return ApiError.NONE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.es;
package org.apache.kafka.controller.es;

import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
Expand All @@ -23,7 +23,7 @@
/**
* <p>A policy on create topic requests.
*/
public class ElasticCreateTopicPolicy implements CreateTopicPolicy {
public class AutoMQCreateTopicPolicy implements CreateTopicPolicy {
@Override
public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
if (requestMetadata.replicationFactor() != null && requestMetadata.replicationFactor() != 1) {
Expand Down
1 change: 0 additions & 1 deletion tests/kafkatest/services/kafka/templates/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ group.initial.rebalance.delay.ms=100
log.cleaner.dedupe.buffer.size=33554432

############################# Settings for es #############################
#create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
# enable store data in object storage
elasticstream.enable=true
elasticstream.endpoint=s3://
Expand Down