diff --git a/config/kraft/server.properties b/config/kraft/server.properties index d255cfbb97..b8377b3afb 100644 --- a/config/kraft/server.properties +++ b/config/kraft/server.properties @@ -149,9 +149,10 @@ elasticstream.enable=true # The Elastic Stream endpoint, ex. es://hostname1:port1,hostname2:port2,hostname3:port3. # You could also PoC launch it in memory mode with endpoint like memory:// or redis mode with redis://. # Note that in memory mode, this Kafka node can not work in a cluster. -elasticstream.endpoint=es://localhost:12378 -# The Elastic Stream kv endpoint. It should be the same as the endpoint for now. -elasticstream.kv.endpoint=es://localhost:12379 +elasticstream.endpoint=s3:// # The stream namespace, default is clusterId. # elasticstream.namespace=xxxx +s3.endpoint=https://s3.amazonaws.com +s3.region=us-east-1 +s3.bucket=ko3 diff --git a/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java new file mode 100644 index 0000000000..1bd45026f9 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.client.s3; + +import com.automq.elasticstream.client.api.Client; +import kafka.log.es.AlwaysSuccessClient; +import kafka.log.es.client.Context; +import kafka.log.s3.S3Client; + +public class ClientFactory { + public static Client get(Context context) { + return new AlwaysSuccessClient(new S3Client()); + } +} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d732d583ff..b740186754 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -672,11 +672,13 @@ object KafkaConfig { // elastic stream inject end // Kafka on S3 inject start - val S3RegionProp = "s3.region"; - val S3BucketProp = "s3.bucket"; + val S3EndpointProp = "s3.endpoint" + val S3RegionProp = "s3.region" + val S3BucketProp = "s3.bucket" - val S3RegionDoc = "Specifies the S3 region, ex. us-east-1.\n" - val S3BucketDoc = "Specifies the S3 bucket, ex. my-bucket.\n" + val S3EndpointDoc = "Specifies the S3 endpoint, ex. https://s3.{region}.amazonaws.com." + val S3RegionDoc = "Specifies the S3 region, ex. us-east-1." + val S3BucketDoc = "Specifies the S3 bucket, ex. my-bucket." // Kafka on S3 inject end /* Documentation */ @@ -1474,6 +1476,7 @@ object KafkaConfig { // elastic stream inject end // Kafka on S3 inject start + .define(S3EndpointProp, STRING, null, HIGH, S3EndpointDoc) .define(S3RegionProp, STRING, null, HIGH, S3RegionDoc) .define(S3BucketProp, STRING, null, HIGH, S3BucketDoc) // Kafka on S3 inject end @@ -2011,8 +2014,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // Kafka on S3 inject start /** ********* Kafka on S3 Configuration *********/ - val s3Region = getString(KafkaConfig.S3RegionProp) - val s3Bucket = getString(KafkaConfig.S3BucketProp) + val s3Endpoint = getString(KafkaConfig.S3EndpointProp) + val s3Region = getString(KafkaConfig.S3RegionProp) + val s3Bucket = getString(KafkaConfig.S3BucketProp) // Kafka on S3 inject end def addReconfigurable(reconfigurable: Reconfigurable): Unit = {