diff --git a/bin/automq-kafka-admin.sh b/bin/automq-kafka-admin.sh
new file mode 100755
index 0000000000..15400db234
--- /dev/null
+++ b/bin/automq-kafka-admin.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.automq.AutoMQKafkaAdminTool "$@"
+
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 81b67e2d1c..67c500e9ed 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -335,6 +335,8 @@ if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
fi
KAFKA_JDK_COMPATIBILITY_OPTS=""
+# We need to override KAFKA_S3_ACCESS_KEY and KAFKA_S3_SECRET_KEY at runtime. Java has no System.setEnv, so the EnvUtil class sets environment variables via reflection; this --add-opens flag permits that reflective access to java.util internals.
+KAFKA_JDK_COMPATIBILITY_OPTS="${KAFKA_JDK_COMPATIBILITY_OPTS} --add-opens=java.base/java.util=ALL-UNNAMED "
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
KAFKA_JDK_COMPATIBILITY_OPTS="${KAFKA_JDK_COMPATIBILITY_OPTS} --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true"
fi
diff --git a/build.gradle b/build.gradle
index cf8e221069..d36637786a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -925,7 +925,7 @@ project(':core') {
implementation project(':metadata')
implementation project(':raft')
implementation project(':storage')
-
+ implementation project(':s3shell-kafka-sdk')
implementation libs.argparse4j
implementation libs.jacksonDatabind
@@ -1171,6 +1171,8 @@ project(':core') {
from(project(':tools').configurations.runtimeClasspath) { into("libs/") }
from(project(':trogdor').jar) { into("libs/") }
from(project(':trogdor').configurations.runtimeClasspath) { into("libs/") }
+ from(project(':s3shell-kafka-sdk').jar) { into("libs/") }
+ from(project(':s3shell-kafka-sdk').configurations.runtimeClasspath) { into("libs/") }
from(project(':shell').jar) { into("libs/") }
from(project(':shell').configurations.runtimeClasspath) { into("libs/") }
from(project(':connect:api').jar) { into("libs/") }
@@ -1842,27 +1844,59 @@ project(':storage') {
}
}
+project(':s3shell-kafka-sdk') {
+ archivesBaseName = "s3shell-kafka-sdk"
+
+ checkstyle {
+ configProperties = checkstyleConfigProperties("import-control-automq.xml")
+ }
+}
+
project(':tools') {
archivesBaseName = "kafka-tools"
dependencies {
- implementation project(':clients')
- implementation project(':server-common')
- implementation project(':log4j-appender')
+ implementation (project(':clients')){
+ exclude group: 'org.slf4j', module: '*'
+ }
+ implementation (project(':server-common')){
+ exclude group: 'org.slf4j', module: '*'
+ }
+ implementation (project(':log4j-appender')){
+ exclude group: 'org.slf4j', module: '*'
+ }
+ implementation project(':s3shell-kafka-sdk')
+
implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.slf4jApi
implementation libs.log4j
-
- implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
+ // for SASL/OAUTHBEARER JWT validation
+ implementation (libs.jose4j){
+ exclude group: 'org.slf4j', module: '*'
+ }
implementation libs.jacksonJaxrsJsonProvider
+ implementation(libs.s3stream) {
+ exclude group: 'org.slf4j', module: '*'
+ exclude group: 'net.sourceforge.argparse4j', module: '*'
+ }
+ implementation libs.commonio
- testImplementation project(':clients')
+
+ testImplementation (project(':clients')){
+ exclude group: 'org.slf4j', module: '*'
+ }
testImplementation project(':clients').sourceSets.test.output
- testImplementation project(':core')
+ testImplementation (project(':core')){
+ exclude group: 'org.slf4j', module: '*'
+ }
testImplementation project(':core').sourceSets.test.output
- testImplementation project(':server-common')
+ testImplementation (project(':server-common')){
+ exclude group: 'org.slf4j', module: '*'
+ }
+
+
testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
diff --git a/checkstyle/import-control-automq.xml b/checkstyle/import-control-automq.xml
new file mode 100644
index 0000000000..0dd7803c8e
--- /dev/null
+++ b/checkstyle/import-control-automq.xml
@@ -0,0 +1,55 @@
+<!-- import-control rules for the s3shell-kafka-sdk module (XML content not preserved in this excerpt) -->
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 16dd106287..9d16713b6c 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -146,4 +146,12 @@
+<!-- import-control allowances for the new kafka.s3shell packages (XML content not preserved in this excerpt) -->
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7ae0ce07e0..e2b3d61a60 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -40,6 +40,10 @@
+<!-- import-control allowances for com.automq.s3shell.sdk (XML content not preserved in this excerpt) -->
diff --git a/core/src/main/java/kafka/s3shell/util/EnvUtil.java b/core/src/main/java/kafka/s3shell/util/EnvUtil.java
new file mode 100644
index 0000000000..42878bd7ff
--- /dev/null
+++ b/core/src/main/java/kafka/s3shell/util/EnvUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.s3shell.util;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+public class EnvUtil {
+    /**
+     * Sets a process environment variable at runtime. Java offers no System.setEnv,
+     * so this reflectively mutates the backing map of System.getenv(); it requires
+     * --add-opens=java.base/java.util=ALL-UNNAMED (added in kafka-run-class.sh) on JDK 9+.
+     */
+    @SuppressWarnings("unchecked")
+    public static void setEnv(String key, String value) {
+        try {
+            Map<String, String> env = System.getenv();
+            Field field = env.getClass().getDeclaredField("m");
+            field.setAccessible(true);
+            Map<String, String> writableEnv = (Map<String, String>) field.get(env);
+            writableEnv.put(key, value);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to set environment variable", e);
+        }
+    }
+}
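
For illustration, a minimal usage sketch of EnvUtil (not part of the patch; the literal variable names are assumptions based on the kafka-run-class.sh comment above): credentials parsed from --s3-url are injected into the process environment so existing KAFKA_S3_ACCESS_KEY/KAFKA_S3_SECRET_KEY lookups can pick them up. This only works when the JVM runs with --add-opens=java.base/java.util=ALL-UNNAMED.

    import kafka.s3shell.util.EnvUtil;

    public class EnvUtilUsageSketch {
        public static void main(String[] args) {
            // Hypothetical values; in the patch these come from the parsed --s3-url.
            EnvUtil.setEnv("KAFKA_S3_ACCESS_KEY", "example-access-key");
            EnvUtil.setEnv("KAFKA_S3_SECRET_KEY", "example-secret-key");
        }
    }
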
diff --git a/core/src/main/java/kafka/s3shell/util/KafkaFormatUtil.java b/core/src/main/java/kafka/s3shell/util/KafkaFormatUtil.java
new file mode 100644
index 0000000000..bd26677a50
--- /dev/null
+++ b/core/src/main/java/kafka/s3shell/util/KafkaFormatUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.s3shell.util;
+
+import com.automq.s3shell.sdk.util.S3PropUtil;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import kafka.tools.StorageTool;
+
+public class KafkaFormatUtil {
+ public static void formatStorage(String clusterId, Properties props) throws IOException {
+ String propFileName = String.format("automq-%s.properties", clusterId);
+ String propFilePath = "generated/" + propFileName;
+ String logDirPath = props.getProperty("log.dirs");
+
+ Path propPath = Paths.get(propFilePath);
+ if (Files.exists(propPath)) {
+ //delete if exists
+ Files.delete(propPath);
+ }
+
+ S3PropUtil.persist(props, propFileName);
+ if (!Files.isDirectory(Paths.get(logDirPath)) || !Files.exists(Paths.get(logDirPath, "meta.properties"))) {
+ StorageTool.main(new String[] {"auto-format", "-t", clusterId, "-c=" + propFilePath});
+ }
+ }
+}
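
A rough sketch of the formatStorage flow (simplified, assumed property set; real values come from the SDK templates plus --override flags): the merged properties are persisted to generated/automq-<clusterId>.properties, and StorageTool auto-format runs only when the log directory has not been formatted yet.

    import java.util.Properties;
    import kafka.s3shell.util.KafkaFormatUtil;

    public class FormatStorageSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Minimal, assumed values for illustration only.
            props.setProperty("process.roles", "broker");
            props.setProperty("node.id", "2");
            props.setProperty("controller.quorum.voters", "1@localhost:9093");
            props.setProperty("log.dirs", "/tmp/kraft-broker-logs");
            // Persists generated/automq-<clusterId>.properties, then formats the log dir
            // via StorageTool auto-format if meta.properties does not exist yet.
            KafkaFormatUtil.formatStorage("1dxiawPlSI2OlUO6U22-Ew", props);
        }
    }
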
diff --git a/core/src/main/java/kafka/s3shell/util/S3ShellPropUtil.java b/core/src/main/java/kafka/s3shell/util/S3ShellPropUtil.java
new file mode 100644
index 0000000000..f27ba664f0
--- /dev/null
+++ b/core/src/main/java/kafka/s3shell/util/S3ShellPropUtil.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.s3shell.util;
+
+import com.automq.s3shell.sdk.constant.ServerConfigKey;
+import com.automq.s3shell.sdk.model.S3Url;
+import com.automq.s3shell.sdk.util.S3PropUtil;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import org.apache.kafka.common.internals.FatalExitError;
+
+import static kafka.log.stream.s3.ConfigUtils.ACCESS_KEY_NAME;
+import static kafka.log.stream.s3.ConfigUtils.SECRET_KEY_NAME;
+
+public class S3ShellPropUtil {
+
+ private static OptionParser acceptOption() {
+ OptionParser optionParser = new OptionParser();
+ optionParser.accepts("override", "Optional property that should override values set in server.properties file")
+ .withRequiredArg()
+ .ofType(String.class);
+ optionParser.accepts("config", "Path to server.properties file")
+ .withRequiredArg()
+ .ofType(String.class);
+ optionParser.accepts("s3-url", "URL for S3 storage")
+ .withRequiredArg()
+ .ofType(String.class);
+ optionParser.accepts("version", "Print version information and exit.");
+ return optionParser;
+ }
+
+ public static Properties autoGenPropsByCmd(String[] args, String processRole) throws IOException {
+ if (args.length < 1) {
+ throw new FatalExitError(1);
+ }
+
+ Properties props = new Properties();
+ switch (processRole) {
+ case "broker":
+ props.putAll(S3PropUtil.loadTemplateProps(S3PropUtil.BROKER_PROPS_PATH));
+ break;
+ case "controller":
+ props.putAll(S3PropUtil.loadTemplateProps(S3PropUtil.CONTROLLER_PROPS_PATH));
+ break;
+ case "broker,controller":
+ case "controller,broker":
+ props.putAll(S3PropUtil.loadTemplateProps(S3PropUtil.SERVER_PROPS_PATH));
+ break;
+ default:
+                throw new IllegalArgumentException("Invalid process role: " + processRole);
+ }
+
+ // Handle --override options
+ OptionParser optionParser = acceptOption();
+ OptionSet options = optionParser.parse(args);
+ handleOption(options, props);
+
+ return props;
+ }
+
+ private static void handleOption(OptionSet options, Properties props) {
+ S3Url s3Url = null;
+ if (options.has("s3-url")) {
+ String s3UrlStr = (String) options.valueOf("s3-url");
+ s3Url = S3Url.parse(s3UrlStr);
+ props.put(ServerConfigKey.S3_ENDPOINT.getKeyName(), s3Url.getEndpointProtocol().getName() + "://" + s3Url.getS3Endpoint());
+ props.put(ServerConfigKey.S3_REGION.getKeyName(), s3Url.getS3Region());
+ props.put(ServerConfigKey.S3_BUCKET.getKeyName(), s3Url.getS3DataBucket());
+ props.put(ServerConfigKey.S3_PATH_STYLE.getKeyName(), String.valueOf(s3Url.isS3PathStyle()));
+
+ // override system env
+ EnvUtil.setEnv(ACCESS_KEY_NAME, s3Url.getS3AccessKey());
+ EnvUtil.setEnv(SECRET_KEY_NAME, s3Url.getS3SecretKey());
+ }
+
+ if (options.has("override")) {
+            List<?> overrideOptions = options.valuesOf("override");
+ for (Object o : overrideOptions) {
+ String option = (String) o;
+ String[] keyValue = option.split("=", 2);
+ if (keyValue.length == 2) {
+ props.setProperty(keyValue[0], keyValue[1]);
+ } else {
+ throw new IllegalArgumentException("Invalid override option format: " + option);
+ }
+ }
+ }
+
+ //format storage
+ if (s3Url != null) {
+ try {
+ KafkaFormatUtil.formatStorage(s3Url.getClusterId(), props);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("Format storage failed for cluster:%s", s3Url.getClusterId()), e);
+ }
+ }
+ }
+}
+
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index dad462dcad..b36e20f474 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -20,6 +20,7 @@ package kafka
import java.util.Properties
import joptsimple.OptionParser
+import kafka.s3shell.util.S3ShellPropUtil
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
import kafka.utils.Implicits._
import kafka.utils.{CommandLineUtils, Exit, Logging}
@@ -30,6 +31,39 @@ import scala.jdk.CollectionConverters._
object Kafka extends Logging {
def getPropsFromArgs(args: Array[String]): Properties = {
+ // AutoMQ for Kafka inject start
+ if (args.exists(_.contains("s3-url"))) {
+ val roleInfo = args.find(_.startsWith("process.roles="))
+ if (roleInfo.isEmpty) {
+ throw new IllegalArgumentException("'--override process.roles=broker|controller' is required")
+ }
+ if (!args.exists(_.startsWith("node.id"))) {
+ throw new IllegalArgumentException(s"'--override node.id= ' is required")
+ }
+ if (!args.exists(_.startsWith("controller.quorum.voters"))) {
+ throw new IllegalArgumentException(s"'--override controller.quorum.voters=''' is required")
+ }
+ if (!args.exists(_.startsWith("listeners"))) {
+ throw new IllegalArgumentException(s"'--override listeners=''' is required")
+ }
+
+ roleInfo match {
+ case Some("process.roles=broker") =>
+ if (!args.exists(_.startsWith("advertised.listeners"))) {
+ throw new IllegalArgumentException(s"'--override advertised.listeners=''' is required")
+ }
+ return S3ShellPropUtil.autoGenPropsByCmd(args, "broker")
+ case Some("process.roles=controller") =>
+ return S3ShellPropUtil.autoGenPropsByCmd(args, "controller")
+ case _ =>
+ if (!args.exists(_.startsWith("advertised.listeners"))) {
+ throw new IllegalArgumentException(s"'--override advertised.listeners=''' is required")
+ }
+ return S3ShellPropUtil.autoGenPropsByCmd(args, "broker,controller")
+ }
+ }
+ // AutoMQ for Kafka inject end
+
val optionParser = new OptionParser(false)
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 0f798e24fc..4237b3e541 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -17,18 +17,18 @@
package kafka.tools
-import java.io.PrintStream
-import java.nio.file.{Files, Paths}
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
-import net.sourceforge.argparse4j.inf.Namespace
+import net.sourceforge.argparse4j.inf.{Namespace, Subparser}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.server.common.MetadataVersion
+import java.io.PrintStream
+import java.nio.file.{Files, Paths}
import java.util.Optional
import scala.collection.mutable
@@ -45,7 +45,7 @@ object StorageTool extends Logging {
val selfManagedMode = configToSelfManagedMode(config.get)
Exit.exit(infoCommand(System.out, selfManagedMode, directories))
- case "format" =>
+ case "format" | "auto-format" =>
val directories = configToLogDirectories(config.get)
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace, Option(config.get.interBrokerProtocolVersionString))
@@ -56,10 +56,12 @@ object StorageTool extends Logging {
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
if (!configToSelfManagedMode(config.get)) {
throw new TerseFailure("The kafka configuration file appears to be for " +
- "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
+ "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
+ }
+ val commandResult = formatCommand(System.out, directories, metaProperties, metadataVersion, ignoreFormatted)
+ if (command == "format") {
+ Exit.exit(commandResult)
}
- Exit.exit(formatCommand(System.out, directories, metaProperties, metadataVersion, ignoreFormatted))
-
case "random-uuid" =>
System.out.println(Uuid.randomUuid)
Exit.exit(0)
@@ -85,26 +87,32 @@ object StorageTool extends Logging {
help("Get information about the Kafka log directories on this node.")
val formatParser = subparsers.addParser("format").
help("Format the Kafka log directories on this node.")
+ val autoFormatParser = subparsers.addParser("auto-format").
+      help("Auto format the Kafka log directories on this node.")
subparsers.addParser("random-uuid").help("Print a random UUID.")
- List(infoParser, formatParser).foreach(parser => {
+    List(infoParser, formatParser, autoFormatParser).foreach(parser => {
parser.addArgument("--config", "-c").
action(store()).
required(true).
help("The Kafka configuration file to use.")
})
- formatParser.addArgument("--cluster-id", "-t").
- action(store()).
- required(true).
- help("The cluster ID to use.")
- formatParser.addArgument("--ignore-formatted", "-g").
- action(storeTrue())
- formatParser.addArgument("--release-version", "-r").
- action(store()).
- help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}")
-
+ configureFormatParser(formatParser)
+ configureFormatParser(autoFormatParser)
parser.parseArgsOrFail(args)
}
+ private def configureFormatParser(parser: Subparser): Unit = {
+ parser.addArgument("--cluster-id", "-t").
+ action(store()).
+ required(true).
+ help("The cluster ID to use.")
+ parser.addArgument("--ignore-formatted", "-g").
+ action(storeTrue())
+ parser.addArgument("--release-version", "-r").
+ action(store()).
+ help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}")
+ }
+
def configToLogDirectories(config: KafkaConfig): Seq[String] = {
val directories = new mutable.TreeSet[String]
directories ++= config.logDirs
diff --git a/core/src/test/java/kafka/s3shell/util/S3ShellPropUtilTest.java b/core/src/test/java/kafka/s3shell/util/S3ShellPropUtilTest.java
new file mode 100644
index 0000000000..8be6b5e59c
--- /dev/null
+++ b/core/src/test/java/kafka/s3shell/util/S3ShellPropUtilTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.s3shell.util;
+
+import java.io.IOException;
+import java.util.Properties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class S3ShellPropUtilTest {
+    @Test
+    public void testAutoGenPropsWithControllerRole() throws IOException {
+ String[] arg = new String[] {
+ "--s3-url",
+ "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew",
+ "--override",
+ "process.roles=controller",
+ "--override",
+ "controller.quorum.voters=1@localhost:9093",
+ "--override",
+ "listeners=CONTROLLER://localhost:9093",
+ "--override",
+ "node.id=1",
+ "--override",
+ "autobalancer.reporter.network.out.capacity=12345"
+ };
+        Properties prop = S3ShellPropUtil.autoGenPropsByCmd(arg, "controller");
+        Assertions.assertEquals("controller", prop.getProperty("process.roles"));
+        Assertions.assertEquals("1", prop.getProperty("node.id"));
+        Assertions.assertEquals("12345", prop.getProperty("autobalancer.reporter.network.out.capacity"));
+
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index fc4dd51a09..777d470fdb 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -16,18 +16,18 @@
*/
package kafka
-import java.nio.file.Files
-import java.util
-import java.util.Properties
import kafka.server.KafkaConfig
-import kafka.utils.{Exit, TestUtils}
import kafka.utils.TestUtils.assertBadConfigContainingMessage
+import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import java.nio.file.Files
+import java.util
+import java.util.Properties
import scala.jdk.CollectionConverters._
class KafkaTest {
@@ -81,7 +81,7 @@ class KafkaTest {
@Test
def testBrokerRoleNodeIdValidation(): Unit = {
- // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters
+ // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters
val propertiesFile = new Properties
propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
@@ -97,7 +97,7 @@ class KafkaTest {
@Test
def testControllerRoleNodeIdValidation(): Unit = {
- // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters
+ // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters
val propertiesFile = new Properties
propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
@@ -113,7 +113,7 @@ class KafkaTest {
@Test
def testColocatedRoleNodeIdValidation(): Unit = {
- // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters
+ // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters
val propertiesFile = new Properties
propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
@@ -169,7 +169,7 @@ class KafkaTest {
}
if (!(hasControllerRole & !hasBrokerRole)) { // not controller-only
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
- props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
}
}
@@ -341,6 +341,32 @@ class KafkaTest {
assertEquals(expected, config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS).asInstanceOf[Long])
}
+ @Test
+ def testGetPropsFromArgsMissingS3Url(): Unit = {
+ val propertiesFile = prepareDefaultConfig()
+ val s3UrlMissingArgsOkController = Array[String](propertiesFile, "--s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew","--override", "process.roles=controller", "--override", "controller.quorum.voters=1@localhost:9093", "--override", "listeners=CONTROLLER://localhost:9093", "--override", "node.id=1")
+ assertDoesNotThrow(() => KafkaConfig.fromProps(Kafka.getPropsFromArgs(s3UrlMissingArgsOkController)))
+
+ val s3UrlMissingArgsOkBroker = Array[String](propertiesFile, "--s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew", "--override", "process.roles=controller,broker", "--override", "controller.quorum.voters=1@localhost:9093", "--override", "listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093", "--override", "advertised.listeners=PLAINTEXT://localhost:9092","--override", "node.id=1")
+ assertDoesNotThrow(() => KafkaConfig.fromProps(Kafka.getPropsFromArgs(s3UrlMissingArgsOkBroker)))
+
+ val s3UrlMissingArgs1 = Array[String](propertiesFile, "--s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew", "--override", "controller.quorum.voters=1@localhost:9093", "--override", "listeners=PLAINTEXT://localhost:9092","--override", "advertised.listeners=PLAINTEXT://localhost:9092","--override", "node.id=1")
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(s3UrlMissingArgs1)))
+ val s3UrlMissingArgs2 = Array[String](propertiesFile, "--s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew","--override", "process.roles=broker", "--override", "listeners=PLAINTEXT://localhost:9092","--override", "advertised.listeners=PLAINTEXT://localhost:9092","--override", "node.id=1")
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(s3UrlMissingArgs2)))
+
+ val s3UrlMissingArgs3 = Array[String](propertiesFile, "--s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew","--override", "process.roles=broker", "--override", "controller.quorum.voters=1@localhost:9093", "--override", "advertised.listeners=PLAINTEXT://localhost:9092","--override", "node.id=1")
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(s3UrlMissingArgs3)))
+
+ val s3UrlMissingArgs4 = Array[String](propertiesFile, "--s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew","--override", "process.roles=controller,broker", "--override", "controller.quorum.voters=1@localhost:9093", "--override", "PLAINTEXT://localhost,CONTROLLER://localhost:9093","--override", "--override", "node.id=1")
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(s3UrlMissingArgs4)))
+
+ val s3UrlMissingArgs5 = Array[String](propertiesFile, "--s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew", "--override", "process.roles=controller", "--override", "controller.quorum.voters=1@localhost:9093", "--override", "listeners=CONTROLLER://localhost:9093", "--override", "advertised.listeners=PLAINTEXT://localhost:9092", "--override")
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(s3UrlMissingArgs5)))
+ }
+
+
+
private def testZkConfig[T, U](kafkaPropName: String,
expectedKafkaPropName: String,
sysPropName: String,
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 932f231ff2..b920908e80 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -128,6 +128,7 @@ versions += [
zookeeper: "3.6.3",
zstd: "1.5.2-1",
commonLang: "3.12.0",
+ commonio: "2.15.1",
s3stream: "0.16.0-SNAPSHOT",
opentelemetry: "1.32.0",
opentelemetryAlpha: "1.32.0-alpha",
@@ -217,7 +218,7 @@ libs += [
slf4jlog4j: "org.slf4j:slf4j-log4j12:$versions.slf4j",
slf4jBridge: "org.slf4j:jul-to-slf4j:$versions.slf4j",
snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
- swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$versions.swaggerAnnotations",
+ swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$versions.swaggerAnnotations",
swaggerJaxrs2: "io.swagger.core.v3:swagger-jaxrs2:$versions.swaggerJaxrs2",
zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper",
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
@@ -225,6 +226,7 @@ libs += [
zstd: "com.github.luben:zstd-jni:$versions.zstd",
httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient",
commonLang: "org.apache.commons:commons-lang3:$versions.commonLang",
+  commonio: "commons-io:commons-io:$versions.commonio",
nettyHttp2: "io.netty:netty-codec-http2:$versions.netty",
s3stream: "com.automq.elasticstream:s3stream:$versions.s3stream",
opentelemetryJava8: "io.opentelemetry.instrumentation:opentelemetry-runtime-telemetry-java8:$versions.opentelemetryAlpha",
diff --git a/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/constant/ServerConfigKey.java b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/constant/ServerConfigKey.java
new file mode 100644
index 0000000000..26f561590e
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/constant/ServerConfigKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.automq.s3shell.sdk.constant;
+
+public enum ServerConfigKey {
+
+ NODE_ID("node.id"),
+ CONTROLLER_QUORUM_VOTERS("controller.quorum.voters"),
+
+ LISTENERS("listeners"),
+
+ ADVERTISED_LISTENERS("advertised.listeners"),
+
+ S3_ENDPOINT("s3.endpoint"),
+
+ S3_REGION("s3.region"),
+
+ S3_BUCKET("s3.bucket"),
+
+ S3_PATH_STYLE("s3.path.style");
+
+ ServerConfigKey(String keyName) {
+ this.keyName = keyName;
+ }
+
+ private final String keyName;
+
+ public String getKeyName() {
+ return keyName;
+ }
+}
diff --git a/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/AuthMethod.java b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/AuthMethod.java
new file mode 100644
index 0000000000..fb19682b2d
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/AuthMethod.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.automq.s3shell.sdk.model;
+
+public enum AuthMethod {
+ KEY_FROM_ENV("key-from-env"),
+
+ KEY_FROM_ARGS("key-from-args"),
+
+ // assume role
+ USER_ROLE("user-role"),
+
+ // instance profile
+ INSTANCE_ROLE("instance-role");
+
+ AuthMethod(String keyName) {
+ this.keyName = keyName;
+ }
+
+ private final String keyName;
+
+ public String getKeyName() {
+ return keyName;
+ }
+
+ public static AuthMethod getByName(String methodName) {
+ for (AuthMethod method : AuthMethod.values()) {
+ if (method.getKeyName().equals(methodName)) {
+ return method;
+ }
+ }
+ throw new IllegalArgumentException("Invalid auth method: " + methodName);
+ }
+}
\ No newline at end of file
diff --git a/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/EndpointProtocol.java b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/EndpointProtocol.java
new file mode 100644
index 0000000000..cd0ce44be8
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/EndpointProtocol.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.automq.s3shell.sdk.model;
+
+public enum EndpointProtocol {
+
+ HTTP("http"),
+ HTTPS("https");
+
+ EndpointProtocol(String key) {
+ this.name = key;
+ }
+
+ private final String name;
+
+ public String getName() {
+ return name;
+ }
+
+ public static EndpointProtocol getByName(String protocolName) {
+ for (EndpointProtocol protocol : EndpointProtocol.values()) {
+ if (protocol.getName().equals(protocolName)) {
+ return protocol;
+ }
+ }
+ throw new IllegalArgumentException("Invalid protocol: " + protocolName);
+ }
+}
diff --git a/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/S3Url.java b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/S3Url.java
new file mode 100644
index 0000000000..418efee893
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/model/S3Url.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.automq.s3shell.sdk.model;
+
+public class S3Url {
+
+ final String s3AccessKey;
+ final String s3SecretKey;
+
+ final AuthMethod s3AuthMethod;
+
+ final String s3Region;
+
+ final EndpointProtocol endpointProtocol;
+
+ final String s3Endpoint;
+
+ final String s3DataBucket;
+
+ final String s3OpsBucket;
+
+ final String clusterId;
+
+ final boolean s3PathStyle;
+
+ public S3Url(String s3AccessKey, String s3SecretKey, AuthMethod s3AuthMethod, String s3Region,
+ EndpointProtocol endpointProtocol, String s3Endpoint, String s3DataBucket, String s3OpsBucket, String clusterId,
+ boolean s3PathStyle) {
+ this.s3AccessKey = s3AccessKey;
+ this.s3SecretKey = s3SecretKey;
+ this.s3AuthMethod = s3AuthMethod;
+ this.s3Region = s3Region;
+ this.endpointProtocol = endpointProtocol;
+ this.s3Endpoint = s3Endpoint;
+ this.s3DataBucket = s3DataBucket;
+ this.s3OpsBucket = s3OpsBucket;
+ this.clusterId = clusterId;
+ this.s3PathStyle = s3PathStyle;
+ }
+
+ public static S3Url parse(String s3Url) throws IllegalArgumentException {
+ // skip the first prefix "s3://"
+        // strip the leading "s3://" prefix
+
+ String paramsPart = s3Url.substring(s3Url.indexOf('?') + 1);
+ String[] params = paramsPart.split("&");
+
+ String accessKey = null;
+ String secretKey = null;
+ AuthMethod authMethod = null;
+ String region = null;
+ EndpointProtocol protocol = null;
+ String dataBucket = null;
+ String opsBucket = null;
+ String clusterId = null;
+ boolean s3PathStyle = false;
+
+ for (String param : params) {
+ String[] keyValue = param.split("=");
+ if (keyValue.length != 2) {
+ throw new IllegalArgumentException("Invalid parameter format: " + param);
+ }
+
+ String key = keyValue[0];
+ String value = keyValue[1];
+
+ switch (key) {
+ case "s3-access-key":
+ accessKey = value;
+ break;
+ case "s3-secret-key":
+ secretKey = value;
+ break;
+ case "s3-auth-method":
+ authMethod = AuthMethod.getByName(value);
+ break;
+ case "s3-region":
+ region = value;
+ break;
+ case "s3-endpoint-protocol":
+ protocol = EndpointProtocol.getByName(value);
+ break;
+ case "s3-data-bucket":
+ dataBucket = value;
+ break;
+ case "s3-ops-bucket":
+ opsBucket = value;
+ break;
+ case "cluster-id":
+ clusterId = value;
+ break;
+ case "s3-path-style":
+ s3PathStyle = Boolean.valueOf(value);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown parameter: " + key);
+ }
+ }
+
+ return new S3Url(accessKey, secretKey, authMethod, region, protocol, s3Endpoint, dataBucket, opsBucket, clusterId, s3PathStyle);
+ }
+
+ public String getS3AccessKey() {
+ return s3AccessKey;
+ }
+
+ public String getS3SecretKey() {
+ return s3SecretKey;
+ }
+
+ public AuthMethod getS3AuthMethod() {
+ return s3AuthMethod;
+ }
+
+ public String getS3Region() {
+ return s3Region;
+ }
+
+ public EndpointProtocol getEndpointProtocol() {
+ return endpointProtocol;
+ }
+
+ public String getS3Endpoint() {
+ return s3Endpoint;
+ }
+
+ public String getS3DataBucket() {
+ return s3DataBucket;
+ }
+
+ public String getS3OpsBucket() {
+ return s3OpsBucket;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public boolean isS3PathStyle() {
+ return s3PathStyle;
+ }
+}
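
To make the s3-url format concrete, a small sketch using the same URL shape as the tests in this patch; each query parameter maps onto one S3Url field.

    import com.automq.s3shell.sdk.model.S3Url;

    public class S3UrlParseSketch {
        public static void main(String[] args) {
            String url = "s3://s3.cn-northwest-1.amazonaws.com.cn"
                + "?s3-access-key=xx&s3-secret-key=xx&s3-region=cn-northwest-1"
                + "&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test"
                + "&s3-path-style=false&cluster-id=1dxiawPlSI2OlUO6U22-Ew";
            S3Url s3Url = S3Url.parse(url);
            System.out.println(s3Url.getS3Endpoint());   // s3.cn-northwest-1.amazonaws.com.cn
            System.out.println(s3Url.getS3Region());     // cn-northwest-1
            System.out.println(s3Url.getS3DataBucket()); // wanshao-test
            System.out.println(s3Url.getClusterId());    // 1dxiawPlSI2OlUO6U22-Ew
        }
    }
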
diff --git a/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/util/S3PropUtil.java b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/util/S3PropUtil.java
new file mode 100644
index 0000000000..6d0cf58f4b
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/java/com/automq/s3shell/sdk/util/S3PropUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.automq.s3shell.sdk.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.Enumeration;
+import java.util.Properties;
+
+public class S3PropUtil {
+ public static final String BROKER_PROPS_PATH = "template/broker.properties";
+ public static final String CONTROLLER_PROPS_PATH = "template/controller.properties";
+ public static final String SERVER_PROPS_PATH = "template/server.properties";
+
+ public static void persist(Properties props, String fileName) throws IOException {
+ File directory = new File("generated");
+ if (!directory.exists() && !directory.mkdirs()) {
+ throw new IOException("Can't create directory " + directory.getAbsolutePath());
+ }
+
+ String targetPath = "generated/" + fileName;
+ File file = new File(targetPath);
+ try (PrintWriter pw = new PrintWriter(file, Charset.forName("utf-8"))) {
+ for (Enumeration e = props.propertyNames(); e.hasMoreElements(); ) {
+ String key = (String) e.nextElement();
+ pw.println(key + "=" + props.getProperty(key));
+ }
+ }
+ }
+
+ public static Properties loadTemplateProps(String propsPath) throws IOException {
+ try (var in = S3PropUtil.class.getClassLoader().getResourceAsStream(propsPath)) {
+ if (in != null) {
+ Properties props = new Properties();
+ props.load(in);
+ return props;
+ } else {
+                throw new IOException(String.format("Cannot find resource file under path: %s", propsPath));
+ }
+ }
+ }
+}
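
A short sketch of the template-then-override flow that S3ShellPropUtil builds on (the bucket value and output file name here are placeholders): load one of the bundled templates, apply overrides, and persist the merged result under generated/.

    import java.util.Properties;
    import com.automq.s3shell.sdk.util.S3PropUtil;

    public class S3PropUtilSketch {
        public static void main(String[] args) throws Exception {
            // Load the bundled controller template from the SDK resources.
            Properties props = S3PropUtil.loadTemplateProps(S3PropUtil.CONTROLLER_PROPS_PATH);
            // Apply overrides, mirroring what --override key=value does at startup.
            props.setProperty("node.id", "1");
            props.setProperty("s3.bucket", "example-bucket"); // placeholder value
            // Writes the merged config to generated/automq-example.properties.
            S3PropUtil.persist(props, "automq-example.properties");
        }
    }
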
diff --git a/s3shell-kafka-sdk/src/main/resources/template/broker.properties b/s3shell-kafka-sdk/src/main/resources/template/broker.properties
new file mode 100644
index 0000000000..bb9cfcec15
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/resources/template/broker.properties
@@ -0,0 +1,190 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in KRaft mode, where
+# Apache ZooKeeper is not present. See config/kraft/README.md for details.
+#
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles=broker
+
+# The node id associated with this instance's roles
+node.id=2
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. If not configured, the host name will be equal to the value of
+# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://localhost:9092
+
+# Name of listener used for communication between brokers.
+inter.broker.listener.name=PLAINTEXT
+
+# Listener name, hostname and port the broker will advertise to clients.
+# If not set, it uses the value for "listeners".
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# A comma-separated list of the names of the listeners used by the controller.
+# This is required if running in KRaft mode. On a node with `process.roles=broker`, only the first listed listener will be used by the broker.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kraft-broker-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Settings for AutoMQ for Kafka #############################
+# Whether to enable store data in elastic stream layer
+elasticstream.enable=true
+
+# The endpoint for S3 service
+# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
+# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
+# Note that bucket name should not be included in the endpoint.
+s3.endpoint=https://s3.amazonaws.com
+
+# The region of S3 service
+# For Aliyun, you have to set the region to aws-global. See https://www.alibabacloud.com/help/zh/oss/developer-reference/use-amazon-s3-sdks-to-access-oss.
+s3.region=us-east-1
+
+# The bucket of S3 service to store data
+s3.bucket=ko3
+
+# Use path style access for S3, default false
+# If you are using minio for storage, you have to set this to true.
+#s3.path.style=true
+
+# The file path of delta WAL in block device
+s3.wal.path=/tmp/kraft-broker-logs/s3wal
+
+# The maximum size of delta WAL in block device, default 2GB
+# s3.wal.capacity=2147483648
+
+# The maximum size of WAL cache can use, default 2GB
+# s3.wal.cache.size=2147483648
+
+# The batched size of delta WAL before being uploaded to S3, default 500MB
+# s3.wal.upload.threshold=524288000
+
+# The maximum size of block cache the broker can use to cache data read from S3, default 1GB
+# s3.block.cache.size=1073741824
+
+# The baseline network bandwidth of the broker in bytes/s, default 100MB/s. This is used to throttle the network usage during compaction
+# and catch up read
+# s3.network.baseline.bandwidth=104857600
+
+############################# Settings for telemetry #############################
+# The metrics exporter type, supported values are otlp, prometheus, log. Use comma to separate multiple exporters.
+# s3.telemetry.metrics.exporter.type=otlp
+
+# The Prometheus HTTP server host and port, if exporter type is set to prometheus
+# s3.metrics.exporter.prom.host=127.0.0.1
+# s3.metrics.exporter.prom.port=9090
+
+# The OTel Collector endpoint, if exporter type is set to otlp or tracing is enabled
+# s3.telemetry.exporter.otlp.endpoint=http://${your_host_name}:4317
+
+############################# Settings for Auto Balancer #############################
+# The metric reporter to collect and report metrics for Auto Balancer
+metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
+
+# The network inbound bandwidth in bytes/s, default 100MB/s. Used in NetworkInCapacityGoal and calculation of inbound bandwidth utilization
+# autobalancer.reporter.network.in.capacity=104857600
+
+# The network outbound bandwidth in bytes/s, default 100MB/s. Used in NetworkOutCapacityGoal and calculation of outbound bandwidth utilization
+# autobalancer.reporter.network.out.capacity=104857600
diff --git a/s3shell-kafka-sdk/src/main/resources/template/controller.properties b/s3shell-kafka-sdk/src/main/resources/template/controller.properties
new file mode 100644
index 0000000000..b4df521e60
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/resources/template/controller.properties
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in KRaft mode, where
+# Apache ZooKeeper is not present. See config/kraft/README.md for details.
+#
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles=controller
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on.
+# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=CONTROLLER://:9093
+
+# A comma-separated list of the names of the listeners used by the controller.
+# This is required if running in KRaft mode.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kraft-controller-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Settings for AutoMQ for Kafka #############################
+# Whether to store data in the elastic stream layer
+elasticstream.enable=true
+
+# The topic creation policy for AutoMQ for Kafka
+create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
+
+# The endpoint for S3 service
+# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
+# For Baidu Cloud, some regions (such as cn-shanghai) may not support https; you can use http instead.
+# Note that bucket name should not be included in the endpoint.
+s3.endpoint=https://s3.amazonaws.com
+
+# The region of S3 service
+# For Aliyun, you have to set the region to aws-global. See https://www.alibabacloud.com/help/zh/oss/developer-reference/use-amazon-s3-sdks-to-access-oss.
+s3.region=us-east-1
+
+# The bucket of S3 service to store data
+s3.bucket=ko3
+
+# Use path style access for S3, default false
+# If you are using minio for storage, you have to set this to true.
+#s3.path.style=true
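+# Illustrative example (placeholder values) for a MinIO-style deployment:
+#   s3.endpoint=http://minio.example.internal:9000
+#   s3.region=us-east-1
+#   s3.bucket=automq-data
+#   s3.path.style=true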
+
+############################# Settings of Controller for Auto Balancer #############################
+# Whether to enable Auto Balancer in the controller, default false
+# autobalancer.controller.enable=false
+
+# The topics to be excluded from balancing
+#autobalancer.controller.exclude.topics=topic-a,topic-b,topic-c
+
+# The broker ids to be excluded from balancing
+#autobalancer.controller.exclude.broker.ids=0,1,2
+
diff --git a/s3shell-kafka-sdk/src/main/resources/template/server.properties b/s3shell-kafka-sdk/src/main/resources/template/server.properties
new file mode 100644
index 0000000000..efe1ad0a82
--- /dev/null
+++ b/s3shell-kafka-sdk/src/main/resources/template/server.properties
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in KRaft mode, where
+# Apache ZooKeeper is not present. See config/kraft/README.md for details.
+#
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles=broker,controller
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on.
+# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
+# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
+# with PLAINTEXT listener name, and port 9092.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://:9092,CONTROLLER://:9093
+
+# Name of listener used for communication between brokers.
+inter.broker.listener.name=PLAINTEXT
+
+# Listener name, hostname and port the broker will advertise to clients.
+# If not set, it uses the value for "listeners".
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# A comma-separated list of the names of the listeners used by the controller.
+# If no explicit mapping is set in `listener.security.protocol.map`, the PLAINTEXT protocol will be used by default.
+# This is required if running in KRaft mode.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kraft-combined-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Settings for AutoMQ for Kafka #############################
+# Whether to store data in the elastic stream layer
+elasticstream.enable=true
+
+# The topic creation policy for AutoMQ for Kafka
+create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
+
+# The endpoint for S3 service
+# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
+# For Baidu Cloud, some regions (such as cn-shanghai) may not support https; you can use http instead.
+# Note that bucket name should not be included in the endpoint.
+s3.endpoint=https://s3.amazonaws.com
+
+# The region of S3 service
+# For Aliyun, you have to set the region to aws-global. See https://www.alibabacloud.com/help/zh/oss/developer-reference/use-amazon-s3-sdks-to-access-oss.
+s3.region=us-east-1
+
+# The bucket of S3 service to store data
+s3.bucket=ko3
+
+# Use path style access for S3, default false
+# If you are using minio for storage, you have to set this to true.
+#s3.path.style=true
+
+# The file path of delta WAL in block device
+s3.wal.path=/tmp/kraft-combined-logs/s3wal
+
+# The maximum size of delta WAL in block device, default 2GB
+# s3.wal.capacity=2147483648
+
+# The maximum size of WAL cache can use, default 2GB
+# s3.wal.cache.size=2147483648
+
+# The batched size of delta WAL before being uploaded to S3, default 500MB
+# s3.wal.upload.threshold=524288000
+
+# The maximum size of block cache the broker can use to cache data read from S3, default 1GB
+# s3.block.cache.size=1073741824
+
+# The baseline network bandwidth of the broker in bytes/s, default 100MB/s. This is used to throttle the network usage during compaction
+# and catch-up reads
+# s3.network.baseline.bandwidth=104857600
+
+############################# Settings for telemetry #############################
+# The metrics exporter type; supported values are otlp, prometheus and log. Use a comma to separate multiple exporters.
+# s3.telemetry.metrics.exporter.type=otlp
+
+# The Prometheus HTTP server host and port, if exporter type is set to prometheus
+# s3.metrics.exporter.prom.host=127.0.0.1
+# s3.metrics.exporter.prom.port=9090
+
+# The OTel Collector endpoint, if exporter type is set to otlp or tracing is enabled
+# s3.telemetry.exporter.otlp.endpoint=http://${your_host_name}:4317
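+
+# Illustrative example (placeholder values): exporting metrics to a local Prometheus scrape endpoint
+#   s3.telemetry.metrics.exporter.type=prometheus
+#   s3.metrics.exporter.prom.host=0.0.0.0
+#   s3.metrics.exporter.prom.port=9090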
+
+############################# Settings for Auto Balancer #############################
+# The metric reporter to collect and report metrics for Auto Balancer
+metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
+
+# The network inbound bandwidth in bytes/s, default 100MB/s. Used in NetworkInCapacityGoal and calculation of inbound bandwidth utilization
+# autobalancer.reporter.network.in.capacity=104857600
+
+# The network outbound bandwidth in bytes/s, default 100MB/s. Used in NetworkOutCapacityGoal and calculation of outbound bandwidth utilization
+# autobalancer.reporter.network.out.capacity=104857600
+
+############################# Settings of Controller for Auto Balancer #############################
+# Whether to enable Auto Balancer in the controller, default false
+# autobalancer.controller.enable=false
+
+# The topics to be excluded from balancing
+#autobalancer.controller.exclude.topics=topic-a,topic-b,topic-c
+
+# The broker ids to be excluded from balancing
+#autobalancer.controller.exclude.broker.ids=0,1,2
+
diff --git a/settings.gradle b/settings.gradle
index 8d287e3d9b..2b69f7392a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -58,5 +58,6 @@ include 'clients',
'streams:upgrade-system-tests-32',
'streams:upgrade-system-tests-33',
'tools',
- 'trogdor'
+ 'trogdor',
+ 's3shell-kafka-sdk'
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/AutoMQKafkaAdminTool.java b/tools/src/main/java/org/apache/kafka/tools/automq/AutoMQKafkaAdminTool.java
new file mode 100644
index 0000000000..a710ca4ce1
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/AutoMQKafkaAdminTool.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.common.utils.Exit;
+
+public class AutoMQKafkaAdminTool {
+ public static final String GENERATE_S3_URL_CMD = "generate-s3-url";
+ public static final String GENERATE_START_COMMAND_CMD = "generate-start-command";
+ public static final String GENERATE_CONFIG_PROPERTIES_CMD = "generate-config-properties";
+
+ public static void main(String[] args) {
+ // suppress slf4j inner warning log
+ System.err.close();
+ ArgumentParser parser = ArgumentParsers
+ .newArgumentParser("automq-admin-tool")
+ .defaultHelp(true)
+ .description("This AutoMQ admin tool contains several tools to help user init and manage AutoMQ cluster easily.");
+ if (args.length == 0) {
+ System.out.println("Please pass valid arguments. Check usage first.");
+ parser.printHelp();
+ Exit.exit(0);
+ }
+
+ Subparsers subparsers = parser.addSubparsers().title("commands");
+
+ Subparser generateS3UrlCmdParser = subparsers.addParser(GENERATE_S3_URL_CMD)
+ .help("generate s3url for AutoMQ")
+ .description(String.format("This cmd is used to generate s3url for AutoMQ that is used to connect to s3 or other cloud object storage service. Execute '%s -h' to check its usage.", GENERATE_S3_URL_CMD));
+ GenerateS3UrlCmd.addArguments(generateS3UrlCmdParser);
+
+ Subparser generateStartCommandCmdParser = subparsers.addParser(GENERATE_START_COMMAND_CMD)
+ .help("generate config file and local start command")
+ .description(String.format("This cmd is used to generate config file and local start command. Execute '%s -h' to check its usage.", GENERATE_START_COMMAND_CMD));
+ GenerateStartCmdCmd.addArguments(generateStartCommandCmdParser);
+
+ Subparser generateConfigPropertiesCmdParser = subparsers.addParser(GENERATE_CONFIG_PROPERTIES_CMD)
+ .help("generate multi config properties")
+ .description(String.format("This cmd is used to generate multi config properties depend on your arguments. Execute '%s -h' to check its usage.", GENERATE_CONFIG_PROPERTIES_CMD));
+ GenerateConfigFileCmd.addArguments(generateConfigPropertiesCmdParser);
+
+ switch (args[0]) {
+ case GENERATE_S3_URL_CMD:
+ processGenerateS3UrlCmd(args, generateS3UrlCmdParser);
+ break;
+ case GENERATE_START_COMMAND_CMD:
+ processGenerateStartCmd(args, generateStartCommandCmdParser);
+ break;
+ case GENERATE_CONFIG_PROPERTIES_CMD:
+ processGenConfigPropertiesCmd(args, generateConfigPropertiesCmdParser);
+ break;
+ default:
+ System.out.println(String.format("Not supported command %s. Check usage first.", args[0]));
+ parser.printHelp();
+ Exit.exit(0);
+ }
+
+ Exit.exit(0);
+
+ }
+
+ private static Namespace parseArguments(ArgumentParser parser, String[] args) {
+ try {
+ return parser.parseArgs(args);
+ } catch (ArgumentParserException e) {
+ if (args.length == 1) {
+ parser.printHelp();
+ Exit.exit(0);
+ } else {
+ parser.handleError(e);
+ Exit.exit(1);
+ }
+ return null;
+ }
+ }
+
+ private static void runCommandWithParameter(ArgumentParser parser, Namespace res, Command command) {
+ if (res == null) {
+ parser.handleError(new ArgumentParserException("Namespace is null", parser));
+ Exit.exit(1);
+ } else {
+ try {
+ command.run();
+ } catch (Exception e) {
+ System.out.printf("FAILED: Caught exception %s%n%n", e.getMessage());
+ e.printStackTrace();
+ Exit.exit(1);
+ }
+ }
+ }
+
+ @FunctionalInterface
+ public interface Command {
+ void run() throws Exception;
+ }
+
+ private static void processGenerateS3UrlCmd(String[] args, ArgumentParser parser) {
+ Namespace res = parseArguments(parser, args);
+ runCommandWithParameter(parser, res, () -> new GenerateS3UrlCmd(new GenerateS3UrlCmd.Parameter(res)).run());
+ }
+
+ private static void processGenerateStartCmd(String[] args, ArgumentParser parser) {
+ Namespace res = parseArguments(parser, args);
+ runCommandWithParameter(parser, res, () -> new GenerateStartCmdCmd(new GenerateStartCmdCmd.Parameter(res)).run());
+ }
+
+ private static void processGenConfigPropertiesCmd(String[] args, ArgumentParser parser) {
+ Namespace res = parseArguments(parser, args);
+ runCommandWithParameter(parser, res, () -> new GenerateConfigFileCmd(new GenerateConfigFileCmd.Parameter(res)).run());
+ }
+
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/GenerateConfigFileCmd.java b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateConfigFileCmd.java
new file mode 100644
index 0000000000..3134865cf8
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateConfigFileCmd.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq;
+
+import com.automq.s3shell.sdk.constant.ServerConfigKey;
+import com.automq.s3shell.sdk.model.S3Url;
+import com.automq.s3shell.sdk.util.S3PropUtil;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import org.apache.kafka.tools.automq.model.ServerGroupConfig;
+import org.apache.kafka.tools.automq.util.ConfigParserUtil;
+
+import static com.automq.s3shell.sdk.util.S3PropUtil.BROKER_PROPS_PATH;
+import static com.automq.s3shell.sdk.util.S3PropUtil.CONTROLLER_PROPS_PATH;
+import static com.automq.s3shell.sdk.util.S3PropUtil.SERVER_PROPS_PATH;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static org.apache.kafka.tools.automq.AutoMQKafkaAdminTool.GENERATE_CONFIG_PROPERTIES_CMD;
+import static org.apache.kafka.tools.automq.AutoMQKafkaAdminTool.GENERATE_S3_URL_CMD;
+
+/**
+ * Generate per-node config properties files from an s3url
+ */
+public class GenerateConfigFileCmd {
+ private final Parameter parameter;
+
+ public GenerateConfigFileCmd(GenerateConfigFileCmd.Parameter parameter) {
+ this.parameter = parameter;
+ }
+
+ static class Parameter {
+ final String s3Url;
+ final String controllerAddress;
+
+ final String brokerAddress;
+
+ final String networkBaselineBandwidthMB;
+
+ final boolean controllerOnlyMode;
+
+ Parameter(Namespace res) {
+ this.s3Url = res.getString("s3-url");
+ this.brokerAddress = res.getString("broker-address");
+ this.controllerAddress = res.getString("controller-address");
+ this.networkBaselineBandwidthMB = res.getString("network-baseline-bandwidth-mb");
+ this.controllerOnlyMode = res.getBoolean("controller-only-mode");
+ }
+ }
+
+ public static ArgumentParser addArguments(Subparser parser) {
+ parser.addArgument(GENERATE_CONFIG_PROPERTIES_CMD)
+ .action(store())
+ .required(true);
+ parser.addArgument("--s3-url")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("s3-url")
+ .metavar("S3-URL")
+ .help(String.format("AutoMQ use s3 url to access your s3 and create AutoMQ cluster. You can generate s3 url with cmd 'bin/automq-kafka-admin.sh %s'", GENERATE_S3_URL_CMD));
+ parser.addArgument("--controller-address")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("controller-address")
+ .metavar("CONTROLLER-ADDRESS")
+ .help("Your controller ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
+ parser.addArgument("--broker-address")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("broker-address")
+ .metavar("BROKER-ADDRESS")
+ .help("Your broker ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
+ parser.addArgument("--controller-only-mode")
+ .action(store())
+ .required(false)
+ .type(Boolean.class)
+ .dest("controller-only-mode")
+ .setDefault(false)
+ .metavar("CONTROLLER-ONLY-MODE")
+ .help("If this is set to true, all controllers is also seen as broker. If you want to run controller only, set this to true. Default is false.");
+ parser.addArgument("--network-baseline-bandwidth-mb")
+ .action(store())
+ .required(false)
+ .type(Integer.class)
+ .dest("network-baseline-bandwidth-mb")
+ .metavar("NETWORK-BASELINE-BANDWIDTH-MB")
+ .help("Network baseline bandwidth of your machine to run broker or controller. Usually you can get it from your cloud provider's official instance document. Example: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/memory-optimized-instances.html");
+
+ return parser;
+ }
+
+ public void run() throws IOException {
+ S3Url s3Url = S3Url.parse(parameter.s3Url);
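+        // The s3url carries the S3 endpoint, region, bucket and path-style settings that are written into each generated properties file.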
+
+        ServerGroupConfig controllerGroupConfig = ConfigParserUtil.genControllerConfig(parameter.controllerAddress, parameter.controllerOnlyMode);
+        List<String> controllerPropFileNameList;
+        if (parameter.controllerOnlyMode) {
+            controllerPropFileNameList = processGroupConfig(controllerGroupConfig, CONTROLLER_PROPS_PATH, "controller", s3Url);
+        } else {
+            controllerPropFileNameList = processGroupConfig(controllerGroupConfig, SERVER_PROPS_PATH, "server", s3Url);
+        }
+        List<String> brokerPropsFileNameList;
+ ServerGroupConfig brokerGroupConfig = ConfigParserUtil.genBrokerConfig(parameter.brokerAddress, controllerGroupConfig);
+ brokerPropsFileNameList = processGroupConfig(brokerGroupConfig, BROKER_PROPS_PATH, "broker", s3Url);
+
+ System.out.println("#################################### GENERATED PROPERTIES #################################");
+
+ System.out.println("Generated controller or server properties under current directory:");
+ for (String propFileName : controllerPropFileNameList) {
+ System.out.println(propFileName);
+ }
+ System.out.println();
+
+ System.out.println("Generated broker under current directory:");
+ for (String propFileName : brokerPropsFileNameList) {
+ System.out.println(propFileName);
+ }
+ System.out.println();
+ }
+
+    public List<String> processGroupConfig(ServerGroupConfig groupConfig, String propFilePath,
+        String outputFilePrefix, S3Url s3Url) throws IOException {
+        List<String> propFileNameList = new ArrayList<>();
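+        // For each node id: load the template, override the per-node and S3 settings, and
+        // persist the result as "<prefix>-<nodeId>.properties".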
+ for (int i = 0; i < groupConfig.getNodeIdList().size(); i++) {
+ int nodeId = groupConfig.getNodeIdList().get(i);
+ Properties groupProps = S3PropUtil.loadTemplateProps(propFilePath);
+ groupProps.put(ServerConfigKey.NODE_ID.getKeyName(), String.valueOf(nodeId));
+ groupProps.put(ServerConfigKey.CONTROLLER_QUORUM_VOTERS.getKeyName(), groupConfig.getQuorumVoters());
+ groupProps.put(ServerConfigKey.LISTENERS.getKeyName(), groupConfig.getListenerMap().get(nodeId));
+ // use same value as listeners by default
+ groupProps.put(ServerConfigKey.ADVERTISED_LISTENERS.getKeyName(), groupConfig.getAdvertisedListenerMap().get(nodeId));
+ groupProps.put(ServerConfigKey.S3_ENDPOINT.getKeyName(), s3Url.getEndpointProtocol().getName() + "://" + s3Url.getS3Endpoint());
+ groupProps.put(ServerConfigKey.S3_REGION.getKeyName(), s3Url.getS3Region());
+ groupProps.put(ServerConfigKey.S3_BUCKET.getKeyName(), s3Url.getS3DataBucket());
+            groupProps.put(ServerConfigKey.S3_PATH_STYLE.getKeyName(), String.valueOf(s3Url.isS3PathStyle()));
+
+ String fileName = String.format("%s-%s.properties", outputFilePrefix, nodeId);
+ flushProps(groupProps, fileName);
+ propFileNameList.add(fileName);
+ }
+ return propFileNameList;
+ }
+
+ protected void flushProps(Properties props, String fileName) throws IOException {
+ S3PropUtil.persist(props, fileName);
+ }
+
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/GenerateS3UrlCmd.java b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateS3UrlCmd.java
new file mode 100644
index 0000000000..fbc219faee
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateS3UrlCmd.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq;
+
+import com.automq.s3shell.sdk.model.AuthMethod;
+import com.automq.s3shell.sdk.model.EndpointProtocol;
+import com.automq.stream.utils.S3Utils;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import org.apache.kafka.common.Uuid;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static org.apache.kafka.tools.automq.AutoMQKafkaAdminTool.GENERATE_START_COMMAND_CMD;
+
+/**
+ * Generate s3url for user
+ */
+public class GenerateS3UrlCmd {
+
+ private final GenerateS3UrlCmd.Parameter parameter;
+
+ public GenerateS3UrlCmd(Parameter parameter) {
+ this.parameter = parameter;
+ }
+
+ static class Parameter {
+ final String s3AccessKey;
+ final String s3SecretKey;
+
+ final AuthMethod s3AuthMethod;
+
+ final String s3Region;
+
+ final EndpointProtocol endpointProtocol;
+
+ final String s3Endpoint;
+
+ final String s3DataBucket;
+
+ final String s3OpsBucket;
+
+ final boolean s3PathStyle;
+
+ Parameter(Namespace res) {
+ this.s3AccessKey = res.getString("s3-access-key");
+ this.s3SecretKey = res.getString("s3-secret-key");
+ String authMethodName = res.getString("s3-auth-method");
+ if (authMethodName == null || authMethodName.trim().isEmpty()) {
+ this.s3AuthMethod = AuthMethod.KEY_FROM_ARGS;
+ } else {
+ this.s3AuthMethod = AuthMethod.getByName(authMethodName);
+ }
+ this.s3Region = res.getString("s3-region");
+ String endpointProtocolStr = res.get("s3-endpoint-protocol");
+ this.endpointProtocol = EndpointProtocol.getByName(endpointProtocolStr);
+ this.s3Endpoint = res.getString("s3-endpoint");
+ this.s3DataBucket = res.getString("s3-data-bucket");
+ String s3OpsBucketFromArg = res.getString("s3-ops-bucket");
+ if (s3OpsBucketFromArg == null) {
+ this.s3OpsBucket = this.s3DataBucket;
+ } else {
+ this.s3OpsBucket = s3OpsBucketFromArg;
+ }
+ this.s3PathStyle = Boolean.valueOf(res.getString("s3-path-style"));
+ }
+ }
+
+ public static ArgumentParser addArguments(Subparser parser) {
+ parser.addArgument("generate-s3-url")
+ .action(store())
+ .required(true);
+ parser.addArgument("--s3-access-key")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("s3-access-key")
+ .metavar("S3-ACCESS-KEY")
+ .help("Your accessKey that used to access S3");
+ parser.addArgument("--s3-secret-key")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("s3-secret-key")
+ .metavar("S3-SECRET-KEY")
+ .help("Your secretKey that used to access S3");
+ parser.addArgument("--s3-auth-method")
+ .action(store())
+ .required(false)
+ .setDefault(AuthMethod.KEY_FROM_ARGS.getKeyName())
+ .type(String.class)
+ .dest("s3-auth-method")
+ .metavar("S3-AUTH-METHOD")
+ .help("The auth method that used to access S3, default is key-from-env, other options are key-from-args and role");
+ parser.addArgument("--s3-region")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("s3-region")
+ .metavar("S3-REGION")
+ .help("The region of S3");
+ parser.addArgument("--s3-endpoint-protocol")
+ .action(store())
+ .required(false)
+ .setDefault("https")
+ .type(String.class)
+ .dest("s3-endpoint-protocol")
+ .metavar("S3-ENDPOINT-PROTOCOL")
+ .help("The protocol of S3 endpoint. Default is https, other options are http");
+ parser.addArgument("--s3-endpoint")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("s3-endpoint")
+ .metavar("S3-ENDPOINT")
+ .help("The endpoint of S3. Pay attention that protocol is not included. Example: s3.amazonaws.com");
+ parser.addArgument("--s3-data-bucket")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("s3-data-bucket")
+ .metavar("S3-DATA-BUCKET")
+ .help("The bucket name of S3 that used to store kafka's stream data");
+ parser.addArgument("--s3-ops-bucket")
+ .action(store())
+ .required(false)
+ .type(String.class)
+ .dest("s3-ops-bucket")
+ .metavar("S3-OPS-BUCKET")
+ .help("The bucket name of S3 that used to store operation data like metric,log,naming service info etc. Use different bucket to store data and ops data is recommended");
+ parser.addArgument("--s3-path-style")
+ .action(store())
+ .required(false)
+ .setDefault(false)
+ .type(String.class)
+ .dest("s3-path-style")
+ .metavar("S3-PATH-STYLE")
+ .help("Enable s3 path style. If you are using minio, you need set it to true, default value is false.");
+
+ return parser;
+ }
+
+ public String run() {
+ System.out.println("#################################### S3 PRECHECK #################################");
+ System.out.println();
+
+        // precheck: verify that the provided credentials can access the data bucket before generating the URL
+ var context = S3Utils.S3Context.builder()
+ .setEndpoint(parameter.endpointProtocol.getName() + "://" + parameter.s3Endpoint)
+ .setAccessKey(parameter.s3AccessKey)
+ .setSecretKey(parameter.s3SecretKey)
+ .setBucketName(parameter.s3DataBucket)
+ .setRegion(parameter.s3Region)
+            .setForcePathStyle(parameter.s3PathStyle)
+ .build();
+ S3Utils.checkS3Access(context);
+
+ String s3Url = buildS3Url();
+ System.out.println("########## S3 URL RESULT ############");
+ System.out.println();
+ System.out.println("Your S3 URL is: \n");
+ System.out.println(s3Url);
+ System.out.println("\n");
+
+ System.out.println("############ S3 URL USAGE ##############");
+ System.out.println("You can use s3url to generate start command to start AutoMQ");
+ System.out.println("------------------------ COPY ME ------------------");
+        // tip: do not add whitespace after the trailing '\'
+ System.out.println(String.format("bin/automq-kafka-admin.sh %s \\%n"
+ + "--s3-url=\"%s\" \\%n"
+ + "--controller-address=\"192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093\" \\%n"
+ + "--broker-address=\"192.168.0.4:9092;192.168.0.5:9092\" %n", GENERATE_START_COMMAND_CMD, s3Url
+ ));
+ System.out.println("TIPS: Replace the controller-address and broker-address with your real ip list.");
+
+ return s3Url;
+ }
+
+ private String buildS3Url() {
+ StringBuilder s3UrlBuilder = new StringBuilder();
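+        // Resulting format (illustrative, field order matches the appends below):
+        //   s3://<endpoint>?s3-access-key=...&s3-secret-key=...&s3-region=...&s3-auth-method=...
+        //       &s3-endpoint-protocol=...&s3-data-bucket=...&s3-path-style=...&s3-ops-bucket=...&cluster-id=<uuid>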
+ s3UrlBuilder
+ .append("s3://")
+ .append(parameter.s3Endpoint)
+ .append("?").append("s3-access-key=").append(parameter.s3AccessKey)
+ .append("&").append("s3-secret-key=").append(parameter.s3SecretKey)
+ .append("&").append("s3-region=").append(parameter.s3Region)
+ .append("&").append("s3-auth-method=").append(parameter.s3AuthMethod.getKeyName())
+ .append("&").append("s3-endpoint-protocol=").append(parameter.endpointProtocol.getName())
+ .append("&").append("s3-data-bucket=").append(parameter.s3DataBucket)
+ .append("&").append("s3-path-style=").append(parameter.s3PathStyle)
+ .append("&").append("s3-ops-bucket=").append(parameter.s3OpsBucket)
+ .append("&").append("cluster-id=").append(Uuid.randomUuid());
+ return s3UrlBuilder.toString();
+ }
+
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/GenerateStartCmdCmd.java b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateStartCmdCmd.java
new file mode 100644
index 0000000000..8d3d20ab82
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/GenerateStartCmdCmd.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq;
+
+import java.io.IOException;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import org.apache.kafka.tools.automq.model.ServerGroupConfig;
+import org.apache.kafka.tools.automq.util.ConfigParserUtil;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static org.apache.kafka.tools.automq.AutoMQKafkaAdminTool.GENERATE_START_COMMAND_CMD;
+import static org.apache.kafka.tools.automq.AutoMQKafkaAdminTool.GENERATE_S3_URL_CMD;
+
+public class GenerateStartCmdCmd {
+ private final Parameter parameter;
+
+ public GenerateStartCmdCmd(GenerateStartCmdCmd.Parameter parameter) {
+ this.parameter = parameter;
+ }
+
+ static class Parameter {
+ final String s3Url;
+ final String controllerAddress;
+
+ final String brokerAddress;
+
+ final String networkBaselineBandwidthMB;
+
+ final boolean controllerOnlyMode;
+
+ Parameter(Namespace res) {
+ this.s3Url = res.getString("s3-url");
+ this.brokerAddress = res.getString("broker-address");
+ this.controllerAddress = res.getString("controller-address");
+ this.networkBaselineBandwidthMB = res.getString("network-baseline-bandwidth-mb");
+ this.controllerOnlyMode = res.getBoolean("controller-only-mode");
+ }
+ }
+
+ public static ArgumentParser addArguments(Subparser parser) {
+        parser.addArgument(GENERATE_START_COMMAND_CMD)
+ .action(store())
+ .required(true);
+ parser.addArgument("--s3-url")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("s3-url")
+ .metavar("S3-URL")
+ .help(String.format("AutoMQ use s3 url to access your s3 and create AutoMQ cluster. You can generate s3 url with cmd 'bin/automq-kafka-admin.sh %s'", GENERATE_S3_URL_CMD));
+ parser.addArgument("--controller-address")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("controller-address")
+ .metavar("CONTROLLER-ADDRESS")
+ .help("Your controller ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
+ parser.addArgument("--broker-address")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("broker-address")
+ .metavar("BROKER-ADDRESS")
+ .help("Your broker ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
+ parser.addArgument("--controller-only-mode")
+ .action(store())
+ .required(false)
+ .type(Boolean.class)
+ .dest("controller-only-mode")
+ .setDefault(false)
+ .metavar("CONTROLLER-ONLY-MODE")
+ .help("If this is set to true, all controllers is also seen as broker. If you want to run controller only, set this to true. Default is false.");
+ parser.addArgument("--network-baseline-bandwidth-mb")
+ .action(store())
+ .required(false)
+ .type(Integer.class)
+ .dest("network-baseline-bandwidth-mb")
+ .metavar("NETWORK-BASELINE-BANDWIDTH-MB")
+ .help("Network baseline bandwidth of your machine to run broker or controller. Usually you can get it from your cloud provider's official instance document. Example: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/memory-optimized-instances.html");
+
+ return parser;
+ }
+
+ public void run() throws IOException {
+ ServerGroupConfig controllerGroupConfig = ConfigParserUtil.genControllerConfig(parameter.controllerAddress, parameter.controllerOnlyMode);
+ ServerGroupConfig brokerGroupConfig = ConfigParserUtil.genBrokerConfig(parameter.brokerAddress, controllerGroupConfig);
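+        // Node ids are allocated by ConfigParserUtil: controllers take 0..n-1 and broker ids continue from n.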
+
+ System.out.println("############## START CMD LIST ###########");
+ System.out.println("You can copy the command to where your AutoMQ tgz located and run following command to start a AutoMQ kafka server: \n");
+ System.out.println("Ensure that your compute instance already have JDK17 installed. Execute 'java -version' to check.");
+ System.out.println();
+ System.out.println("------------------------ COPY ME ------------------");
+
+ for (int controllerNodeId : controllerGroupConfig.getNodeIdList()) {
+ if (parameter.controllerOnlyMode) {
+ System.out.println(String.format("bin/kafka-server-start.sh "
+ + "--s3-url=\"%s\" "
+ + "--override process.roles=controller "
+ + "--override node.id=%s "
+ + "--override controller.quorum.voters=%s "
+ + "--override listeners=%s ", parameter.s3Url, controllerNodeId, controllerGroupConfig.getQuorumVoters(), controllerGroupConfig.getListenerMap().get(controllerNodeId)));
+ } else {
+ System.out.println(String.format("bin/kafka-server-start.sh "
+ + "--s3-url=\"%s\" "
+ + "--override process.roles=broker,controller "
+ + "--override node.id=%s "
+ + "--override controller.quorum.voters=%s "
+ + "--override listeners=%s "
+ + "--override advertised.listeners=%s ", parameter.s3Url, controllerNodeId, controllerGroupConfig.getQuorumVoters(), controllerGroupConfig.getListenerMap().get(controllerNodeId), controllerGroupConfig.getAdvertisedListenerMap().get(controllerNodeId)));
+ }
+ System.out.println();
+ }
+
+ for (int brokerNodeId : brokerGroupConfig.getNodeIdList()) {
+ System.out.println(String.format("bin/kafka-server-start.sh "
+ + "--s3-url=\"%s\" "
+ + "--override process.roles=broker "
+ + "--override node.id=%s "
+ + "--override controller.quorum.voters=%s "
+ + "--override listeners=%s "
+ + "--override advertised.listeners=%s ", parameter.s3Url, brokerNodeId, brokerGroupConfig.getQuorumVoters(), brokerGroupConfig.getListenerMap().get(brokerNodeId), brokerGroupConfig.getAdvertisedListenerMap().get(brokerNodeId)));
+ System.out.println();
+ }
+ System.out.println();
+ System.out.println("TIPS: Start controllers first and then the brokers.");
+ System.out.println();
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/model/ServerGroupConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/model/ServerGroupConfig.java
new file mode 100644
index 0000000000..ff4b37031e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/model/ServerGroupConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq.model;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holds the configuration shared by a group of servers
+ */
+public class ServerGroupConfig {
+    private final List<Integer> nodeIdList;
+
+    private final String quorumVoters;
+
+    /**
+     * Key is the allocated node id and value is the listener info
+     */
+    private final Map<Integer, String> listenerMap;
+
+    private final Map<Integer, String> advertisedListenerMap;
+
+    public ServerGroupConfig(List<Integer> nodeIdList, String quorumVoters, Map<Integer, String> listenerMap,
+        Map<Integer, String> advertisedListenerMap) {
+        this.nodeIdList = nodeIdList;
+        this.quorumVoters = quorumVoters;
+        this.listenerMap = listenerMap;
+        this.advertisedListenerMap = advertisedListenerMap;
+    }
+
+    public List<Integer> getNodeIdList() {
+        return nodeIdList;
+    }
+
+    public String getQuorumVoters() {
+        return quorumVoters;
+    }
+
+    public Map<Integer, String> getListenerMap() {
+        return listenerMap;
+    }
+
+    public Map<Integer, String> getAdvertisedListenerMap() {
+        return advertisedListenerMap;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/util/ConfigParserUtil.java b/tools/src/main/java/org/apache/kafka/tools/automq/util/ConfigParserUtil.java
new file mode 100644
index 0000000000..9cda9d6867
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/util/ConfigParserUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.tools.automq.model.ServerGroupConfig;
+
+public class ConfigParserUtil {
+
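+    /**
+     * Illustrative example (placeholder addresses): an ipPortList of "192.168.0.1:9093;192.168.0.2:9093"
+     * produces node ids [0, 1] and quorum voters "0@192.168.0.1:9093,1@192.168.0.2:9093".
+     */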
+ public static ServerGroupConfig genControllerConfig(String ipPortList, boolean isControllerOnlyMode) {
+ String[] ipPortPairs = ipPortList.split(";");
+        List<Integer> nodeIdList = new ArrayList<>();
+        StringBuilder quorumVoters = new StringBuilder();
+        Map<Integer, String> listenerMap = new HashMap<>();
+        Map<Integer, String> advertisedListenerMap = new HashMap<>();
+
+ for (int i = 0; i < ipPortPairs.length; i++) {
+ nodeIdList.add(i);
+ // Build quorumVoters
+ if (i > 0) {
+ quorumVoters.append(",");
+ }
+ quorumVoters.append(i).append("@").append(ipPortPairs[i]);
+
+ if (isControllerOnlyMode) {
+ listenerMap.put(i, "CONTROLLER://" + ipPortPairs[i]);
+ } else {
+ if ("9092".equals(ipPortPairs[i].split(":")[1])) {
+ throw new UnsupportedOperationException("Controller port can not be 9092 in server mode,because it will conflict with broker port");
+ }
+
+                // in server mode the broker listener is forced to port 9092 by default
+ String advertisedListener = "PLAINTEXT://" + ipPortPairs[i].split(":")[0] + ":9092";
+ listenerMap.put(i, advertisedListener + "," + "CONTROLLER://" + ipPortPairs[i]);
+ advertisedListenerMap.put(i, advertisedListener);
+ }
+ }
+
+ return new ServerGroupConfig(nodeIdList, quorumVoters.toString(), listenerMap, advertisedListenerMap);
+ }
+
+ public static ServerGroupConfig genBrokerConfig(String ipPortList,
+ ServerGroupConfig controllerGroupConfig) {
+ String[] ipPortPairs = ipPortList.split(";");
+        List<Integer> nodeIdList = new ArrayList<>();
+        Map<Integer, String> listenerMap = new HashMap<>();
+ int startIndex = controllerGroupConfig.getNodeIdList().size();
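+        // Broker node ids continue after the controller ids, e.g. with 3 controllers the first broker gets node id 3.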
+ for (int i = startIndex; i < startIndex + ipPortPairs.length; i++) {
+ listenerMap.put(i, "PLAINTEXT://" + ipPortPairs[i - startIndex]);
+ nodeIdList.add(i);
+ }
+
+ return new ServerGroupConfig(
+ nodeIdList,
+ controllerGroupConfig.getQuorumVoters(),
+ listenerMap, listenerMap);
+
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/automq/GenerateConfigFileCmdTest.java b/tools/src/test/java/org/apache/kafka/tools/automq/GenerateConfigFileCmdTest.java
new file mode 100644
index 0000000000..2de5c60771
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/automq/GenerateConfigFileCmdTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq;
+
+import com.automq.s3shell.sdk.model.S3Url;
+import com.automq.s3shell.sdk.util.S3PropUtil;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.tools.automq.model.ServerGroupConfig;
+import org.apache.kafka.tools.automq.util.ConfigParserUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static com.automq.s3shell.sdk.util.S3PropUtil.BROKER_PROPS_PATH;
+import static com.automq.s3shell.sdk.util.S3PropUtil.CONTROLLER_PROPS_PATH;
+import static com.automq.s3shell.sdk.util.S3PropUtil.SERVER_PROPS_PATH;
+
+class GenerateConfigFileCmdTest {
+
+ private static GenerateConfigFileCmd cmd;
+
+ @BeforeAll
+ public static void setup() {
+
+        Namespace namespace = new Namespace(
+            Map.of(
+                "s3-url", "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xxx&s3-secret-key=yyy&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&cluster-id=HvxKzNetT1GOCUkqCG5eyQ",
+                "broker-address", "192.168.0.4:9092",
+                "controller-address", "192.168.0.1:9093",
+                "network-baseline-bandwidth-mb", 100,
+                "controller-only-mode", false
+            )
+        );
+ GenerateConfigFileCmd.Parameter parameter = new GenerateConfigFileCmd.Parameter(namespace);
+ cmd = new GenerateConfigFileCmd(parameter);
+ }
+
+ @Test
+ void loadTemplatePropsTest() {
+
+ Properties props = null;
+ try {
+ props = S3PropUtil.loadTemplateProps(CONTROLLER_PROPS_PATH);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Assertions.assertNotNull(props);
+ Assertions.assertEquals(props.getProperty("controller.quorum.voters"), "1@localhost:9093");
+
+ try {
+ props = S3PropUtil.loadTemplateProps(BROKER_PROPS_PATH);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Assertions.assertNotNull(props);
+ Assertions.assertEquals(props.getProperty("process.roles"), "broker");
+
+ try {
+ props = S3PropUtil.loadTemplateProps(SERVER_PROPS_PATH);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Assertions.assertNotNull(props);
+ Assertions.assertEquals(props.getProperty("process.roles"), "broker,controller");
+ }
+
+ @Test
+ void flushPropsTest() {
+ Properties props = null;
+ try {
+ props = S3PropUtil.loadTemplateProps(BROKER_PROPS_PATH);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Assertions.assertNotNull(props);
+ try {
+ cmd.flushProps(props, "broker.properties");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Assertions.assertNotNull(props);
+
+ File propFile = new File("generated/broker.properties");
+ Properties propsFromFile;
+ try {
+ propsFromFile = new Properties();
+ propsFromFile.load(FileUtils.openInputStream(propFile));
+ Assertions.assertEquals("broker", propsFromFile.getProperty("process.roles"));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Assertions.assertNotNull(propsFromFile);
+
+ }
+
+ @Test
+ void processGroupConfigTest() {
+
+ String s3UrlStr = "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xxx&s3-secret-key=yyy&s3-region=cn-northwest-1&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&cluster-id=HvxKzNetT1GOCUkqCG5eyQ";
+ S3Url s3Url = S3Url.parse(s3UrlStr);
+ ServerGroupConfig controllerGroupConfig = ConfigParserUtil.genControllerConfig("192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093", false);
+
+ ServerGroupConfig brokerGroupConfig = ConfigParserUtil.genBrokerConfig("192.168.0.4:9092;192.168.0.5:9092", controllerGroupConfig);
+ try {
+ GenerateConfigFileCmd cmd = new GenerateConfigFileCmd(null);
+            List<String> propFiles = cmd.processGroupConfig(brokerGroupConfig, BROKER_PROPS_PATH, "broker", s3Url);
+ Assertions.assertEquals(2, propFiles.size());
+ Properties groupProps = S3PropUtil.loadTemplateProps(propFiles.get(0));
+            Assertions.assertEquals("3", groupProps.getProperty("node.id"));
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/kafka/tools/automq/GenerateS3UrlCmdTest.java b/tools/src/test/java/org/apache/kafka/tools/automq/GenerateS3UrlCmdTest.java
new file mode 100644
index 0000000000..b6f9219b8b
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/automq/GenerateS3UrlCmdTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq;
+
+import java.util.Map;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+class GenerateS3UrlCmdTest {
+
+ @Test
+ @Disabled
+ void run() {
+ Namespace namespace = new Namespace(
+ Map.of(
+ "s3-access-key", "myak",
+
+ "s3-secret-key", "mysk",
+
+ "s3-auth-method", "key-from-args",
+
+ "s3-region", "cn-northwest-1",
+
+ "s3-endpoint-protocol", "https",
+
+ "s3-endpoint", "s3.amazonaws.com",
+
+ "s3-data-bucket", "automq-data-bucket",
+
+ "s3-ops-bucket", "automq-ops-bucket"
+ )
+ );
+ GenerateS3UrlCmd.Parameter parameter = new GenerateS3UrlCmd.Parameter(namespace);
+ GenerateS3UrlCmd cmd = new GenerateS3UrlCmd(parameter);
+        Assertions.assertTrue(cmd.run().startsWith("s3://s3.amazonaws.com?s3-access-key=myak&s3-secret-key=mysk&s3-region=cn-northwest-1&s3-auth-method=key-from-args&s3-endpoint-protocol=https&s3-data-bucket=automq-data-bucket&s3-path-style=false&s3-ops-bucket=automq-ops-bucket&cluster-id="));
+ }
+}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/kafka/tools/automq/model/S3UrlTest.java b/tools/src/test/java/org/apache/kafka/tools/automq/model/S3UrlTest.java
new file mode 100644
index 0000000000..fb08269c6a
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/automq/model/S3UrlTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq.model;
+
+import com.automq.s3shell.sdk.model.S3Url;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class S3UrlTest {
+
+ @Test
+ void parse() {
+ String s3Url = "s3://s3.cn-northwest-1.amazonaws.com.cn?s3-access-key=xxx&s3-secret-key=yyy&s3-region=cn-northwest-1&s3-auth-method=key-from-args&s3-endpoint-protocol=https&s3-data-bucket=wanshao-test&s3-ops-bucket=automq-ops-bucket&cluster-id=fZGPJht6Rf-o7WgrUakLxQ";
+ S3Url s3UrlObj = S3Url.parse(s3Url);
+ assertEquals("xxx", s3UrlObj.getS3AccessKey());
+ assertEquals("yyy", s3UrlObj.getS3SecretKey());
+ assertEquals("cn-northwest-1", s3UrlObj.getS3Region());
+ assertEquals("key-from-args", s3UrlObj.getS3AuthMethod().getKeyName());
+ assertEquals("https", s3UrlObj.getEndpointProtocol().getName());
+ assertEquals("s3.cn-northwest-1.amazonaws.com.cn", s3UrlObj.getS3Endpoint());
+ assertEquals("wanshao-test", s3UrlObj.getS3DataBucket());
+ assertEquals("automq-ops-bucket", s3UrlObj.getS3OpsBucket());
+ assertEquals("fZGPJht6Rf-o7WgrUakLxQ", s3UrlObj.getClusterId());
+
+ }
+}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/kafka/tools/automq/util/ConfigParserUtilTest.java b/tools/src/test/java/org/apache/kafka/tools/automq/util/ConfigParserUtilTest.java
new file mode 100644
index 0000000000..e0c9dff5d5
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/automq/util/ConfigParserUtilTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.automq.util;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.kafka.tools.automq.model.ServerGroupConfig;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class ConfigParserUtilTest {
+
+ @Test
+ void generatedServerConfig() {
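+        // Controller group config is generated from a semicolon-separated address list: each
+        // controller gets a node id starting from 0, a quorum-voters entry, and its own listeners.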
+ ServerGroupConfig config = ConfigParserUtil.genControllerConfig("192.168.0.1:9093;192.168.0.2:9094;192.168.0.3:9095", false);
+ assertEquals(3, config.getNodeIdList().size());
+ assertEquals("0@192.168.0.1:9093,1@192.168.0.2:9094,2@192.168.0.3:9095", config.getQuorumVoters());
+ List listenerList = Arrays.asList("PLAINTEXT://192.168.0.1:9092,CONTROLLER://192.168.0.1:9093", "PLAINTEXT://192.168.0.2:9092,CONTROLLER://192.168.0.2:9094", "PLAINTEXT://192.168.0.3:9092,CONTROLLER://192.168.0.3:9095");
+ for (int i = 0; i < config.getListenerMap().size(); i++) {
+ assertEquals(listenerList.get(i), config.getListenerMap().get(i));
+ }
+
+ }
+
+ @Test
+ void genBrokerConfig() {
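+        // Broker config generation takes the already-generated controller group config as input,
+        // so broker node ids continue after the controller ids instead of colliding with them.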
+ ServerGroupConfig controllerGroupConfig = ConfigParserUtil.genControllerConfig("192.168.0.1:9093;192.168.0.2:9094;192.168.0.3:9095", false);
+        List<String> controllerListeners = Arrays.asList("PLAINTEXT://192.168.0.1:9092,CONTROLLER://192.168.0.1:9093", "PLAINTEXT://192.168.0.2:9092,CONTROLLER://192.168.0.2:9094", "PLAINTEXT://192.168.0.3:9092,CONTROLLER://192.168.0.3:9095");
+ for (int i = 0; i < controllerGroupConfig.getListenerMap().size(); i++) {
+ int nodeId = controllerGroupConfig.getNodeIdList().get(i);
+ assertEquals(controllerListeners.get(i), controllerGroupConfig.getListenerMap().get(nodeId));
+ }
+
+ ServerGroupConfig brokerGroupConfig = ConfigParserUtil.genBrokerConfig("192.168.0.4:9092;192.168.0.5:9092;192.168.0.6:9092", controllerGroupConfig);
+
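+        // With three controllers occupying ids 0-2, the three brokers are assigned ids 3, 4 and 5.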
+        assertEquals(3, brokerGroupConfig.getNodeIdList().get(0));
+        assertEquals(4, brokerGroupConfig.getNodeIdList().get(1));
+        assertEquals(5, brokerGroupConfig.getNodeIdList().get(2));
+        List<String> listenerList = Arrays.asList("PLAINTEXT://192.168.0.4:9092", "PLAINTEXT://192.168.0.5:9092", "PLAINTEXT://192.168.0.6:9092");
+ for (int i = 0; i < brokerGroupConfig.getListenerMap().size(); i++) {
+ int nodeId = brokerGroupConfig.getNodeIdList().get(i);
+ assertEquals(listenerList.get(i), brokerGroupConfig.getListenerMap().get(nodeId));
+ }
+
+ }
+}
\ No newline at end of file