Skip to content

Commit

Permalink
MINOR: Revert incompatible behavior change to consumer reset tool (#4611
Browse files Browse the repository at this point in the history
)

This patch reverts the removal of the --execute option in the offset reset tool and the change to the default behavior when no options were present. For consistency, this patch adds the --execute flag to the streams reset tool, but keeps its current default behavior. A note has been added to both of these commands to warn the user that future default behavior will be to prompt before acting.

Test cases were not actually validating that offsets were committed when the --execute option was present, so I have fixed that and added basic assertions for the dry-run behavior. I also removed some duplicated test boilerplate.

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
hachikuji committed Feb 23, 2018
1 parent 0e22fd6 commit e26d0d7
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 428 deletions.
112 changes: 62 additions & 50 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,14 @@ object ConsumerGroupCommand extends Logging {
}

def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = {
print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
println()
println("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))

groupAssignmentsToReset.foreach {
case (consumerAssignment, offsetAndMetadata) =>
print("%-30s %-10s %-15s".format(
consumerAssignment.topic(),
consumerAssignment.partition(),
offsetAndMetadata.offset()))
println()
println("%-30s %-10s %-15s".format(
consumerAssignment.topic,
consumerAssignment.partition,
offsetAndMetadata.offset))
}
}

Expand Down Expand Up @@ -284,7 +282,7 @@ object ConsumerGroupCommand extends Logging {
protected def opts: ConsumerGroupCommandOptions

protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult =
getLogEndOffsets(Seq(topicPartition)).get(topicPartition).getOrElse(LogOffsetResult.Ignore)
getLogEndOffsets(Seq(topicPartition)).getOrElse(topicPartition, LogOffsetResult.Ignore)

protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult]

Expand Down Expand Up @@ -550,46 +548,43 @@ object ConsumerGroupCommand extends Logging {
// `consumer` is only needed for `describe`, so we instantiate it lazily
private var consumer: KafkaConsumer[String, String] = _

def listGroups(): List[String] = {
override def listGroups(): List[String] = {
adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
}

def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
override def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
val group = opts.options.valueOf(opts.groupOpt)
val consumerGroupSummary = adminClient.describeConsumerGroup(group, opts.options.valueOf(opts.timeoutMsOpt))
(Some(consumerGroupSummary.state),
consumerGroupSummary.consumers match {
case None =>
None
case Some(consumers) =>
var assignedTopicPartitions = Array[TopicPartition]()
val offsets = adminClient.listGroupOffsets(group)
val rowsWithConsumer =
if (offsets.isEmpty)
List[PartitionAssignmentState]()
else {
consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
val topicPartitions = consumerSummary.assignment
assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
val partitionOffsets: Map[TopicPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
new TopicPartition(topicPartition.topic, topicPartition.partition) -> offsets.get(topicPartition)
}.toMap
collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
}

val rowsWithoutConsumer = offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
case (topicPartition, offset) =>
collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicPartition),
Map(topicPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
val assignments = consumerGroupSummary.consumers.map { consumers =>
var assignedTopicPartitions = Array[TopicPartition]()
val offsets = adminClient.listGroupOffsets(group)
val rowsWithConsumer =
if (offsets.isEmpty)
List[PartitionAssignmentState]()
else {
consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size > _.assignment.size).flatMap { consumerSummary =>
val topicPartitions = consumerSummary.assignment
assignedTopicPartitions = assignedTopicPartitions ++ consumerSummary.assignment
val partitionOffsets: Map[TopicPartition, Option[Long]] = consumerSummary.assignment.map { topicPartition =>
new TopicPartition(topicPartition.topic, topicPartition.partition) -> offsets.get(topicPartition)
}.toMap
collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), topicPartitions,
partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
}

Some(rowsWithConsumer ++ rowsWithoutConsumer)
val rowsWithoutConsumer = offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
case (topicPartition, offset) =>
collectConsumerAssignment(group, Some(consumerGroupSummary.coordinator), Seq(topicPartition),
Map(topicPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
}

rowsWithConsumer ++ rowsWithoutConsumer
}
)

(Some(consumerGroupSummary.state), assignments)
}

override def collectGroupMembers(verbose: Boolean): (Option[String], Option[Seq[MemberAssignmentState]]) = {
Expand Down Expand Up @@ -680,7 +675,9 @@ object ConsumerGroupCommand extends Logging {
case "Empty" | "Dead" =>
val partitionsToReset = getPartitionsToReset(groupId)
val preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset)
val dryRun = opts.options.has(opts.dryRunOpt)

// Dry-run is the default behavior if --execute is not specified
val dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt)
if (!dryRun)
getConsumer.commitSync(preparedOffsets.asJava)
preparedOffsets
Expand Down Expand Up @@ -796,7 +793,7 @@ object ConsumerGroupCommand extends Logging {
val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset) =
partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))

val preparedOffsetsForParititionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition =>
val preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition =>
(topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
case Some(offset) => offset
case _ => throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition")
Expand All @@ -808,7 +805,7 @@ object ConsumerGroupCommand extends Logging {
case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
}

preparedOffsetsForParititionsWithCommittedOffset ++ preparedOffsetsForPartitionsWithoutCommittedOffset
preparedOffsetsForPartitionsWithCommittedOffset ++ preparedOffsetsForPartitionsWithoutCommittedOffset
} else {
CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
}
Expand Down Expand Up @@ -838,7 +835,7 @@ object ConsumerGroupCommand extends Logging {
}

override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = {
val rows = assignmentsToReset.map { case (k,v) => s"${k.topic()},${k.partition()},${v.offset()}" }(collection.breakOut): List[String]
val rows = assignmentsToReset.map { case (k,v) => s"${k.topic},${k.partition},${v.offset}" }(collection.breakOut): List[String]
rows.foldRight("")(_ + "\n" + _)
}

Expand Down Expand Up @@ -899,10 +896,13 @@ object ConsumerGroupCommand extends Logging {
"or is going through some changes)."
val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + nl +
"Has 3 execution options: (default) to plan which offsets to reset, --execute to execute the reset-offsets process, and --export to export the results to a CSV format." + nl +
"Has the following scenarios to choose: --to-datetime, --by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. One scenario must be choose" + nl +
"To define the scope use: --all-topics or --topic. . One scope must be choose, unless you use '--from-file' scenario"
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " +
"Additionally, the --export option is used to export the results to a CSV format." + nl +
"You must choose one of the following reset specifications: --to-datetime, --by-period, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current." + nl +
"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'."
val DryRunDoc = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets."
val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
val ExportDoc = "Export operation execution to a CSV file. Supported operations: reset-offsets."
val ResetToOffsetDoc = "Reset offsets to a specific offset."
val ResetFromFileDoc = "Reset offsets to values defined in CSV file."
Expand Down Expand Up @@ -955,6 +955,7 @@ object ConsumerGroupCommand extends Logging {
.ofType(classOf[String])
val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc)
val dryRunOpt = parser.accepts("dry-run", DryRunDoc)
val executeOpt = parser.accepts("execute", ExecuteDoc)
val exportOpt = parser.accepts("export", ExportDoc)
val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc)
.withRequiredArg()
Expand Down Expand Up @@ -1029,8 +1030,19 @@ object ConsumerGroupCommand extends Logging {
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)

if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt))
if (options.has(resetOffsetsOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt either takes $groupOpt, $topicOpt, or both")

if (options.has(resetOffsetsOpt)) {
if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
Console.err.println("WARN: In a future major release, the default behavior of this command will be to " +
"prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run " +
"option explicitly if you are scripting this command and want to keep the current default behavior " +
"without prompting.")
}

if (options.has(dryRunOpt) && options.has(executeOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $resetOffsetsOpt only accepts one of $executeOpt and $dryRunOpt")

CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt)
Expand All @@ -1040,7 +1052,7 @@ object ConsumerGroupCommand extends Logging {
CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)

}

// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
Expand Down
56 changes: 37 additions & 19 deletions core/src/main/scala/kafka/tools/StreamsResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class StreamsResetter {
private static OptionSpec<String> fromFileOption;
private static OptionSpec<Long> shiftByOption;
private static OptionSpecBuilder dryRunOption;
private static OptionSpecBuilder executeOption;
private static OptionSpec<String> commandConfigOption;

private OptionSet options = null;
Expand All @@ -109,6 +110,7 @@ public int run(final String[] args,

try {
parseArguments(args);

final boolean dryRun = options.has(dryRunOption);

final String groupId = options.valueOf(applicationIdOption);
Expand Down Expand Up @@ -207,6 +209,7 @@ private void parseArguments(final String[] args) throws IOException {
.withRequiredArg()
.ofType(String.class)
.describedAs("file name");
executeOption = optionParser.accepts("execute", "Execute the command.");
dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");

// TODO: deprecated in 1.0; can be removed eventually
Expand All @@ -219,6 +222,16 @@ private void parseArguments(final String[] args) throws IOException {
throw e;
}

if (options.has(executeOption) && options.has(dryRunOption)) {
CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
}

if (!options.has(executeOption) && !options.has(dryRunOption)) {
System.err.println("WARN: In a future major release, the default behavior of this command will be to " +
"prompt the user before executing the reset. You should add the --execute option explicitly if " +
"you are scripting this command and want to keep the current default behavior without prompting.");
}

scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
allScenarioOptions.$plus(toOffsetOption);
allScenarioOptions.$plus(toDatetimeOption);
Expand Down Expand Up @@ -266,7 +279,6 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consum
notFoundInputTopics.add(topic);
} else {
topicsToSubscribe.add(topic);

}
}
for (final String topic : intermediateTopics) {
Expand All @@ -277,6 +289,28 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consum
}
}

if (!notFoundInputTopics.isEmpty()) {
System.out.println("Following input topics are not found, skipping them");
for (final String topic : notFoundInputTopics) {
System.out.println("Topic: " + topic);
}
topicNotFound = EXIT_CODE_ERROR;
}

if (!notFoundIntermediateTopics.isEmpty()) {
System.out.println("Following intermediate topics are not found, skipping them");
for (final String topic : notFoundIntermediateTopics) {
System.out.println("Topic:" + topic);
}
topicNotFound = EXIT_CODE_ERROR;
}

// Return early if there are no topics to reset (the consumer will raise an error if we
// try to poll with an empty subscription)
if (topicsToSubscribe.isEmpty()) {
return topicNotFound;
}

final Properties config = new Properties();
config.putAll(consumerConfig);
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
Expand Down Expand Up @@ -311,22 +345,6 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consum
}
client.commitSync();
}

if (notFoundInputTopics.size() > 0) {
System.out.println("Following input topics are not found, skipping them");
for (final String topic : notFoundInputTopics) {
System.out.println("Topic: " + topic);
}
topicNotFound = EXIT_CODE_ERROR;
}

if (notFoundIntermediateTopics.size() > 0) {
System.out.println("Following intermediate topics are not found, skipping them");
for (final String topic : notFoundIntermediateTopics) {
System.out.println("Topic:" + topic);
}
}

} catch (final Exception e) {
System.err.println("ERROR: Resetting offsets failed.");
throw e;
Expand All @@ -337,8 +355,8 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consum

// visible for testing
public void maybeSeekToEnd(final String groupId,
final Consumer<byte[], byte[]> client,
final Set<TopicPartition> intermediateTopicPartitions) {
final Consumer<byte[], byte[]> client,
final Set<TopicPartition> intermediateTopicPartitions) {
if (intermediateTopicPartitions.size() > 0) {
System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
for (final TopicPartition topicPartition : intermediateTopicPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
}

def committedOffsets(topic: String = topic, group: String = group): Map[TopicPartition, Long] = {
val props = new Properties
props.put("bootstrap.servers", brokerList)
props.put("group.id", group)
val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
try {
consumer.partitionsFor(topic).asScala.flatMap { partitionInfo =>
val tp = new TopicPartition(partitionInfo.topic, partitionInfo.partition)
val committed = consumer.committed(tp)
if (committed == null)
None
else
Some(tp -> committed.offset)
}.toMap
} finally {
consumer.close()
}
}

def stopRandomOldConsumer(): Unit = {
oldConsumers.head.stop()
}
Expand Down
Loading

0 comments on commit e26d0d7

Please sign in to comment.