Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/kraft/broker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,24 @@ elasticstream.enable=true
# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
# Note that bucket name should not be included in the endpoint.
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.endpoint=https://s3.amazonaws.com

# The region of S3 service
# For Aliyun, you have to set the region to aws-global. See https://www.alibabacloud.com/help/zh/oss/developer-reference/use-amazon-s3-sdks-to-access-oss.
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.region=us-east-1

# The bucket of S3 service to store data
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.bucket=ko3

# Use path style access for S3, default false
# If you are using minio for storage, you have to set this to true.
#s3.path.style=true

# The file path of delta WAL in block device
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.wal.path=/tmp/kraft-broker-logs/s3wal

# The maximum size of delta WAL in block device, default 2GB
Expand Down
3 changes: 3 additions & 0 deletions config/kraft/controller.properties
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,16 @@ elasticstream.enable=true
# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
# Note that bucket name should not be included in the endpoint.
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.endpoint=https://s3.amazonaws.com

# The region of S3 service
# For Aliyun, you have to set the region to aws-global. See https://www.alibabacloud.com/help/zh/oss/developer-reference/use-amazon-s3-sdks-to-access-oss.
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.region=us-east-1

# The bucket of S3 service to store data
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.bucket=ko3

# Use path style access for S3, default false
Expand Down
4 changes: 4 additions & 0 deletions config/kraft/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,24 @@ elasticstream.enable=true
# see https://docs.aws.amazon.com/general/latest/gr/s3.html for AWS S3
# For Baidu Cloud, some regions, like cn-shanghai, may not support connecting with https, you can use http instead.
# Note that bucket name should not be included in the endpoint.
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.endpoint=https://s3.amazonaws.com

# The region of S3 service
# For Aliyun, you have to set the region to aws-global. See https://www.alibabacloud.com/help/zh/oss/developer-reference/use-amazon-s3-sdks-to-access-oss.
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.region=us-east-1

# The bucket of S3 service to store data
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.bucket=ko3

# Use path style access for S3, default false
# If you are using minio for storage, you have to set this to true.
#s3.path.style=true

# The file path of delta WAL in block device
# DEPRECATED TIPS: This property will be replaced by S3Url later in the future version. Reference the official document and use S3Url to start AutoMQ is recommended.
s3.wal.path=/tmp/kraft-combined-logs/s3wal

# The maximum size of delta WAL in block device, default 2GB
Expand Down
9 changes: 2 additions & 7 deletions core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,8 @@ object Kafka extends Logging {
try {
// AutoMQ for Kafka inject start
val serverProps = getPropsFromArgs(args)
var s3UrlString = "";
for (elem <- args) {
if (elem.startsWith("s3-url")) {
s3UrlString = elem.split("=")(1)
}
}
if (s3UrlString == null || s3UrlString.isEmpty) {
val s3UrlString = S3Url.parseS3UrlValFromArgs(args)
if (s3UrlString == null ) {
CredentialsProviderHolder.create(EnvVariableCredentialsProvider.get())
} else {
val s3Url = S3Url.parse(s3UrlString)
Expand Down

This file was deleted.

28 changes: 15 additions & 13 deletions kshell-sdk/src/main/java/com/automq/s3shell/sdk/model/S3Url.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public class S3Url {
final String s3AccessKey;
final String s3SecretKey;

final AuthMethod s3AuthMethod;

final String s3Region;

final EndpointProtocol endpointProtocol;
Expand All @@ -39,12 +37,11 @@ public class S3Url {

final boolean s3PathStyle;

public S3Url(String s3AccessKey, String s3SecretKey, AuthMethod s3AuthMethod, String s3Region,
public S3Url(String s3AccessKey, String s3SecretKey, String s3Region,
EndpointProtocol endpointProtocol, String s3Endpoint, String s3DataBucket, String s3OpsBucket, String clusterId,
boolean s3PathStyle) {
this.s3AccessKey = s3AccessKey;
this.s3SecretKey = s3SecretKey;
this.s3AuthMethod = s3AuthMethod;
this.s3Region = s3Region;
this.endpointProtocol = endpointProtocol;
this.s3Endpoint = s3Endpoint;
Expand All @@ -54,6 +51,19 @@ public S3Url(String s3AccessKey, String s3SecretKey, AuthMethod s3AuthMethod, St
this.s3PathStyle = s3PathStyle;
}

/**
* @param args input args to start AutoMQ
* @return s3Url value from args, or null if not found
*/
public static String parseS3UrlValFromArgs(String[] args) {
for (String arg : args) {
if (arg.startsWith("--s3-url=")) {
return arg.substring("--s3-url=".length());
}
}
return null;
}

public static S3Url parse(String s3Url) throws IllegalArgumentException {
if (StringUtils.isBlank(s3Url)) {
throw new IllegalArgumentException("s3Url required");
Expand All @@ -66,7 +76,6 @@ public static S3Url parse(String s3Url) throws IllegalArgumentException {

String accessKey = null;
String secretKey = null;
AuthMethod authMethod = null;
String region = null;
EndpointProtocol protocol = null;
String dataBucket = null;
Expand All @@ -90,9 +99,6 @@ public static S3Url parse(String s3Url) throws IllegalArgumentException {
case "s3-secret-key":
secretKey = value;
break;
case "s3-auth-method":
authMethod = AuthMethod.getByName(value);
break;
case "s3-region":
region = value;
break;
Expand All @@ -116,7 +122,7 @@ public static S3Url parse(String s3Url) throws IllegalArgumentException {
}
}

return new S3Url(accessKey, secretKey, authMethod, region, protocol, s3Endpoint, dataBucket, opsBucket, clusterId, s3PathStyle);
return new S3Url(accessKey, secretKey, region, protocol, s3Endpoint, dataBucket, opsBucket, clusterId, s3PathStyle);
}

public String getS3AccessKey() {
Expand All @@ -127,10 +133,6 @@ public String getS3SecretKey() {
return s3SecretKey;
}

public AuthMethod getS3AuthMethod() {
return s3AuthMethod;
}

public String getS3Region() {
return s3Region;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ static class Parameter {

Parameter(Namespace res) {
this.s3Url = res.getString("s3-url");
this.brokerAddress = res.getString("broker-address");
this.controllerAddress = res.getString("controller-address");
this.brokerAddress = res.getString("broker-list");
this.controllerAddress = res.getString("controller-list");
this.networkBaselineBandwidthMB = res.getString("network-baseline-bandwidth-mb");
this.controllerOnlyMode = res.getBoolean("controller-only-mode");
}
Expand All @@ -76,19 +76,19 @@ public static ArgumentParser addArguments(Subparser parser) {
.dest("s3-url")
.metavar("S3-URL")
.help(String.format("AutoMQ use s3 url to access your s3 and create AutoMQ cluster. You can generate s3 url with cmd 'bin/automq-kafka-admin.sh %s'", GENERATE_S3_URL_CMD));
parser.addArgument("--controller-address")
parser.addArgument("--controller-list")
.action(store())
.required(true)
.type(String.class)
.dest("controller-address")
.metavar("CONTROLLER-ADDRESS")
.dest("controller-list")
.metavar("CONTROLLER-LIST")
.help("Your controller ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
parser.addArgument("--broker-address")
.action(store())
.required(true)
.type(String.class)
.dest("broker-address")
.metavar("BROKER-ADDRESS")
.dest("broker-list")
.metavar("BROKER-LIST")
.help("Your broker ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
parser.addArgument("--controller-only-mode")
.action(store())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
*/
package org.apache.kafka.tools.automq;

import com.automq.s3shell.sdk.model.AuthMethod;
import com.automq.s3shell.sdk.model.EndpointProtocol;
import com.automq.stream.utils.S3Utils;
import java.util.List;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.apache.kafka.common.Uuid;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;

import java.util.List;

import static net.sourceforge.argparse4j.impl.Arguments.store;
import static org.apache.kafka.tools.automq.AutoMQKafkaAdminTool.GENERATE_START_COMMAND_CMD;

Expand All @@ -45,8 +43,6 @@ static class Parameter {
final String s3AccessKey;
final String s3SecretKey;

final AuthMethod s3AuthMethod;

final String s3Region;

final EndpointProtocol endpointProtocol;
Expand All @@ -62,12 +58,6 @@ static class Parameter {
Parameter(Namespace res) {
this.s3AccessKey = res.getString("s3-access-key");
this.s3SecretKey = res.getString("s3-secret-key");
String authMethodName = res.getString("s3-auth-method");
if (authMethodName == null || authMethodName.trim().isEmpty()) {
this.s3AuthMethod = AuthMethod.KEY_FROM_ARGS;
} else {
this.s3AuthMethod = AuthMethod.getByName(authMethodName);
}
this.s3Region = res.getString("s3-region");
String endpointProtocolStr = res.get("s3-endpoint-protocol");
this.endpointProtocol = EndpointProtocol.getByName(endpointProtocolStr);
Expand Down Expand Up @@ -101,14 +91,6 @@ public static ArgumentParser addArguments(Subparser parser) {
.dest("s3-secret-key")
.metavar("S3-SECRET-KEY")
.help("Your secretKey that used to access S3");
parser.addArgument("--s3-auth-method")
.action(store())
.required(false)
.setDefault(AuthMethod.KEY_FROM_ARGS.getKeyName())
.type(String.class)
.dest("s3-auth-method")
.metavar("S3-AUTH-METHOD")
.help("The auth method that used to access S3, default is key-from-env, other options are key-from-args and role");
parser.addArgument("--s3-region")
.action(store())
.required(true)
Expand Down Expand Up @@ -140,7 +122,7 @@ public static ArgumentParser addArguments(Subparser parser) {
.help("The bucket name of S3 that used to store kafka's stream data");
parser.addArgument("--s3-ops-bucket")
.action(store())
.required(false)
.required(true)
.type(String.class)
.dest("s3-ops-bucket")
.metavar("S3-OPS-BUCKET")
Expand Down Expand Up @@ -184,10 +166,10 @@ public String run() {
//tips: Not add whitespace after \\
System.out.println(String.format("bin/automq-kafka-admin.sh %s \\%n"
+ "--s3-url=\"%s\" \\%n"
+ "--controller-address=\"192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093\" \\%n"
+ "--broker-address=\"192.168.0.4:9092;192.168.0.5:9092\" %n", GENERATE_START_COMMAND_CMD, s3Url
+ "--controller-list=\"192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093\" \\%n"
+ "--broker-list=\"192.168.0.4:9092;192.168.0.5:9092\" %n", GENERATE_START_COMMAND_CMD, s3Url
));
System.out.println("TIPS: Replace the controller-address and broker-address with your real ip list.");
System.out.println("TIPS: Replace the controller-list and broker-list with your real ip list.");

return s3Url;
}
Expand All @@ -200,7 +182,6 @@ private String buildS3Url() {
.append("?").append("s3-access-key=").append(parameter.s3AccessKey)
.append("&").append("s3-secret-key=").append(parameter.s3SecretKey)
.append("&").append("s3-region=").append(parameter.s3Region)
.append("&").append("s3-auth-method=").append(parameter.s3AuthMethod.getKeyName())
.append("&").append("s3-endpoint-protocol=").append(parameter.endpointProtocol.getName())
.append("&").append("s3-data-bucket=").append(parameter.s3DataBucket)
.append("&").append("s3-path-style=").append(parameter.s3PathStyle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ public GenerateStartCmdCmd(GenerateStartCmdCmd.Parameter parameter) {

static class Parameter {
final String s3Url;
final String controllerAddress;
final String controllerList;

final String brokerAddress;
final String brokerList;

final String networkBaselineBandwidthMB;

final boolean controllerOnlyMode;

Parameter(Namespace res) {
this.s3Url = res.getString("s3-url");
this.brokerAddress = res.getString("broker-address");
this.controllerAddress = res.getString("controller-address");
this.brokerList = res.getString("broker-list");
this.controllerList = res.getString("controller-list");
this.networkBaselineBandwidthMB = res.getString("network-baseline-bandwidth-mb");
this.controllerOnlyMode = res.getBoolean("controller-only-mode");
}
Expand All @@ -64,19 +64,19 @@ public static ArgumentParser addArguments(Subparser parser) {
.dest("s3-url")
.metavar("S3-URL")
.help(String.format("AutoMQ use s3 url to access your s3 and create AutoMQ cluster. You can generate s3 url with cmd 'bin/automq-kafka-admin.sh %s'", GENERATE_S3_URL_CMD));
parser.addArgument("--controller-address")
parser.addArgument("--controller-list")
.action(store())
.required(true)
.type(String.class)
.dest("controller-address")
.metavar("CONTROLLER-ADDRESS")
.dest("controller-list")
.metavar("CONTROLLER-LIST")
.help("Your controller ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
parser.addArgument("--broker-address")
parser.addArgument("--broker-list")
.action(store())
.required(true)
.type(String.class)
.dest("broker-address")
.metavar("BROKER-ADDRESS")
.dest("broker-list")
.metavar("BROKER-LIST")
.help("Your broker ip:port list, split by ':'. Example: 192.168.0.1:9092;192.168.0.2:9092");
parser.addArgument("--controller-only-mode")
.action(store())
Expand All @@ -98,8 +98,8 @@ public static ArgumentParser addArguments(Subparser parser) {
}

public void run() throws IOException {
ServerGroupConfig controllerGroupConfig = ConfigParserUtil.genControllerConfig(parameter.controllerAddress, parameter.controllerOnlyMode);
ServerGroupConfig brokerGroupConfig = ConfigParserUtil.genBrokerConfig(parameter.brokerAddress, controllerGroupConfig);
ServerGroupConfig controllerGroupConfig = ConfigParserUtil.genControllerConfig(parameter.controllerList, parameter.controllerOnlyMode);
ServerGroupConfig brokerGroupConfig = ConfigParserUtil.genBrokerConfig(parameter.brokerList, controllerGroupConfig);

System.out.println("############## START CMD LIST ###########");
System.out.println("You can copy the command to where your AutoMQ tgz located and run following command to start a AutoMQ kafka server: \n");
Expand Down
Loading