New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Follow-up improvements for consumer offset reset tool (KIP-122) #3102
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch. Left a few comments.
} else if (opts.options.has(opts.topicOpt)) { | ||
val topics = opts.options.valuesOf(opts.topicOpt).asScala | ||
parseTopicPartitionsToReset(topics) | ||
val topicPartitions = parseTopicPartitionsToReset(topics) | ||
allTopicPartitions.filter(topicPartition => topicPartitions.contains(topicPartition)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I get why we filter based on current committed offsets. Why would we not permit the tool to set offsets for a topic partition which hasn't been previously consumed? If that is really the behavior we want, I would prefer to raise an exception rather than silently proceeding with only some of the offsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't considered setting offsets on topic partitions unassigned previously, sounds like a good idea (e.g. set initial offsets for a new consumer group). if we add this, checking previous assignments won't be required, right? And implementation will be cleaner. Maybe we can add this as part of this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think it is a reasonable feature to implement here and users might find it a little surprising if they couldn't do it.
if (resetPlan.keySet.contains(topicPartition)) | ||
(topicPartition, resetPlan(topicPartition)) | ||
else null | ||
resetPlan.keySet.map { topicPartition => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a comment here for lack of a better place. It seems we still require either --all-topics or --topic to use the CSV option?
@@ -650,6 +646,23 @@ object ConsumerGroupCommand extends Logging { | |||
} | |||
} | |||
|
|||
private def checkOffsetRange(topicPartition: TopicPartition, shiftedOffset: Long) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: shiftedOffset
is no longer correct since we use this in more scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, I'll change it to offset
@@ -650,6 +646,23 @@ object ConsumerGroupCommand extends Logging { | |||
} | |||
} | |||
|
|||
private def checkOffsetRange(topicPartition: TopicPartition, shiftedOffset: Long) = { | |||
val newOffset = getLogEndOffset(topicPartition) match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we don't really need the newOffest
variable. We can return from the match directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we don't handle errors in getLogEndOffset
, which seems a bit unsafe. Maybe we should check for the Unknown
and Ignore
cases and raise an error. In general, this is the point I was trying to make: rather than crossing our fingers and trying to go ahead with the reset, It is safer to fail fast on errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. This should also apply to the null
s returnedon #prepareOffsetsToReset
, and instead return #printUsageAndDie
. Do you agree?
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
…/KAFKA-5266 # Conflicts: # core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@hachikuji @jeqo What is the status of this PR? |
@ijuma Was planning to look at it shortly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. LGTM. There is some commented out code that I will remove when merging.
…P-122) Implement improvements defined here: https://issues.apache.org/jira/browse/KAFKA-5266 Author: Jorge Quilcate Otoya <quilcate.jorge@gmail.com> Reviewers: Jason Gustafson <jason@confluent.io> Closes #3102 from jeqo/feature/KAFKA-5266 (cherry picked from commit ef95512) Signed-off-by: Jason Gustafson <jason@confluent.io>
Hi. One more improvement to the tool - return nonzero code when operation is failed. For example, when there are consumers in the group and reset cannot be done. |
@DXist Thanks for the suggestion. Would you mind opening a JIRA? https://issues.apache.org/jira/projects/KAFKA |
Implement improvements defined here: https://issues.apache.org/jira/browse/KAFKA-5266