From 1ff54f8355d697d1b6f1b0a29c3e8c86342fc6aa Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Mon, 8 May 2017 13:03:31 -0700 Subject: [PATCH 1/2] KAFKA-5166: Add option dry run to Streams application reset tool --- .../scala/kafka/tools/StreamsResetter.java | 104 +++++++++++++++--- 1 file changed, 91 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 83166cddb6fb5..69e5b71afe734 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -16,10 +16,18 @@ */ package kafka.tools; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; import joptsimple.OptionException; import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; import kafka.admin.AdminClient; import kafka.admin.TopicCommand; import kafka.utils.ZkUtils; @@ -31,13 +39,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.Set; - /** * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. *

@@ -69,6 +70,7 @@ public class StreamsResetter { private static OptionSpec applicationIdOption; private static OptionSpec inputTopicsOption; private static OptionSpec intermediateTopicsOption; + private static OptionSpecBuilder dryRunOption; private OptionSet options = null; private final Properties consumerConfig = new Properties(); @@ -91,10 +93,7 @@ public int run(final String[] args, final Properties config) { adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption)); final String groupId = options.valueOf(applicationIdOption); - if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + - "Make sure to stop all running application instances before running the reset tool."); - } + zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption), 30000, @@ -104,8 +103,19 @@ public int run(final String[] args, final Properties config) { allTopics.clear(); allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - resetInputAndInternalAndSeekToEndIntermediateTopicOffsets(); - deleteInternalTopics(zkUtils); + Boolean dryRun = options.has(dryRunOption); + + if (!dryRun) { + if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + + "Make sure to stop all running application instances before running the reset tool."); + } + resetInputAndInternalAndSeekToEndIntermediateTopicOffsets(); + deleteInternalTopics(zkUtils); + } else { + dryRun(); + } + } catch (final Throwable e) { exitCode = EXIT_CODE_ERROR; System.err.println("ERROR: " + e.getMessage()); @@ -121,6 +131,73 @@ public int run(final String[] args, final Properties config) { return exitCode; } + private void dryRun() { + final List inputTopics = options.valuesOf(inputTopicsOption); + final List intermediateTopics = options.valuesOf(intermediateTopicsOption); + + System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----"); + + List notfoundInputTopics = new ArrayList<>(); + List notFoundIntermediateTopics = new ArrayList<>(); + List internalTopics = new ArrayList<>(); + + if (inputTopics.size() > 0) { + System.out.println("Following input topics offsets will be reset to beginning"); + for (final String topic : inputTopics) { + if (allTopics.contains(topic)) { + System.out.println("Topic - " + topic); + } else { + notfoundInputTopics.add(topic); + } + } + } + + if (intermediateTopics.size() > 0) { + System.out.println("Following intermediate topics offsets will be reset to end"); + for (final String topic : intermediateTopics) { + if (!allTopics.contains(topic)) { + System.err.println("Topic - " + topic); + } else { + notFoundIntermediateTopics.add(topic); + } + } + } + + for (final String topic : allTopics) { + if (isInternalTopic(topic)) { + internalTopics.add(topic); + } + } + + if (internalTopics.size() > 0) { + System.out.println("Following internal topics offsets will be reset to beginning"); + for (String topic : internalTopics) { + System.out.println("Topic - " + topic); + } + } + + if (internalTopics.size() > 0) { + System.out.println("Following internal topics will be deleted"); + for (String topic : internalTopics) { + System.out.println("Topic - " + topic); + } + } + + if (notfoundInputTopics.size() > 0) { + System.out.println("Following input topics are not found, skipping them"); + for (String topic : notfoundInputTopics) { + System.out.println("Topic - " + topic); + } + } + + if (notFoundIntermediateTopics.size() > 0) { + System.out.println("Following intermediate topics are not found, skipping them"); + for (String topic : notFoundIntermediateTopics) { + System.out.println("Topic - " + topic); + } + } + } + private void parseArguments(final String[] args) throws IOException { final OptionParser optionParser = new OptionParser(); applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)") @@ -148,6 +225,7 @@ private void parseArguments(final String[] args) throws IOException { .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); + dryRunOption = optionParser.accepts("dry-run", "Option to indicate to run streams reset tool to display actions it will perform"); try { options = optionParser.parse(args); From d63e9862435c97aac92fa0d8d706a08c4cb90712 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Mon, 8 May 2017 14:00:43 -0700 Subject: [PATCH 2/2] Updated to add consumer group in description --- core/src/main/scala/kafka/tools/StreamsResetter.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 69e5b71afe734..5fd6ccf44d39e 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -134,6 +134,7 @@ public int run(final String[] args, final Properties config) { private void dryRun() { final List inputTopics = options.valuesOf(inputTopicsOption); final List intermediateTopics = options.valuesOf(intermediateTopicsOption); + String groupId = options.valueOf(applicationIdOption); System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----"); @@ -142,7 +143,7 @@ private void dryRun() { List internalTopics = new ArrayList<>(); if (inputTopics.size() > 0) { - System.out.println("Following input topics offsets will be reset to beginning"); + System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")"); for (final String topic : inputTopics) { if (allTopics.contains(topic)) { System.out.println("Topic - " + topic); @@ -153,7 +154,7 @@ private void dryRun() { } if (intermediateTopics.size() > 0) { - System.out.println("Following intermediate topics offsets will be reset to end"); + System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")"); for (final String topic : intermediateTopics) { if (!allTopics.contains(topic)) { System.err.println("Topic - " + topic); @@ -170,7 +171,7 @@ private void dryRun() { } if (internalTopics.size() > 0) { - System.out.println("Following internal topics offsets will be reset to beginning"); + System.out.println("Following internal topics offsets will be reset to beginning (for consumer group " + groupId + ")"); for (String topic : internalTopics) { System.out.println("Topic - " + topic); }