diff --git a/automq-shell/src/main/java/com/automq/shell/AutoMQCLI.java b/automq-shell/src/main/java/com/automq/shell/AutoMQCLI.java
index 3ab2a0a543..1570ec94d7 100644
--- a/automq-shell/src/main/java/com/automq/shell/AutoMQCLI.java
+++ b/automq-shell/src/main/java/com/automq/shell/AutoMQCLI.java
@@ -24,6 +24,8 @@ public class AutoMQCLI {
     @CommandLine.Option(names = {"-b", "--bootstrap-server"}, description = "The Kafka server to connect to.", required = true)
     public String bootstrapServer;
 
+    @CommandLine.Option(names = {"-c", "--command-config"}, description = "Property file containing configs to be passed to Admin Client.")
+    public String commandConfig;
 
     public static void main(String... args) {
         int exitCode = new CommandLine(new AutoMQCLI()).execute(args);
diff --git a/automq-shell/src/main/java/com/automq/shell/command/ForceClose.java b/automq-shell/src/main/java/com/automq/shell/command/ForceClose.java
index 5e24c592a7..0b70fed493 100644
--- a/automq-shell/src/main/java/com/automq/shell/command/ForceClose.java
+++ b/automq-shell/src/main/java/com/automq/shell/command/ForceClose.java
@@ -20,6 +20,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -28,6 +29,7 @@
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import picocli.CommandLine;
 import picocli.CommandLine.ArgGroup;
 import picocli.CommandLine.Command;
@@ -70,16 +72,30 @@ static class Exclusive {
     @Override
     public Integer call() throws Exception {
         Properties properties = new Properties();
+        if (cli.commandConfig != null) {
+            try {
+                properties = Utils.loadProps(cli.commandConfig);
+            } catch (Exception e) {
+                System.err.println("Error loading command config file: " + ExceptionUtils.getRootCauseMessage(e));
+                return 1;
+            }
+        }
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cli.bootstrapServer);
         Admin admin = Admin.create(properties);
 
-        Optional<Node> nodeOptional = findAnyNode(admin);
+        Optional<Node> nodeOptional;
+        try {
+            nodeOptional = admin.describeCluster().nodes().get().stream().findFirst();
+        } catch (Exception e) {
+            System.err.println("Failed to get Kafka node: " + ExceptionUtils.getRootCauseMessage(e));
+            return 1;
+        }
         if (nodeOptional.isEmpty()) {
             System.err.println("No controller node found.");
             return 1;
         }
 
-        NetworkClient client = CLIUtils.buildNetworkClient("automq-cli", new AdminClientConfig(new Properties()), new Metrics(), Time.SYSTEM, new LogContext());
+        NetworkClient client = CLIUtils.buildNetworkClient("automq-cli", new AdminClientConfig(properties), new Metrics(), Time.SYSTEM, new LogContext());
         ClientStreamManager manager = new ClientStreamManager(client, nodeOptional.get());
 
         List list;
@@ -127,8 +143,4 @@ public Integer call() throws Exception {
 
         return 0;
     }
-
-    private Optional<Node> findAnyNode(Admin admin) throws Exception {
-        return admin.describeCluster().nodes().get().stream().findFirst();
-    }
 }
diff --git a/automq-shell/src/main/java/com/automq/shell/command/Recreate.java b/automq-shell/src/main/java/com/automq/shell/command/Recreate.java
index 6bdf43ae6f..d6c6f6eb57 100644
--- a/automq-shell/src/main/java/com/automq/shell/command/Recreate.java
+++ b/automq-shell/src/main/java/com/automq/shell/command/Recreate.java
@@ -20,6 +20,7 @@
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -30,6 +31,7 @@
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import picocli.CommandLine;
 
 @CommandLine.Command(name = "recreate", description = "Discard all data and recreate partition(s).", mixinStandardHelpOptions = true)
@@ -52,16 +54,30 @@ public class Recreate implements Callable<Integer> {
     @Override
     public Integer call() throws Exception {
         Properties properties = new Properties();
+        if (cli.commandConfig != null) {
+            try {
+                properties = Utils.loadProps(cli.commandConfig);
+            } catch (Exception e) {
+                System.err.println("Error loading command config file: " + ExceptionUtils.getRootCauseMessage(e));
+                return 1;
+            }
+        }
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cli.bootstrapServer);
         Admin admin = Admin.create(properties);
 
-        Optional<Node> nodeOptional = findAnyNode(admin);
+        Optional<Node> nodeOptional;
+        try {
+            nodeOptional = admin.describeCluster().nodes().get().stream().findFirst();
+        } catch (Exception e) {
+            System.err.println("Failed to get Kafka node: " + ExceptionUtils.getRootCauseMessage(e));
+            return 1;
+        }
         if (nodeOptional.isEmpty()) {
-            System.err.println("No controller node found.");
+            System.err.println("No Kafka node found.");
             return 1;
         }
 
-        NetworkClient client = CLIUtils.buildNetworkClient("automq-cli", new AdminClientConfig(new Properties()), new Metrics(), Time.SYSTEM, new LogContext());
+        NetworkClient client = CLIUtils.buildNetworkClient("automq-cli", new AdminClientConfig(properties), new Metrics(), Time.SYSTEM, new LogContext());
         ClientKVClient clientKVClient = new ClientKVClient(client, nodeOptional.get());
 
         if (StringUtils.isBlank(namespace)) {
@@ -76,7 +92,7 @@ public Integer call() throws Exception {
             System.err.println("Topic " + topicName + " not found.");
             return 1;
         } catch (Exception e) {
-            System.err.println("Failed to describe topic " + topicName + ": " + e.getMessage());
+            System.err.println("Failed to describe topic " + topicName + ": " + ExceptionUtils.getRootCauseMessage(e));
             return 1;
         }
@@ -112,8 +128,4 @@ public Integer call() throws Exception {
     private String formatStreamKey(String namespace, String topicId, int partition) {
         return namespace + "/" + topicId + "/" + partition;
     }
-
-    private Optional<Node> findAnyNode(Admin admin) throws Exception {
-        return admin.describeCluster().nodes().get().stream().findFirst();
-    }
 }
diff --git a/bin/automq-cli.sh b/bin/automq-cli.sh
new file mode 100755
index 0000000000..49130e9ddc
--- /dev/null
+++ b/bin/automq-cli.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh com.automq.shell.AutoMQCLI "$@"
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 8a48bc83e2..4cc4dff88c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -365,7 +365,7 @@
-              files="(StreamControlManager|S3StreamsMetadataImage|CompactionManagerTest|S3StreamMetricsManager|CompactionManager|BlockCache|DefaultS3BlockCache|StreamReader|DefaultS3Operator|S3Utils|AnomalyDetector).java"/>
+              files="(StreamControlManager|S3StreamsMetadataImage|CompactionManagerTest|S3StreamMetricsManager|CompactionManager|BlockCache|DefaultS3BlockCache|StreamReader|DefaultS3Operator|S3Utils|AnomalyDetector|Recreate|ForceClose).java"/>
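
For reference, a usage sketch of the new --command-config option together with the new launcher script. This is illustrative only; the broker address, file path, and property values are assumptions, not part of this patch:

    # Pass extra Admin Client settings (e.g. SASL) to the CLI via a properties file.
    # Picocli parses the parent-command options (-b, -c) before the subcommand name.
    bin/automq-cli.sh -b broker1:9092 -c /path/to/admin-client.properties recreate --help

    # Example contents of admin-client.properties (assumed values):
    # security.protocol=SASL_PLAINTEXT
    # sasl.mechanism=PLAIN
    # sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";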