Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5225: StreamsResetter doesn't allow custom Consumer properties #3970

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ project(':streams') {
testCompile project(':core').sourceSets.test.output
testCompile libs.junit
testCompile libs.easymock
testCompile libs.bcpkix

testRuntime libs.slf4jlog4j
}
Expand Down
60 changes: 34 additions & 26 deletions core/src/main/scala/kafka/tools/StreamsResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
*/
package kafka.tools;


import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
Expand All @@ -27,9 +31,11 @@
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -38,12 +44,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;

/**
* {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
* <p>
Expand Down Expand Up @@ -71,25 +71,22 @@ public class StreamsResetter {
private static final int EXIT_CODE_ERROR = 1;

private static OptionSpec<String> bootstrapServerOption;
private static OptionSpecBuilder zookeeperOption;
private static OptionSpec<String> applicationIdOption;
private static OptionSpec<String> inputTopicsOption;
private static OptionSpec<String> intermediateTopicsOption;
private static OptionSpecBuilder dryRunOption;
private static OptionSpec<String> commandConfigOption;

private OptionSet options = null;
private final Properties consumerConfig = new Properties();
private final List<String> allTopics = new LinkedList<>();
private boolean dryRun = false;

public int run(final String[] args) {
return run(args, new Properties());
}

public int run(final String[] args, final Properties config) {
consumerConfig.clear();
consumerConfig.putAll(config);

public int run(final String[] args,
final Properties config) {
int exitCode = EXIT_CODE_SUCCESS;

KafkaAdminClient kafkaAdminClient = null;
Expand All @@ -99,20 +96,25 @@ public int run(final String[] args, final Properties config) {
dryRun = options.has(dryRunOption);

final String groupId = options.valueOf(applicationIdOption);
final Properties properties = new Properties();
if (options.has(commandConfigOption)) {
properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
}
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));

validateNoActiveConsumers(groupId);

final Properties adminClientProperties = new Properties();
adminClientProperties.put("bootstrap.servers", options.valueOf(bootstrapServerOption));
kafkaAdminClient = (KafkaAdminClient) AdminClient.create(adminClientProperties);
validateNoActiveConsumers(groupId, properties);
kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);

allTopics.clear();
allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));

if (dryRun) {
System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
}
maybeResetInputAndSeekToEndIntermediateTopicOffsets();

final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
consumerConfig.putAll(properties);
maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig);
maybeDeleteInternalTopics(kafkaAdminClient);

} catch (final Throwable e) {
Expand All @@ -128,10 +130,11 @@ public int run(final String[] args, final Properties config) {
return exitCode;
}

private void validateNoActiveConsumers(final String groupId) {
private void validateNoActiveConsumers(final String groupId,
final Properties properties) {
kafka.admin.AdminClient olderAdminClient = null;
try {
olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
olderAdminClient = kafka.admin.AdminClient.create(properties);
if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
+ "Make sure to stop all running application instances before running the reset tool.");
Expand All @@ -156,8 +159,6 @@ private void parseArguments(final String[] args) throws IOException {
.ofType(String.class)
.defaultsTo("localhost:9092")
.describedAs("urls");
zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");

inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.")
.withRequiredArg()
.ofType(String.class)
Expand All @@ -168,8 +169,15 @@ private void parseArguments(final String[] args) throws IOException {
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
.withRequiredArg()
.ofType(String.class)
.describedAs("file name");
dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");

// TODO: deprecated in 1.0; can be removed eventually
optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");

try {
options = optionParser.parse(args);
} catch (final OptionException e) {
Expand All @@ -178,7 +186,7 @@ private void parseArguments(final String[] args) throws IOException {
}
}

private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig) {
final List<String> inputTopics = options.valuesOf(inputTopicsOption);
final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);

Expand Down Expand Up @@ -219,7 +227,6 @@ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {

final Properties config = new Properties();
config.putAll(consumerConfig);
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Expand Down Expand Up @@ -274,7 +281,8 @@ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
System.out.println("Done.");
}

private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client, final Set<TopicPartition> intermediateTopicPartitions) {
private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client,
final Set<TopicPartition> intermediateTopicPartitions) {

final String groupId = options.valueOf(applicationIdOption);
final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
Expand Down