From 2eef0e611edf3e456782db79e76f314c8fd44aec Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Fri, 21 Jul 2023 14:51:28 +0200 Subject: [PATCH] Remove bootstrap-server option and deprecation warnings Signed-off-by: Federico Valeri --- .../apache/kafka/server/util/ToolsUtils.java | 25 +------- .../services/replica_verification_tool.py | 2 +- .../kafka/tools/ReplicaVerificationTool.java | 64 ++++++++++--------- 3 files changed, 36 insertions(+), 55 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java index f71abb209aa5..b14f079b15cd 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java @@ -16,19 +16,17 @@ */ package org.apache.kafka.server.util; -import joptsimple.OptionParser; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.utils.Utils; import java.io.PrintStream; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.stream.Collectors; public class ToolsUtils { + /** * print out the metrics in alphabetical order * @param metrics the metrics to be printed out @@ -102,25 +100,4 @@ public static void prettyPrintTable( printRow(columnLengths, headers, out); rows.forEach(row -> printRow(columnLengths, row, out)); } - - public static void validateBootstrapServer(OptionParser parser, String bootstrapServer) { - if (parser == null || bootstrapServer == null) { - throw new RuntimeException("No option parser or bootstrap server found"); - } - if (bootstrapServer.isEmpty()) { - CommandLineUtils.printUsageAndExit(parser, "Empty bootstrap server option"); - } - - String[] hostPorts; - if (bootstrapServer.contains(",")) hostPorts = bootstrapServer.split(","); - else hostPorts = new String[]{bootstrapServer}; - - String[] validHostPort = Arrays.stream(hostPorts) - .filter(hostPortData -> Utils.getPort(hostPortData) != null) - .toArray(String[]::new); - - if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) { - CommandLineUtils.printUsageAndExit(parser, "Invalid bootstrap server option"); - } - } } diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index ea2804b0adaa..ecc47b2a6dfb 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -72,7 +72,7 @@ def get_lag_for_partition(self, topic, partition): def start_cmd(self, node): cmd = self.path.script("kafka-replica-verification.sh", node) - cmd += " --bootstrap-server %s --topics-include %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms) + cmd += " --broker-list %s --topics-include %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms) cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &" return cmd diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java index e455bc7d04ec..187609cabbae 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tools; +import joptsimple.OptionParser; import joptsimple.OptionSpec; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; @@ -55,13 +56,13 @@ import org.apache.kafka.server.util.CommandDefaultOptions; import org.apache.kafka.server.util.CommandLineUtils; import org.apache.kafka.server.util.ShutdownableThread; -import org.apache.kafka.server.util.ToolsUtils; import org.apache.kafka.server.util.TopicFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.SocketTimeoutException; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -113,9 +114,9 @@ public static void main(String[] args) { ReplicaVerificationToolOptions options = new ReplicaVerificationToolOptions(args); // getting topic metadata LOG.info("Getting topic metadata..."); - String bootstrapServer = options.brokerHostsAndPorts(); + String brokerList = options.brokerHostsAndPorts(); - try (Admin adminClient = createAdminClient(bootstrapServer)) { + try (Admin adminClient = createAdminClient(brokerList)) { Collection topicsMetadata = listTopicsMetadata(adminClient); Map brokerInfo = brokerDetails(adminClient); @@ -165,7 +166,7 @@ public static void main(String[] args) { ) .collect(Collectors.toList()); - Properties consumerProps = consumerConfig(bootstrapServer); + Properties consumerProps = consumerConfig(brokerList); ReplicaBuffer replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition, initialOffsets(topicPartitions, consumerProps, options.initialOffsetTime()), @@ -250,15 +251,14 @@ private static Collection listTopicsMetadata(Admin adminClient return adminClient.describeTopics(topics).allTopicNames().get().values(); } - private static Admin createAdminClient(String bootstrapServer) { + private static Admin createAdminClient(String brokerList) { Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList); return Admin.create(props); } private static class ReplicaVerificationToolOptions extends CommandDefaultOptions { private final OptionSpec brokerListOpt; - private final OptionSpec bootstrapServerOpt; private final OptionSpec fetchSizeOpt; private final OptionSpec maxWaitMsOpt; private final OptionSpec topicWhiteListOpt; @@ -268,16 +268,10 @@ private static class ReplicaVerificationToolOptions extends CommandDefaultOption ReplicaVerificationToolOptions(String[] args) { super(args); - bootstrapServerOpt = parser.accepts("bootstrap-server", - "REQUIRED. The list of hostname and port of the server to connect to.") + brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") .withRequiredArg() .describedAs("hostname:port,...,hostname:port") .ofType(String.class); - brokerListOpt = parser.accepts("broker-list", "DEPRECATED. Use --bootstrap-server. " + - "The list of hostname and port of the server to connect to.") - .withOptionalArg() - .describedAs("hostname:port,...,hostname:port") - .ofType(String.class); fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") .withRequiredArg() .describedAs("bytes") @@ -288,8 +282,8 @@ private static class ReplicaVerificationToolOptions extends CommandDefaultOption .describedAs("ms") .ofType(Integer.class) .defaultsTo(1_000); - topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED. Use --topics-include. " + - "List of topics to verify replica consistency.") + topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED use --topics-include instead; " + + "ignored if --topics-include specified. List of topics to verify replica consistency.") .withRequiredArg() .describedAs("Java regex (String)") .ofType(String.class) @@ -316,27 +310,37 @@ private static class ReplicaVerificationToolOptions extends CommandDefaultOption if (options.has(versionOpt)) { CommandLineUtils.printVersionAndExit(); } - CommandLineUtils.checkInvalidArgs(parser, options, bootstrapServerOpt, brokerListOpt); + CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt); CommandLineUtils.checkInvalidArgs(parser, options, topicsIncludeOpt, topicWhiteListOpt); - if (!options.has(bootstrapServerOpt) && !options.has(brokerListOpt)) { - CommandLineUtils.printUsageAndExit(parser, format("The %s option is required", bootstrapServerOpt)); - } - if (options.has(brokerListOpt)) { - System.out.printf("WARNING: The %s option is deprecated and will be removed. " + - "Use the %s option with the same syntax.%n", brokerListOpt, bootstrapServerOpt); - } - if (options.has(topicWhiteListOpt)) { - System.out.printf("WARNING: The %s option is deprecated and will be removed. " + - "Use the %s option with the same syntax.%n", topicWhiteListOpt, topicsIncludeOpt); - } } String brokerHostsAndPorts() { - String brokerList = options.valueOf(options.has(bootstrapServerOpt) ? bootstrapServerOpt : brokerListOpt); - ToolsUtils.validateBootstrapServer(parser, brokerList); + String brokerList = options.valueOf(brokerListOpt); + validateBrokerList(parser, brokerList); return brokerList; } + void validateBrokerList(OptionParser parser, String brokerList) { + if (parser == null || brokerList == null) { + throw new RuntimeException("No option parser or broker list found"); + } + if (brokerList.isEmpty()) { + CommandLineUtils.printUsageAndExit(parser, "Empty broker list option"); + } + + String[] hostPorts; + if (brokerList.contains(",")) hostPorts = brokerList.split(","); + else hostPorts = new String[]{brokerList}; + + String[] validHostPort = Arrays.stream(hostPorts) + .filter(hostPortData -> Utils.getPort(hostPortData) != null) + .toArray(String[]::new); + + if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) { + CommandLineUtils.printUsageAndExit(parser, "Invalid broker list option"); + } + } + TopicFilter.IncludeList topicsIncludeFilter() { String regex = options.valueOf(options.has(topicsIncludeOpt) ? topicsIncludeOpt : topicWhiteListOpt); try {