New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5161: add code in reassign-partitions to check broker existence #2962
KAFKA-5161: add code in reassign-partitions to check broker existence #2962
Conversation
…uster Added code for non-existent brokerID checking in the proposed assignemnt plan.
…uster Added code to check brokerID existence in the proposed assignment plan.
@hachikuji Please take some time to review this PR. Thanks. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@guozhangwang Please kindly take some time to review this PR. Thanks. |
retest this please |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you update the title to KAFKA-5161: ...
@@ -209,6 +209,12 @@ object ReassignPartitionsCommand extends Logging { | |||
throw new AdminCommandFailedException("The proposed assignment contains non-existent partitions: " + | |||
nonExistentPartitions) | |||
|
|||
//Check that all brokers in the proposed assignment exist in the cluster |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: once space after //
and do not need to capitalize.
@guozhangwang Thanks for the comments and please review again. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Merged to trunk. |
val existingBrokerIDs = zkUtils.getSortedBrokerList() | ||
val nonExistingBrokerIDs = partitionsToBeReassigned.toMap.values.flatten.filterNot(existingBrokerIDs.contains).toSet | ||
if (nonExistingBrokerIDs.nonEmpty) | ||
throw new AdminCommandFailedException("The proposed assignment contains non-existent brokerIDs: " + nonExistingBrokerIDs.mkString(",")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would probably be good to be a bit clearer about what non-existent
means. A broker that lost connection (maybe due to an upgrade) won't appear in getSortedBrokerList
for example.
Added code to check existence of the brokers in the proposed plan.