Skip to content

Commit

Permalink
Remove bootstrap-server option and deprecation warnings
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
  • Loading branch information
fvaleri committed Jul 21, 2023
1 parent b904cda commit 2eef0e6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 55 deletions.
Expand Up @@ -16,19 +16,17 @@
*/
package org.apache.kafka.server.util;

import joptsimple.OptionParser;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Utils;

import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ToolsUtils {

/**
* print out the metrics in alphabetical order
* @param metrics the metrics to be printed out
Expand Down Expand Up @@ -102,25 +100,4 @@ public static void prettyPrintTable(
printRow(columnLengths, headers, out);
rows.forEach(row -> printRow(columnLengths, row, out));
}

public static void validateBootstrapServer(OptionParser parser, String bootstrapServer) {
if (parser == null || bootstrapServer == null) {
throw new RuntimeException("No option parser or bootstrap server found");
}
if (bootstrapServer.isEmpty()) {
CommandLineUtils.printUsageAndExit(parser, "Empty bootstrap server option");
}

String[] hostPorts;
if (bootstrapServer.contains(",")) hostPorts = bootstrapServer.split(",");
else hostPorts = new String[]{bootstrapServer};

String[] validHostPort = Arrays.stream(hostPorts)
.filter(hostPortData -> Utils.getPort(hostPortData) != null)
.toArray(String[]::new);

if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) {
CommandLineUtils.printUsageAndExit(parser, "Invalid bootstrap server option");
}
}
}
2 changes: 1 addition & 1 deletion tests/kafkatest/services/replica_verification_tool.py
Expand Up @@ -72,7 +72,7 @@ def get_lag_for_partition(self, topic, partition):

def start_cmd(self, node):
cmd = self.path.script("kafka-replica-verification.sh", node)
cmd += " --bootstrap-server %s --topics-include %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)
cmd += " --broker-list %s --topics-include %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)

cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &"
return cmd
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.tools;

import joptsimple.OptionParser;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
Expand Down Expand Up @@ -55,13 +56,13 @@
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.server.util.ToolsUtils;
import org.apache.kafka.server.util.TopicFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -113,9 +114,9 @@ public static void main(String[] args) {
ReplicaVerificationToolOptions options = new ReplicaVerificationToolOptions(args);
// getting topic metadata
LOG.info("Getting topic metadata...");
String bootstrapServer = options.brokerHostsAndPorts();
String brokerList = options.brokerHostsAndPorts();

try (Admin adminClient = createAdminClient(bootstrapServer)) {
try (Admin adminClient = createAdminClient(brokerList)) {
Collection<TopicDescription> topicsMetadata = listTopicsMetadata(adminClient);
Map<Integer, Node> brokerInfo = brokerDetails(adminClient);

Expand Down Expand Up @@ -165,7 +166,7 @@ public static void main(String[] args) {
)
.collect(Collectors.toList());

Properties consumerProps = consumerConfig(bootstrapServer);
Properties consumerProps = consumerConfig(brokerList);

ReplicaBuffer replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
initialOffsets(topicPartitions, consumerProps, options.initialOffsetTime()),
Expand Down Expand Up @@ -250,15 +251,14 @@ private static Collection<TopicDescription> listTopicsMetadata(Admin adminClient
return adminClient.describeTopics(topics).allTopicNames().get().values();
}

private static Admin createAdminClient(String bootstrapServer) {
private static Admin createAdminClient(String brokerList) {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return Admin.create(props);
}

private static class ReplicaVerificationToolOptions extends CommandDefaultOptions {
private final OptionSpec<String> brokerListOpt;
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Integer> maxWaitMsOpt;
private final OptionSpec<String> topicWhiteListOpt;
Expand All @@ -268,16 +268,10 @@ private static class ReplicaVerificationToolOptions extends CommandDefaultOption

ReplicaVerificationToolOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server",
"REQUIRED. The list of hostname and port of the server to connect to.")
brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
.withRequiredArg()
.describedAs("hostname:port,...,hostname:port")
.ofType(String.class);
brokerListOpt = parser.accepts("broker-list", "DEPRECATED. Use --bootstrap-server. " +
"The list of hostname and port of the server to connect to.")
.withOptionalArg()
.describedAs("hostname:port,...,hostname:port")
.ofType(String.class);
fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
.withRequiredArg()
.describedAs("bytes")
Expand All @@ -288,8 +282,8 @@ private static class ReplicaVerificationToolOptions extends CommandDefaultOption
.describedAs("ms")
.ofType(Integer.class)
.defaultsTo(1_000);
topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED. Use --topics-include. " +
"List of topics to verify replica consistency.")
topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED use --topics-include instead; " +
"ignored if --topics-include specified. List of topics to verify replica consistency.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class)
Expand All @@ -316,27 +310,37 @@ private static class ReplicaVerificationToolOptions extends CommandDefaultOption
if (options.has(versionOpt)) {
CommandLineUtils.printVersionAndExit();
}
CommandLineUtils.checkInvalidArgs(parser, options, bootstrapServerOpt, brokerListOpt);
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt);
CommandLineUtils.checkInvalidArgs(parser, options, topicsIncludeOpt, topicWhiteListOpt);
if (!options.has(bootstrapServerOpt) && !options.has(brokerListOpt)) {
CommandLineUtils.printUsageAndExit(parser, format("The %s option is required", bootstrapServerOpt));
}
if (options.has(brokerListOpt)) {
System.out.printf("WARNING: The %s option is deprecated and will be removed. " +
"Use the %s option with the same syntax.%n", brokerListOpt, bootstrapServerOpt);
}
if (options.has(topicWhiteListOpt)) {
System.out.printf("WARNING: The %s option is deprecated and will be removed. " +
"Use the %s option with the same syntax.%n", topicWhiteListOpt, topicsIncludeOpt);
}
}

String brokerHostsAndPorts() {
String brokerList = options.valueOf(options.has(bootstrapServerOpt) ? bootstrapServerOpt : brokerListOpt);
ToolsUtils.validateBootstrapServer(parser, brokerList);
String brokerList = options.valueOf(brokerListOpt);
validateBrokerList(parser, brokerList);
return brokerList;
}

void validateBrokerList(OptionParser parser, String brokerList) {
if (parser == null || brokerList == null) {
throw new RuntimeException("No option parser or broker list found");
}
if (brokerList.isEmpty()) {
CommandLineUtils.printUsageAndExit(parser, "Empty broker list option");
}

String[] hostPorts;
if (brokerList.contains(",")) hostPorts = brokerList.split(",");
else hostPorts = new String[]{brokerList};

String[] validHostPort = Arrays.stream(hostPorts)
.filter(hostPortData -> Utils.getPort(hostPortData) != null)
.toArray(String[]::new);

if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) {
CommandLineUtils.printUsageAndExit(parser, "Invalid broker list option");
}
}

TopicFilter.IncludeList topicsIncludeFilter() {
String regex = options.valueOf(options.has(topicsIncludeOpt) ? topicsIncludeOpt : topicWhiteListOpt);
try {
Expand Down

0 comments on commit 2eef0e6

Please sign in to comment.