Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/kraft/broker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ s3.region=us-east-1
# The bucket of S3 service to store data
s3.bucket=ko3

# Use path style access for S3, default false
# If you are using minio for storage, you have to set this to true.
#s3.path.style=true

# The file path of delta WAL in block device
s3.wal.path=/tmp/kraft-broker-logs/s3wal

Expand Down
4 changes: 4 additions & 0 deletions config/kraft/controller.properties
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ s3.region=us-east-1
# The bucket of S3 service to store data
s3.bucket=ko3

# Use path style access for S3, default false
# If you are using minio for storage, you have to set this to true.
#s3.path.style=true

############################# Settings for Auto Balancer #############################
# Whether to enabled Auto Balancer in controller, default true
autobalancer.controller.enable=true
Expand Down
4 changes: 4 additions & 0 deletions config/kraft/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ s3.region=us-east-1
# The bucket of S3 service to store data
s3.bucket=ko3

# Use path style access for S3, default false
# If you are using minio for storage, you have to set this to true.
#s3.path.style=true

# The file path of delta WAL in block device
s3.wal.path=/tmp/kraft-combined-logs/s3wal

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static Config to(KafkaConfig s) {
.refillPeriodMs(s.s3RefillPeriodMsProp())
.objectRetentionTimeInSecond(s.s3ObjectRetentionTimeInSecond())
.failoverEnable(s.s3FailoverEnable())
.forcePathStyle(s.s3PathStyle())
.accessKey(accessKey)
.secretKey(secretKey);
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
config.networkBaselineBandwidth(), config.refillPeriodMs(), config.networkBaselineBandwidth());
String accessKey = this.config.accessKey();
String secretKey = this.config.secretKey();
boolean forcePathStyle = this.config.forcePathStyle();
S3Operator s3Operator = DefaultS3Operator.builder().endpoint(endpoint).region(region).bucket(bucket).accessKey(accessKey).secretKey(secretKey)
.inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).readWriteIsolate(true).build();
.inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).readWriteIsolate(true).forcePathStyle(forcePathStyle).build();
S3Operator compactionS3Operator = DefaultS3Operator.builder().endpoint(endpoint).region(region).bucket(bucket).accessKey(accessKey).secretKey(secretKey)
.inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).build();
.inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).forcePathStyle(forcePathStyle).build();
ControllerRequestSender.RetryPolicyContext retryPolicyContext = new ControllerRequestSender.RetryPolicyContext(kafkaConfig.s3ControllerRequestRetryMaxCount(),
kafkaConfig.s3ControllerRequestRetryBaseDelayMs());
this.requestSender = new ControllerRequestSender(brokerServer, retryPolicyContext);
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ object KafkaConfig {
// AutoMQ for Kafka inject start
val S3EndpointProp = "s3.endpoint"
val S3RegionProp = "s3.region"
val S3PathStyleProp = "s3.path.style"
val S3BucketProp = "s3.bucket"
val S3WALPathProp = "s3.wal.path"
val S3WALCapacityProp = "s3.wal.capacity"
Expand Down Expand Up @@ -723,6 +724,7 @@ object KafkaConfig {

val S3EndpointDoc = "The S3 endpoint, ex. <code>https://s3.{region}.amazonaws.com</code>."
val S3RegionDoc = "The S3 region, ex. <code>us-east-1</code>."
val S3PathStyleDoc = "The S3 path style, ex. <code>true</code>."
val S3BucketDoc = "The S3 bucket, ex. <code>my-bucket</code>."
val S3WALPathDoc = "The S3 WAL path. It could be a block device like /dev/xxx or file path in file system"
val S3WALCapacityDoc = "The S3 WAL capacity. The value should be larger than s3.wal.cache.size cause of log storage format may not compact."
Expand Down Expand Up @@ -1558,6 +1560,7 @@ object KafkaConfig {
// AutoMQ for Kafka inject start
.define(S3EndpointProp, STRING, null, HIGH, S3EndpointDoc)
.define(S3RegionProp, STRING, null, HIGH, S3RegionDoc)
.define(S3PathStyleProp, BOOLEAN, false, HIGH, S3PathStyleDoc)
.define(S3BucketProp, STRING, null, HIGH, S3BucketDoc)
.define(S3WALPathProp, STRING, null, HIGH, S3WALPathDoc)
.define(S3WALCacheSizeProp, LONG, 209715200L, MEDIUM, S3WALCacheSizeDoc)
Expand Down Expand Up @@ -2127,6 +2130,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
/** ********* Kafka on S3 Configuration *********/
val s3Endpoint = getString(KafkaConfig.S3EndpointProp)
val s3Region = getString(KafkaConfig.S3RegionProp)
val s3PathStyle = getBoolean(KafkaConfig.S3PathStyleProp)
val s3Bucket = getString(KafkaConfig.S3BucketProp)
val s3WALPath = getString(KafkaConfig.S3WALPathProp)
val s3WALCacheSize = getLong(KafkaConfig.S3WALCacheSizeProp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1930,7 +1930,7 @@ private QuorumController(
s3Operator = new MemoryS3Operator();
} else {
S3StreamMetricsRegistry.setMetricsGroup(new KafkaS3StreamMetricsGroup());
s3Operator = new DefaultS3Operator(streamConfig.endpoint(), streamConfig.region(), streamConfig.bucket(), false,
s3Operator = new DefaultS3Operator(streamConfig.endpoint(), streamConfig.region(), streamConfig.bucket(), streamConfig.forcePathStyle(),
streamConfig.accessKey(), streamConfig.secretKey());
}
this.s3ObjectControlManager = new S3ObjectControlManager(
Expand Down