KAFKA-20524: CSV reset offset plan for kafka-share-groups.sh #22197
AndrewJSchofield wants to merge 2 commits into apache:trunk from
| return offsetsUtils.resetToLatest(partitionsToReset); | ||
| } else if (opts.options.has(opts.resetToDatetimeOpt)) { | ||
| return offsetsUtils.resetToDateTime(partitionsToReset); | ||
| } else if (offsetsUtils.resetPlanFromFile().isPresent()) { |
This call to resetPlanFromFile and the following call to resetFromFile both seem to read and parse the entire CSV file. Maybe use opts to check for the option's presence instead?
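A minimal sketch of the idea in this comment: choose the branch from the option's presence, then parse the file once. Everything here is hypothetical and not taken from the PR: the class `ResetPlanSketch`, the methods `parsePlan` and `resolvePlan`, and the assumed `topic,partition,offset` line format.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; none of these names come from the PR.
public class ResetPlanSketch {
    // Counts how often the CSV text is parsed, to make double parsing visible.
    static int parseCount = 0;

    // Parses lines of the ASSUMED form "topic,partition,offset".
    static Map<String, Long> parsePlan(String csv) {
        parseCount++;
        Map<String, Long> plan = new LinkedHashMap<>();
        for (String line : csv.split("\n")) {
            if (line.isBlank()) {
                continue;
            }
            String[] fields = line.trim().split(",");
            plan.put(fields[0] + "-" + fields[1], Long.parseLong(fields[2]));
        }
        return plan;
    }

    // Decide from the option's presence (a plain boolean here), then parse once
    // and hand the parsed plan to whoever needs it.
    static Optional<Map<String, Long>> resolvePlan(boolean fromFileOptPresent, String csv) {
        if (!fromFileOptPresent) {
            return Optional.empty();
        }
        return Optional.of(parsePlan(csv));
    }
}
```

With this shape the caller never needs a second read of the file just to find out whether a plan exists.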
| withTimeoutMs(new ListShareGroupOffsetsOptions()) | ||
| ).partitionsToOffsetInfo(groupId).get(); | ||
| } catch (InterruptedException | ExecutionException e) { | ||
| throw new RuntimeException(e); |
Would it be better to re-interrupt the current thread when an InterruptedException is caught, before throwing the RuntimeException?
For ExecutionException, if the cause is a KafkaException (GroupAuthorizationException, etc.), rethrowing that cause could make the failure more visible to the caller.
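One way to read this suggestion, as a sketch only: restore the interrupt flag on InterruptedException, and rethrow the cause of an ExecutionException when it is already unchecked. The class `FutureUnwrapSketch` and the helper `getOrRethrow` are hypothetical names, and a plain `java.util.concurrent.Future` stands in for the admin client's future. Since `KafkaException` extends `RuntimeException`, a cause such as a `GroupAuthorizationException` would pass through unwrapped.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper; not part of the PR.
public class FutureUnwrapSketch {
    static <T> T getOrRethrow(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                // Unchecked causes are rethrown as-is, keeping their type.
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
    }
}
```

The caller then catches the original exception type rather than a generic RuntimeException wrapping an ExecutionException.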
| offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset); | ||
| if (opts.options.has(opts.resetToEarliestOpt)) { | ||
| if (opts.options.has(opts.resetToOffsetOpt)) { | ||
| return offsetsUtils.resetToOffset(partitionsToReset); |
Not related to this PR, but this call sets the offset to 0L if opts.resetToOffsetOpt is empty or null. Is that OK, rather than throwing an exception?
I think ConsumerGroupCommand behaves the same way.
There was a problem hiding this comment.
I think that's a tiny piece of defensive code in OffsetsUtils.resetToOffset. In practice, the command line argument spec for resetToOffsetOpt requires an argument of type Long. If no argument or a non-Long argument is specified with --to-offset, the command does fail.
muralibasani
left a comment
@AndrewJSchofield thanks for the PR. I have some minor comments.
Thanks for the review. I basically mapped the consumer group tool options across to the share group tool, and you're quite right that there are a few things to improve. I'll sort it out.
chia7712
left a comment
@AndrewJSchofield Out of curiosity, does the KIP's motivation include aligning offset-related options? If so, should we also support the shift-by option?
| "To define the scope use --all-topics or --topic." + NL + | ||
| "Additionally, the --export option is used to export the offsets in CSV format." + NL + | ||
| "You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest, --from-file, --to-current, --to-offset." + NL + | ||
| "To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'." + NL + |
To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'
It looks like we should remove the condition if (!options.has(topicOpt) && !options.has(allTopicsOpt)) from line 226, right?
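For illustration, the rule in the help text could be checked roughly as below. This is a sketch under assumptions, not the PR's validation code: `ScopeCheckSketch` and `validateScope` are hypothetical names, the options are modelled as plain booleans, and it is assumed that `--topic` and `--all-topics` are mutually exclusive and that a scope option may still be combined with `--from-file`.

```java
// Hypothetical sketch; not the PR's option-checking code.
public class ScopeCheckSketch {
    // Returns an error message, or null when the combination is acceptable.
    static String validateScope(boolean hasTopic, boolean hasAllTopics, boolean hasFromFile) {
        if (hasTopic && hasAllTopics) {
            // ASSUMPTION: the two scope options exclude each other.
            return "Only one of --all-topics or --topic may be specified.";
        }
        if (!hasTopic && !hasAllTopics && !hasFromFile) {
            // A scope is required unless the plan comes from a file.
            return "One of --all-topics or --topic must be specified unless --from-file is used.";
        }
        return null;
    }
}
```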
| return preparedOffsetsForPartitionsWithCommittedOffset; | ||
| } | ||
|
|
||
| public Map<TopicPartition, OffsetAndMetadata> resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) { |
I understand this implementation aligns with resetToCurrent, but it feels a bit old-school to me. We could simplify it using the following style:
public Map<TopicPartition, OffsetAndMetadata> resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {
    var partitioned = partitionsToReset.stream().collect(Collectors.partitioningBy(currentOffsetInfo::containsKey));
    var partitionsToResetWithStartOffset = partitioned.get(true);
    var partitionsToResetWithoutStartOffset = partitioned.get(false);
    var preparedOffsetsForPartitionsWithStartOffset = partitionsToResetWithStartOffset.stream()
        .collect(Collectors.toMap(Function.identity(), topicPartition -> new OffsetAndMetadata(currentOffsetInfo.get(topicPartition).startOffset())));
    getLogEndOffsets(partitionsToResetWithoutStartOffset).forEach((tp, logOffsetResult) -> {
        if (!(logOffsetResult instanceof OffsetsUtils.LogOffset logOffset)) {
            throw new IllegalStateException("Error getting ending offset of topic partition: " + tp);
        }
        preparedOffsetsForPartitionsWithStartOffset.put(tp, new OffsetAndMetadata(logOffset.value));
    });
    return preparedOffsetsForPartitionsWithStartOffset;
}
BTW, I'm fine with addressing this in a separate PR
There was a problem hiding this comment.
This KIP is still in voting and I'm getting the PR ready in anticipation. Entirely happy to iterate on it.
@chia7712 The motivation was really to enable offsets to be exported from one group and used to initialise another. I'm not really averse to shift-by, but hadn't planned to do it. There is also another difference: you can only reset the offsets of one share group at a time. When resetting offsets for consumer groups, you can list multiple groups or reset all groups at once.
This is the implementation of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1323%3A+Initialization+of+share+group+offsets+from+a+specific+offset+or+a+file.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>