-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6321: Consolidate calls to KafkaConsumer's beginningOffsets()
and endOffsets()
in ConsumerGroupCommand
#4344
Conversation
protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = { | ||
val offsets = getConsumer.endOffsets(topicPartitions.asJava) | ||
topicPartitions.map { topicPartition => | ||
val logStartOffset = offsets.get(topicPartition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this variable be renamed to match the method name ?
The method is getLogEndOffsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thanks for catching it. I'll submit an update.
@becketqin This PR addresses the JIRA you opened. I'd appreciate your review when you get a chance. Thanks. |
Thanks for the patch. I am a little busy today. Will review tomorrow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch. Please see some comments below.
@@ -274,7 +274,10 @@ object ConsumerGroupCommand extends Logging { | |||
|
|||
protected def opts: ConsumerGroupCommandOptions | |||
|
|||
protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult | |||
protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = | |||
getLogEndOffsets(Seq(topicPartition)).get(topicPartition).getOrElse(LogOffsetResult.Ignore) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could just be getLogEndOffsets(Seq(topicPartition)).getOrElse(topicPartition, LogOffsetResult.Ignore)
@@ -274,7 +274,10 @@ object ConsumerGroupCommand extends Logging { | |||
|
|||
protected def opts: ConsumerGroupCommandOptions | |||
|
|||
protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult | |||
protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this method is still used in a loop in line 706 and line 771. Should we also replace them with batch call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I'll try to consolidate this in the next commit. Thanks for catching it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@becketqin, thanks for reviewing the PR. I tried to address your comments in the new commit.
@@ -274,7 +274,10 @@ object ConsumerGroupCommand extends Logging { | |||
|
|||
protected def opts: ConsumerGroupCommandOptions | |||
|
|||
protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult | |||
protected def getLogEndOffset(topicPartition: TopicPartition): LogOffsetResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I'll try to consolidate this in the next commit. Thanks for catching it.
c6ca83c
to
5fd728d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vahidhashemian Thanks for updating the patch. Happy new year! I left a few more comments.
val offsets = getConsumer.beginningOffsets(List(topicPartition).asJava) | ||
val logStartOffset = offsets.get(topicPartition) | ||
LogOffsetResult.LogOffset(logStartOffset) | ||
protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this method is no longer used. Can we just delete it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true. I'll remove it in the next commit. Thanks!
topicPartitions.map { topicPartition => | ||
val logStartOffset = offsets.get(topicPartition) | ||
topicPartition -> LogOffsetResult.LogOffset(logStartOffset) | ||
}.toMap | ||
} | ||
|
||
protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long): LogOffsetResult = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is called when resetting offsets by timestamp and duration. It seems we are still doing the reset partition by partition. Can we also batch them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll do that in the next commit.
case LogOffsetResult.LogOffset(endOffset) if offset > endOffset => | ||
warn(s"New offset ($offset) is higher than latest offset. Value will be set to $endOffset") | ||
endOffset | ||
private def checkOffsetRange(requestedOffsets: Map[TopicPartition, Long]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: perhaps we should change the method name to checkOffsetsRange(), with plural form.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will update this too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@becketqin Happy new year! Thanks for another round of feedback. I tried to address them in the new commit.
val offsets = getConsumer.beginningOffsets(List(topicPartition).asJava) | ||
val logStartOffset = offsets.get(topicPartition) | ||
LogOffsetResult.LogOffset(logStartOffset) | ||
protected def getLogStartOffset(topicPartition: TopicPartition): LogOffsetResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true. I'll remove it in the next commit. Thanks!
topicPartitions.map { topicPartition => | ||
val logStartOffset = offsets.get(topicPartition) | ||
topicPartition -> LogOffsetResult.LogOffset(logStartOffset) | ||
}.toMap | ||
} | ||
|
||
protected def getLogTimestampOffset(topicPartition: TopicPartition, timestamp: java.lang.Long): LogOffsetResult = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll do that in the next commit.
case LogOffsetResult.LogOffset(endOffset) if offset > endOffset => | ||
warn(s"New offset ($offset) is higher than latest offset. Value will be set to $endOffset") | ||
endOffset | ||
private def checkOffsetRange(requestedOffsets: Map[TopicPartition, Long]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will update this too.
@becketqin any other comment/feedback on this PR? Thanks. |
@vahidhashemian Sorry I missed the PR update. Will look at it later today. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vahidhashemian Thanks for updating the patch. Made another pass and left some comments.
consumer.assign(topicPartitions.asJava) | ||
val offsetsForTimes = consumer.offsetsForTimes(topicPartitions.map(_ -> timestamp).toMap.asJava) | ||
|
||
if (offsetsForTimes == null || offsetsForTimes.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaConsumer.offsetsForTimes() will always return a non-empty map as long as the initial partition list is not empty. If the offset of a particular partition is not available, the returned map will have that partition mapped to null.
Perhaps we can do the following here:
- use offsetsForTimes() to get the result for all the specified partitions.
- If there are partitions with null value in the map returned in step 1, add all those unsuccessful partitions to another set.
- use getEndOffsets() to get the log end offsets for the unsuccessful partitions in step 2 and map those partitions to their log end offsets.
- return the combined result from step 1 and 3.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I made this change in the latest commit. It should save some cpu cycles.
}.toMap | ||
checkOffsetsRange(requestedOffsets).map { | ||
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset)) | ||
} | ||
} else if (opts.options.has(opts.resetToCurrentOpt)) { | ||
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId) | ||
partitionsToReset.map { topicPartition => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also batch the requests here? Perhaps we can add all the partitions without a committed offsets to a separate set and invoke the getLogEndOffsets() for those partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, that makes sense too. I've done that in the latest commit.
case _ => offset | ||
} | ||
|
||
case None => // the control should not reach here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the control should not reach here, maybe we can throw an IllegalStateException?
e24cdd6
to
a55e0c6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@becketqin Thanks for another look at this PR, and the suggestions. I tried to address them (additional comments inline) in the recent commit.
consumer.assign(topicPartitions.asJava) | ||
val offsetsForTimes = consumer.offsetsForTimes(topicPartitions.map(_ -> timestamp).toMap.asJava) | ||
|
||
if (offsetsForTimes == null || offsetsForTimes.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I made this change in the latest commit. It should save some cpu cycles.
}.toMap | ||
checkOffsetsRange(requestedOffsets).map { | ||
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset)) | ||
} | ||
} else if (opts.options.has(opts.resetToCurrentOpt)) { | ||
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId) | ||
partitionsToReset.map { topicPartition => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, that makes sense too. I've done that in the latest commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the latest patch. LGTM. Just one minor comment.
val preparedOffsetsForParititionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition => | ||
(topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match { | ||
case Some(offset) => offset | ||
case _ => throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently we print the help message and exit. Is this change intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another part of the code that should be reachable (because at this point we're certain currentCommittedOffsets.get(topicPartition)
is not None
). I used this instead of doing a currentCommittedOffsets.get(topicPartition).get
above. If you prefer that one instead, or can think of a better way to go about it (that avoids using .get
on an Option
) please let me know and I'll make the change. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, this is for the partitions with committed offsets. This is fine.
@vahidhashemian It looks there is a minor conflict when I was trying to check in the patch. Do you have time to rebase the patch? Thanks. |
… and `endOffsets()` in ConsumerGroupCommand
This is to avoid repeated calls to `getLogStartOffset` and `getLogEndOffset`.
a55e0c6
to
3f60d6d
Compare
@becketqin Thanks for letting me know. The rebase is complete. |
@vahidhashemian Thanks for the patch. I just merged it to the trunk. |
@becketqin, thanks! Let me know if you want me to work on that too. |
@vahidhashemian You are welcome. I have submitted a patch for KAFKA-6489. It is a small fix. |
Committer Checklist (excluded from commit message)