diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index 04cd94abed..9390df4708 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -144,6 +144,10 @@ s3.region=us-east-1 # The bucket of S3 service to store data s3.bucket=ko3 +# Use path style access for S3, default false +# If you are using minio for storage, you have to set this to true. +#s3.path.style=true + # The file path of delta WAL in block device s3.wal.path=/tmp/kraft-broker-logs/s3wal diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties index b9f3b033cb..aa027a40b4 100644 --- a/config/kraft/controller.properties +++ b/config/kraft/controller.properties @@ -140,6 +140,10 @@ s3.region=us-east-1 # The bucket of S3 service to store data s3.bucket=ko3 +# Use path style access for S3, default false +# If you are using minio for storage, you have to set this to true. +#s3.path.style=true + ############################# Settings for Auto Balancer ############################# # Whether to enabled Auto Balancer in controller, default true autobalancer.controller.enable=true diff --git a/config/kraft/server.properties b/config/kraft/server.properties index 48837968d0..1a701d1dee 100644 --- a/config/kraft/server.properties +++ b/config/kraft/server.properties @@ -150,6 +150,10 @@ s3.region=us-east-1 # The bucket of S3 service to store data s3.bucket=ko3 +# Use path style access for S3, default false +# If you are using minio for storage, you have to set this to true. +#s3.path.style=true + # The file path of delta WAL in block device s3.wal.path=/tmp/kraft-combined-logs/s3wal diff --git a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java index 9878c0f8b3..839c54fff2 100644 --- a/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java +++ b/core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java @@ -63,6 +63,7 @@ public static Config to(KafkaConfig s) { .refillPeriodMs(s.s3RefillPeriodMsProp()) .objectRetentionTimeInSecond(s.s3ObjectRetentionTimeInSecond()) .failoverEnable(s.s3FailoverEnable()) + .forcePathStyle(s.s3PathStyle()) .accessKey(accessKey) .secretKey(secretKey); } diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index f689c50bff..04667a7b38 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -87,10 +87,11 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) { config.networkBaselineBandwidth(), config.refillPeriodMs(), config.networkBaselineBandwidth()); String accessKey = this.config.accessKey(); String secretKey = this.config.secretKey(); + boolean forcePathStyle = this.config.forcePathStyle(); S3Operator s3Operator = DefaultS3Operator.builder().endpoint(endpoint).region(region).bucket(bucket).accessKey(accessKey).secretKey(secretKey) - .inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).readWriteIsolate(true).build(); + .inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).readWriteIsolate(true).forcePathStyle(forcePathStyle).build(); S3Operator compactionS3Operator = DefaultS3Operator.builder().endpoint(endpoint).region(region).bucket(bucket).accessKey(accessKey).secretKey(secretKey) - .inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).build(); + .inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).forcePathStyle(forcePathStyle).build(); ControllerRequestSender.RetryPolicyContext retryPolicyContext = new ControllerRequestSender.RetryPolicyContext(kafkaConfig.s3ControllerRequestRetryMaxCount(), kafkaConfig.s3ControllerRequestRetryBaseDelayMs()); this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext); diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f01d0733d7..3a938e1b27 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -687,6 +687,7 @@ object KafkaConfig { // AutoMQ for Kafka inject start val S3EndpointProp = "s3.endpoint" val S3RegionProp = "s3.region" + val S3PathStyleProp = "s3.path.style" val S3BucketProp = "s3.bucket" val S3WALPathProp = "s3.wal.path" val S3WALCapacityProp = "s3.wal.capacity" @@ -723,6 +724,7 @@ object KafkaConfig { val S3EndpointDoc = "The S3 endpoint, ex. https://s3.{region}.amazonaws.com." val S3RegionDoc = "The S3 region, ex. us-east-1." + val S3PathStyleDoc = "The S3 path style, ex. true." val S3BucketDoc = "The S3 bucket, ex. my-bucket." val S3WALPathDoc = "The S3 WAL path. It could be a block device like /dev/xxx or file path in file system" val S3WALCapacityDoc = "The S3 WAL capacity. The value should be larger than s3.wal.cache.size cause of log storage format may not compact." @@ -1558,6 +1560,7 @@ object KafkaConfig { // AutoMQ for Kafka inject start .define(S3EndpointProp, STRING, null, HIGH, S3EndpointDoc) .define(S3RegionProp, STRING, null, HIGH, S3RegionDoc) + .define(S3PathStyleProp, BOOLEAN, false, HIGH, S3PathStyleDoc) .define(S3BucketProp, STRING, null, HIGH, S3BucketDoc) .define(S3WALPathProp, STRING, null, HIGH, S3WALPathDoc) .define(S3WALCacheSizeProp, LONG, 209715200L, MEDIUM, S3WALCacheSizeDoc) @@ -2127,6 +2130,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami /** ********* Kafka on S3 Configuration *********/ val s3Endpoint = getString(KafkaConfig.S3EndpointProp) val s3Region = getString(KafkaConfig.S3RegionProp) + val s3PathStyle = getBoolean(KafkaConfig.S3PathStyleProp) val s3Bucket = getString(KafkaConfig.S3BucketProp) val s3WALPath = getString(KafkaConfig.S3WALPathProp) val s3WALCacheSize = getLong(KafkaConfig.S3WALCacheSizeProp) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 7a01c24870..4c2f9e9f9a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1930,7 +1930,7 @@ private QuorumController( s3Operator = new MemoryS3Operator(); } else { S3StreamMetricsRegistry.setMetricsGroup(new KafkaS3StreamMetricsGroup()); - s3Operator = new DefaultS3Operator(streamConfig.endpoint(), streamConfig.region(), streamConfig.bucket(), false, + s3Operator = new DefaultS3Operator(streamConfig.endpoint(), streamConfig.region(), streamConfig.bucket(), streamConfig.forcePathStyle(), streamConfig.accessKey(), streamConfig.secretKey()); } this.s3ObjectControlManager = new S3ObjectControlManager(