-
Notifications
You must be signed in to change notification settings - Fork 13.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4743: Add Reset Consumer Group Offsets tooling [KIP-122] #2624
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, I had a quick look and left some comments (note that I didn't do a proper review).
val consumer = getConsumer() | ||
consumer.assign(List(topicPartition).asJava) | ||
consumer.seekToBeginning(List(topicPartition).asJava) | ||
val logStartOffset = consumer.position(topicPartition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you use beginningOffsets
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it look something like this right?
val offsets = consumer.beginningOffsets(List(topicPartition).asJava)
val logStartOffset = offsets.get(topicPartition)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: Long): LogOffsetResult = { | ||
val consumer = getConsumer() | ||
consumer.assign(List(topicPartition).asJava) | ||
val offsetForTimestamp = Option(consumer.offsetsForTimes(Map(topicPartition -> timestamp.asInstanceOf[java.lang.Long]).asJava)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of asInstanceOf
, you can do timestamp: java.lang.Long
which is checked by the compiler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, why are we wrapping the result of offsetsForTimes
in an Option
? It can never be null
.
package unit.kafka.admin | ||
|
||
import java.io.{BufferedWriter, File, FileWriter} | ||
import java.time.{Duration, LocalDateTime, ZonedDateTime} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still support Java 7, so we cannot use these classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, didn't know that KIP-118 was moved to 0.12. ok, I will check it.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
A few terminology corrrections and simplifications
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
(topicPartition, resetPlan(topicPartition)) | ||
else null | ||
}.toMap | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this case isn't actually allowed by the command. Maybe we can raise an IllegalArgumentException
to make that explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My mistake. It is allowed, but I'm wondering if it is useful.
val topics = opts.options.valuesOf(opts.topicOpt).asScala | ||
parseTopicPartitionsToReset(topics) | ||
} else { | ||
CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you provide a reset csv file, isn't the scope unnecessary?
} | ||
|
||
private def prepareOffsetsToReset(groupId: String, partitionsToReset: Iterable[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = { | ||
if (opts.options.has(opts.resetToOffsetOpt)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to check that the provided offset is in range as we do for the shift option. Same for the CSV case.
partitionsToReset.map { topicPartition => | ||
currentCommittedOffsets.get(topicPartition).map { offset => | ||
(topicPartition, new OffsetAndMetadata(offset)) | ||
}.orNull |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm guilty for a bug here. Instead of .orNull
, it should be .getOrElse((topicPartition, null))
. Sorry!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, now that I'm thinking about it, there are a lot of cases in here where we return null
when we don't know what offset to use. If that doesn't raise an NPE in toMap
, it will probably raise one in commitSync
. We probably need to filter out all partitions like this. Alternatively, maybe we could just raise an exception if we hit one of these cases? I feel it would be a little unsafe to go through with the offset commit with only some of the expected partitions.
@@ -566,10 +801,19 @@ object ConsumerGroupCommand extends Logging { | |||
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) | |||
if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) | |||
CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt)) | |||
if (options.has(resetOffsetsOpt)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we assert that at least one of allResetOffsetScenarioOpts
is provided, then we don't need the weird case above where we reset to the current offsets.
AdminUtils.createTopic(zkUtils, topic1, 1, 1) | ||
|
||
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") | ||
val checkpoint = new Date() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this is unused
|
||
@Test | ||
def testResetOffsetsByDurationToEarliest() { | ||
val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it's easier to read this when you put the arguments on separate lines like you did for some of the test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeqo Thanks for the patch. I'm going to go ahead and merge this as is. I've created https://issues.apache.org/jira/browse/KAFKA-5266 for the additional improvements I suggested. If you have time, I would appreciate if you would pick that up.
@hachikuji Cool, I will check it :) Thanks! |
No description provided.