-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5170. KafkaAdminClientIntegration test should wait until metada… #2976
Conversation
…ta is propagated to all brokers
Refer to this link for build results (access rights to CI server needed): |
* @return The leader of the partition. | ||
*/ | ||
def waitUntilBrokerMetadataIsPropagated(servers: Seq[KafkaServer], | ||
brokerList: Seq[String], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need brokerList
when you already have servers
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's more flexible if we can specify which brokers we want to wait for. Also, this way is a lot easier since we don't have to pull the host:port pairs out of the KafkaServer objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not simply compare the broker id? Seems much simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it is a lot simpler. Will change.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Got a flaky test. Looks like KAFKA-4801
|
retest this please |
Refer to this link for build results (access rights to CI server needed): |
retest this please |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
retest this please |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
TestUtils.waitUntilTrue(() => false == servers.exists(server => { | ||
expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(broker => broker.id) | ||
}), "Timed out waiting for all servers to learn about the broker list.", timeout, 50) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I simplified this a little and made the comparison not care about order.
val expectedBrokerIds = servers.map(_.config.brokerId).toSet
TestUtils.waitUntilTrue(() => servers.forall(server =>
expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(_.id).toSet
), "Timed out waiting for broker metadata to propagate to all servers", timeout)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Made a small change before merging to trunk (as per my comment).
…ta is propagated to all brokers