-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand #15659
Conversation
…er in ReassignPartitionsCommand
throw new UnknownTopicOrPartitionException("Unable to find partition: " + | ||
missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In ReassignPartitionsCommand#describeTopics, we wrap UnknownTopicOrPartitionException with ExecutionException. Maybe we should make it consistent by this pattern?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though I don't know why do we do this, but follow the pattern is a good choice. Thanks for the comment 😃
UnknownTopicOrPartitionException exception = assertThrows(UnknownTopicOrPartitionException.class, | ||
() -> getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10))))); | ||
assertEquals("Unable to find partition: foo-10", exception.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change it to ExecutionException, should we change this? Ex:
Exception exception = assertThrows(ExecutionException.class,
() -> getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10)))));
Exception causeException = exception.getCause();
AssertTrue(causeException instanceof UnknownTopicOrPartitionException);
assertEquals("Unable to find partition: foo-10", causeException.getMessage());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, my bad...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update. Left one question.
@@ -300,6 +301,15 @@ public void testGetReplicaAssignments() throws Exception { | |||
|
|||
assertEquals(assignments, | |||
getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0))))); | |||
|
|||
assignments.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, what's the purpose we explicitly clear assignments
here? I think this is only used as local scope variable, we should not clear it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, already removed it in the latest commit, many thanks 😃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Failed tests are unrelated. |
…er in ReassignPartitionsCommand (apache#15659) Currently, when executing kafka-reassign-partitions.sh with the --execute option, if a partition number specified in the JSON file does not exist, this check occurs only when submitting the reassignments to alterPartitionReassignments on the server-side. We can perform this check in advance before submitting the reassignments to the server side. Reviewers: Luke Chen <showuon@gmail.com>
related to KAFKA-16455
Currently, when executing kafka-reassign-partitions.sh with the --execute option, if a partition number specified in the JSON file does not exist, this check occurs only when submitting the reassignments to alterPartitionReassignments on the server-side.
We can perform this check in advance before submitting the reassignments to the server side.
Committer Checklist (excluded from commit message)