Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6d1c0ca
feat(tool): add automq kafka admin tool
KaimingWan Jan 9, 2024
97c2fff
feat(tool): support precheck when execute admin tool
KaimingWan Jan 9, 2024
a8afa2a
feat(tool): optimize generate s3 url command
KaimingWan Jan 9, 2024
29d3d69
feat(tool): support parse url
KaimingWan Jan 9, 2024
7adbb64
feat(tool): enable generate config properties
KaimingWan Jan 10, 2024
1188eeb
fix(tool): fix admin tool output
KaimingWan Jan 10, 2024
70eb008
fix(tool): fix output style
KaimingWan Jan 10, 2024
0416d32
fix(tool): fix get protocol
KaimingWan Jan 10, 2024
8f6537e
fix(tool): fix admin tool output style
KaimingWan Jan 10, 2024
5688a12
fix(tool): fix admin tool output style
KaimingWan Jan 10, 2024
f413ee8
fix(tool): fix admin tool output style
KaimingWan Jan 10, 2024
f1e8df6
fix(tool): fix not set listener correctly for server listeners
KaimingWan Jan 10, 2024
c0d7119
fix(tool): fix admin tool output style
KaimingWan Jan 10, 2024
b59d66f
fix(tool): fix admin tool output style
KaimingWan Jan 10, 2024
f568110
fix(tool): fix advertised listeners
KaimingWan Jan 10, 2024
26ccb2b
fix(tool): fix not set bucket
KaimingWan Jan 10, 2024
b6fa038
fix(tool): fix output
KaimingWan Jan 10, 2024
71bab07
fix(tool): rename parameter
KaimingWan Jan 11, 2024
8821199
fix(tool): make http protocol not required
KaimingWan Jan 11, 2024
d3449bd
fix(tool): support s3 path style parameter
KaimingWan Jan 11, 2024
683661e
feat(tool): add s3shell sdk and support load s3 url in kafka main
KaimingWan Jan 11, 2024
ac815ef
feat(tool): support start by s3 url
KaimingWan Jan 11, 2024
df2f1ab
fix(tool): fix system env set
KaimingWan Jan 12, 2024
9579b81
fix(tool): optimize start command
KaimingWan Jan 12, 2024
5222c3a
fix(tool): fix format storage
KaimingWan Jan 12, 2024
5ab049e
fix(tool): suppress warning log
KaimingWan Jan 12, 2024
7f1027a
chore(tool): refactor some code and add testcase
KaimingWan Jan 12, 2024
d080f72
fix(tool): fix controller only mode
KaimingWan Jan 12, 2024
b6f8cee
fix(tool): remove comment
KaimingWan Jan 12, 2024
b9ac6e8
fix(tool): fix testcase
KaimingWan Jan 12, 2024
bebad63
fix(tool): fix code style
KaimingWan Jan 12, 2024
3a32086
fix(tool): remove useless pkg
KaimingWan Jan 12, 2024
575cd09
chore: use gradle to manage new module
KaimingWan Jan 12, 2024
b25bd5b
chore: add check style
KaimingWan Jan 12, 2024
0266d7b
fix(tool): remove useless template file
KaimingWan Jan 12, 2024
f6d8cd2
fix(tool): fix style
KaimingWan Jan 12, 2024
12c11c0
chore: move args to kafka-run-class.sh
KaimingWan Jan 12, 2024
382d108
chore: fix style
KaimingWan Jan 12, 2024
075257d
refactor(tool): use sub-parser and remove useless code
KaimingWan Jan 12, 2024
0b8f774
refactor(tool): move code to more suitable place
KaimingWan Jan 15, 2024
f0fa605
chore: use default 2c16g configuration in template config file
KaimingWan Jan 15, 2024
efa13e7
chore: open auth method and ops-bucket for automq enterprise
KaimingWan Jan 15, 2024
9fe6e8e
chore: fix scala code indent
KaimingWan Jan 15, 2024
612a406
Merge branch 'develop' into feat/admin_tool
KaimingWan Jan 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bin/automq-kafka-admin.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Launch the AutoMQ Kafka admin CLI via the shared run-class wrapper.
# Quote the script-directory expansion so installs under paths containing
# spaces do not break word splitting.
exec "$(dirname "$0")/kafka-run-class.sh" org.apache.kafka.tools.automq.AutoMQKafkaAdminTool "$@"

2 changes: 2 additions & 0 deletions bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
fi

KAFKA_JDK_COMPATIBILITY_OPTS=""
# We need to override KAFKA_S3_ACCESS_KEY and KAFKA_S3_SECRET_KEY at runtime, but the JDK has no System.setEnv,
# so EnvUtil sets process environment variables via reflection. This --add-opens grants the reflective access it needs.
KAFKA_JDK_COMPATIBILITY_OPTS="${KAFKA_JDK_COMPATIBILITY_OPTS} --add-opens=java.base/java.util=ALL-UNNAMED "
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
KAFKA_JDK_COMPATIBILITY_OPTS="${KAFKA_JDK_COMPATIBILITY_OPTS} --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true"
fi
Expand Down
52 changes: 43 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ project(':core') {
implementation project(':metadata')
implementation project(':raft')
implementation project(':storage')

implementation project(':s3shell-kafka-sdk')

implementation libs.argparse4j
implementation libs.jacksonDatabind
Expand Down Expand Up @@ -1171,6 +1171,8 @@ project(':core') {
from(project(':tools').configurations.runtimeClasspath) { into("libs/") }
from(project(':trogdor').jar) { into("libs/") }
from(project(':trogdor').configurations.runtimeClasspath) { into("libs/") }
from(project(':s3shell-kafka-sdk').jar) { into("libs/") }
from(project(':s3shell-kafka-sdk').configurations.runtimeClasspath) { into("libs/") }
from(project(':shell').jar) { into("libs/") }
from(project(':shell').configurations.runtimeClasspath) { into("libs/") }
from(project(':connect:api').jar) { into("libs/") }
Expand Down Expand Up @@ -1842,27 +1844,59 @@ project(':storage') {
}
}

project(':s3shell-kafka-sdk') {
archivesBaseName = "s3shell-kafka-sdk"

checkstyle {
configProperties = checkstyleConfigProperties("import-control-automq.xml")
}
}

project(':tools') {
archivesBaseName = "kafka-tools"

dependencies {
implementation project(':clients')
implementation project(':server-common')
implementation project(':log4j-appender')
implementation (project(':clients')){
exclude group: 'org.slf4j', module: '*'
}
implementation (project(':server-common')){
exclude group: 'org.slf4j', module: '*'
}
implementation (project(':log4j-appender')){
exclude group: 'org.slf4j', module: '*'
}
implementation project(':s3shell-kafka-sdk')

implementation libs.argparse4j
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.slf4jApi
implementation libs.log4j

implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
// for SASL/OAUTHBEARER JWT validation
implementation (libs.jose4j){
exclude group: 'org.slf4j', module: '*'
}
implementation libs.jacksonJaxrsJsonProvider
implementation(libs.s3stream) {
exclude group: 'org.slf4j', module: '*'
exclude group: 'net.sourceforge.argparse4j', module: '*'
}
implementation libs.commonio

testImplementation project(':clients')

testImplementation (project(':clients')){
exclude group: 'org.slf4j', module: '*'
}
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation (project(':core')){
exclude group: 'org.slf4j', module: '*'
}
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation (project(':server-common')){
exclude group: 'org.slf4j', module: '*'
}


testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
Expand Down
55 changes: 55 additions & 0 deletions checkstyle/import-control-automq.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<import-control pkg="com.automq">

<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->

    <!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<allow pkg="io.netty" />
<allow pkg="software.amazon.awssdk" />
<allow pkg="com.automq.stream" />
<allow pkg="org.apache.kafka.tools.automq"/>
<allow pkg="com.automq.s3shell"/>
<allow pkg="org.apache.commons.io"/>

<allow pkg="com.github.luben.zstd" />
<allow pkg="com.google.common.cache" />





</import-control>
8 changes: 8 additions & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,12 @@
<allow pkg="org.apache.directory" />
<allow pkg="org.mockito" />
</subpackage>

<subpackage name="s3shell">
<allow pkg="joptsimple" />
<allow pkg="kafka.log.stream.s3" />
<allow pkg="kafka.s3shell" />
<allow pkg="kafka.tools" />
<allow pkg="com.automq.s3shell.sdk" />
</subpackage>
</import-control>
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
<allow pkg="io.netty" />
<allow pkg="software.amazon.awssdk" />
<allow pkg="com.automq.stream" />
<allow pkg="org.apache.kafka.tools.automq"/>
<allow pkg="com.automq.s3shell"/>
<allow pkg="org.apache.commons.io"/>

<allow pkg="com.github.luben.zstd" />
<allow pkg="com.google.common.cache" />

Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/kafka/s3shell/util/EnvUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.s3shell.util;

import java.lang.reflect.Field;
import java.util.Map;

/**
 * Utility for mutating the current JVM's process environment.
 *
 * <p>Java provides no {@code System.setEnv}, so {@link #setEnv(String, String)} rewrites the
 * unmodifiable map returned by {@link System#getenv()} through reflection. On JDK 9+ this
 * requires {@code --add-opens=java.base/java.util=ALL-UNNAMED} (added in kafka-run-class.sh);
 * without it the reflective access fails and an {@link IllegalStateException} is thrown.
 *
 * <p>NOTE(review): a cleaner long-term fix is to pass credentials through configuration
 * (S3StreamConfig supports explicit ak/sk) instead of reflective env mutation.
 */
public class EnvUtil {

    private EnvUtil() {
        // Utility class; no instances.
    }

    /**
     * Sets an environment variable visible to subsequent {@link System#getenv()} calls
     * within this JVM.
     *
     * @param key   environment variable name
     * @param value environment variable value
     * @throws IllegalStateException if the backing environment map cannot be modified
     */
    @SuppressWarnings("unchecked")
    public static void setEnv(String key, String value) {
        try {
            Map<String, String> env = System.getenv();
            // System.getenv() returns Collections.unmodifiableMap(...); field "m" is its backing map.
            Field field = env.getClass().getDeclaredField("m");
            field.setAccessible(true);
            Map<String, String> writableEnv = (Map<String, String>) field.get(env);
            writableEnv.put(key, value);
        } catch (Exception e) {
            // Include the key (never the value — it may be a secret) for diagnosability.
            throw new IllegalStateException("Failed to set environment variable: " + key, e);
        }
    }
}
44 changes: 44 additions & 0 deletions core/src/main/java/kafka/s3shell/util/KafkaFormatUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.s3shell.util;

import com.automq.s3shell.sdk.util.S3PropUtil;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import kafka.tools.StorageTool;

public class KafkaFormatUtil {
public static void formatStorage(String clusterId, Properties props) throws IOException {
String propFileName = String.format("automq-%s.properties", clusterId);
String propFilePath = "generated/" + propFileName;
String logDirPath = props.getProperty("log.dirs");

Path propPath = Paths.get(propFilePath);
if (Files.exists(propPath)) {
//delete if exists
Files.delete(propPath);
}

S3PropUtil.persist(props, propFileName);
if (!Files.isDirectory(Paths.get(logDirPath)) || !Files.exists(Paths.get(logDirPath, "meta.properties"))) {
StorageTool.main(new String[] {"auto-format", "-t", clusterId, "-c=" + propFilePath});
}
}
}
116 changes: 116 additions & 0 deletions core/src/main/java/kafka/s3shell/util/S3ShellPropUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.s3shell.util;

import com.automq.s3shell.sdk.constant.ServerConfigKey;
import com.automq.s3shell.sdk.model.S3Url;
import com.automq.s3shell.sdk.util.S3PropUtil;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import org.apache.kafka.common.internals.FatalExitError;

import static kafka.log.stream.s3.ConfigUtils.ACCESS_KEY_NAME;
import static kafka.log.stream.s3.ConfigUtils.SECRET_KEY_NAME;

/**
 * Builds server {@link Properties} for an AutoMQ process from the bundled role
 * templates plus the {@code --s3-url} / {@code --override} command-line options.
 */
public class S3ShellPropUtil {

    /** Builds the CLI option parser shared by all process roles. */
    private static OptionParser acceptOption() {
        OptionParser parser = new OptionParser();
        parser.accepts("override", "Optional property that should override values set in server.properties file")
            .withRequiredArg()
            .ofType(String.class);
        parser.accepts("config", "Path to server.properties file")
            .withRequiredArg()
            .ofType(String.class);
        parser.accepts("s3-url", "URL for S3 storage")
            .withRequiredArg()
            .ofType(String.class);
        parser.accepts("version", "Print version information and exit.");
        return parser;
    }

    /**
     * Resolves the full property set for the given process role.
     *
     * @param args        raw command-line arguments
     * @param processRole "broker", "controller", or a combined "broker,controller" form
     * @return the resolved properties, template values overlaid with CLI options
     * @throws IOException              if a template or generated file cannot be read or written
     * @throws FatalExitError           if no arguments were supplied
     * @throws IllegalArgumentException if the role or an override option is malformed
     */
    public static Properties autoGenPropsByCmd(String[] args, String processRole) throws IOException {
        if (args.length < 1) {
            throw new FatalExitError(1);
        }

        Properties props = new Properties();
        if ("broker".equals(processRole)) {
            props.putAll(S3PropUtil.loadTemplateProps(S3PropUtil.BROKER_PROPS_PATH));
        } else if ("controller".equals(processRole)) {
            props.putAll(S3PropUtil.loadTemplateProps(S3PropUtil.CONTROLLER_PROPS_PATH));
        } else if ("broker,controller".equals(processRole) || "controller,broker".equals(processRole)) {
            props.putAll(S3PropUtil.loadTemplateProps(S3PropUtil.SERVER_PROPS_PATH));
        } else {
            throw new IllegalArgumentException("Invalid process role:" + processRole);
        }

        // Overlay --s3-url and --override values on top of the template defaults.
        handleOption(acceptOption().parse(args), props);
        return props;
    }

    /**
     * Applies parsed options to {@code props}; when an S3 URL was supplied, also injects
     * the credentials into the process environment and formats the storage directory
     * after all overrides are in place.
     */
    private static void handleOption(OptionSet options, Properties props) {
        S3Url parsedUrl = null;
        if (options.has("s3-url")) {
            parsedUrl = S3Url.parse((String) options.valueOf("s3-url"));
            props.put(ServerConfigKey.S3_ENDPOINT.getKeyName(),
                parsedUrl.getEndpointProtocol().getName() + "://" + parsedUrl.getS3Endpoint());
            props.put(ServerConfigKey.S3_REGION.getKeyName(), parsedUrl.getS3Region());
            props.put(ServerConfigKey.S3_BUCKET.getKeyName(), parsedUrl.getS3DataBucket());
            props.put(ServerConfigKey.S3_PATH_STYLE.getKeyName(), String.valueOf(parsedUrl.isS3PathStyle()));

            // Credentials are read from the process environment elsewhere, so inject them there.
            EnvUtil.setEnv(ACCESS_KEY_NAME, parsedUrl.getS3AccessKey());
            EnvUtil.setEnv(SECRET_KEY_NAME, parsedUrl.getS3SecretKey());
        }

        if (options.has("override")) {
            for (Object raw : options.valuesOf("override")) {
                String option = (String) raw;
                String[] keyValue = option.split("=", 2);
                if (keyValue.length != 2) {
                    throw new IllegalArgumentException("Invalid override option format: " + option);
                }
                props.setProperty(keyValue[0], keyValue[1]);
            }
        }

        // Format storage last so the generated config reflects the final property values.
        if (parsedUrl != null) {
            try {
                KafkaFormatUtil.formatStorage(parsedUrl.getClusterId(), props);
            } catch (IOException e) {
                throw new RuntimeException(String.format("Format storage failed for cluster:%s", parsedUrl.getClusterId()), e);
            }
        }
    }
}

Loading