New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14591 DeleteRecordsCommand moved to tools #13278
Conversation
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @nizhikov, thanks for working on this. Nice job.
It looks like the scala version does not work well when you pass a non existent partition, while yours works as expected.
# trunk
Executing records delete operation
Records delete operation completed:
partition: mytest-0 error: org.apache.kafka.common.errors.TimeoutException: Call(callName=deleteRecords(api=METADATA), deadlineMs=1688824453413, tries=332909, nextAllowedTryMs=1688824453523) timed out at 1688824453423 after 332909 attempt(s)
# this PR
Executing records delete operation
Records delete operation completed:
partition: mytest-0 error: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
Have you considered using the Jackson wrapper proposed here?
Can you please rebase and add a unit test for the various options, like we have for the other tools?
Hello, @fvaleri Thanks for the feedback.
Yes. I merged trunk to the PR and added tests for rewritten command.
I'm not aware of this work. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nizhikov there are a couple of checkstyle errors.
Please, take a look into DeleteRecordsCommandTest.
Done. Left some comments.
I'm not aware of this work.
It seems right now command works like expected :)
The JSON parsing logic is shared between multiple tools, including this one, so it seems a nice optimization to have. If we can get #13585 merged soon, then you can also use it here. Let's also see what other think about this.
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
Outdated
Show resolved
Hide resolved
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
Outdated
Show resolved
Hide resolved
Sure. We must keep codebase consistent. |
Yep, LGTM. Just waiting for #13585. |
#13585 has been merged. Can you please rebase and integrate with this PR? |
@fvaleri Done. New tests passed locally. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!
Build failure is due to #14037.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! I left a few comments.
server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java
Outdated
Show resolved
Hide resolved
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
Outdated
Show resolved
Hide resolved
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
Show resolved
Hide resolved
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\"}]}"); | ||
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", \"partition\": \"\"}]}"); | ||
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", \"partition\": 0}]}"); | ||
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", \"offset\":0}]}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if there are extra fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra fields will be ignored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have a test for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Success parsing test case extended like this:
@Test
public void testParse() throws Exception {
Collection<DeleteRecordsCommand.Tuple<TopicPartition, Long>> res = DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(
"{\"partitions\":[" +
"{\"topic\":\"t\", \"partition\":0, \"offset\":0}," +
"{\"topic\":\"t\", \"partition\":1, \"offset\":1, \"ignored\":\"field\"}," +
"{\"topic\":\"t\", \"partition\":0, \"offset\":2}," +
"{\"topic\":\"t\", \"partition\":0, \"offset\":0}" +
"]}"
);
server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java
Outdated
Show resolved
Hide resolved
server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java
Outdated
Show resolved
Hide resolved
server-common/src/test/java/org/apache/kafka/server/util/CoreUtilsTest.java
Outdated
Show resolved
Hide resolved
@mimaison Thanks for the review. It seems I addressed all your comments. Please, take a look one more time. |
Tests failures unrelated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, I made another pass and left a few more comments
* 2. It is the most general possible utility, not just the thing you needed in one particular place | ||
* 3. You have tests for it if it is nontrivial in any way | ||
*/ | ||
public class CoreUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe ToolsUtils
would be a better name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have ToolsUtils in server-common, and maybe we should think about moving it to the tools module in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed
} | ||
} | ||
|
||
public static final class Tuple<V1, V2> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this class? Why can't we use something like Map
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regular Map implementations can have only one key -> value mapping.
But parseOffsetJsonStringWithoutDedup
collects data with possible duplicates.
I can rework command logic and throw on first TopicPartition
duplicate, but, it seems out of scope of "just rewrite command in java without changing anything".
Do we have some multi map implementation that can be used here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have similar clases like
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor.TaskPairs.Pair
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.ConsumerGenerationPair
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.ConsumerPair
So may be it will be better to keep generic class and reuse it in other places?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just asking, if there's a good reason to keep Tuple, we should keep it and not change the tool's semantics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems tool semantic not changed with Map<TopicPartition, List<Long>>
.
So current changes conform both semantic requirements and your suggestion.
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\"}]}"); | ||
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", \"partition\": \"\"}]}"); | ||
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", \"partition\": 0}]}"); | ||
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", \"offset\":0}]}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have a test for it?
@mimaison all your comments addressed except the one with the Tuple. Please, review. |
@mimaison I reworked |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all the updates!
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>
This PR contains changes to move
DeleteRecordsCommand
to java codeCommitter Checklist (excluded from commit message)