-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6394: Add a check to prevent misconfiguration of advertised listeners #4897
Conversation
@hachikuji Please take a look when you get a chance. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @omkreddy. Should we also validate advertised listeners after a dynamic config change?
if (!config.advertisedListenersEndPoints.isEmpty) { | ||
val hostNames = config.advertisedListenersEndPoints.map(_.host) | ||
zkClient.getAllBrokersInCluster.foreach(broker => { | ||
val commonHosts = broker.endPoints.map(_.host).intersect(hostNames) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we check port as well? I think using the same host is pretty common, especially for development.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can check port also. Updated the code.
@@ -374,6 +372,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP | |||
zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64)) | |||
} | |||
|
|||
private def registerBroker(zkClient: KafkaZkClient): BrokerInfo = { | |||
if (!config.advertisedListenersEndPoints.isEmpty) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should check config.advertisedListeners
instead? I guess you are expecting that we would fail to bind if the listener was incorrectly specified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking we want this check only if advertised.listeners configured. but it makes sense to check if listener was incorrectly specified. updated the code
b2c688a
to
368aedf
Compare
368aedf
to
28bf632
Compare
@hachikuji Thanks for the review. yes, we can validate advertised listeners after a dynamic config change. Updated the code. Pls take a look. |
@@ -775,6 +775,14 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi | |||
} | |||
if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName)) | |||
throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}") | |||
|
|||
val endPoints = newConfig.advertisedListeners.map(e => s"${e.host}:${e.port}") | |||
server.zkClient.getAllBrokersInCluster.filter(_.id != newConfig.brokerId).foreach(broker => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure it's worthwhile given how rarely the listeners would be updated in practice, but I guess we could also use the metadata cache?
Otherwise, it would be nice to factor out the common logic between this and KafkaServer
. Like maybe a method in AdminUtils
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji I am also not strongly inclined to add this check in DynamicConfig validation phase. DynamicListenerConfig.reconfigure method fails if there any misconfigured listeners. here:
https://github.com/omkreddy/kafka/blob/28bf632dc791ec42077b8ab19aad7583a44975d4/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L804
If there are no concerns, I want to drop this check from DynamicBrokerConfig. validateReconfiguration
@hachikuji reverted changes related to dynamic configs. Pls take a look. |
@hachikuji pinging for review. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the patch. Note I made a trivial tweak to have the error message display the common endpoints.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@omkreddy Sorry, just one more small comment.
|
||
class KafkaConfigTest { | ||
class KafkaConfigTest extends ZooKeeperTestHarness { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little annoying that we had to do this just for the one test case. Maybe we could add a KafkaServerTest
since KafkaServer
is where the check was added?
77ef269
to
e0d7441
Compare
@hachikuji Thanks for the review. Updated the PR. |
LGTM, thanks @omkreddy |
…teners (apache#4897) Do not allow server startup if one of its configured advertised listeners has already been registered by another broker.
Sorry to resurrect a zombie thread, but this broke a common pattern we use for our healthcheck listeners. Typically our healthcheck listeners advertise only 127.0.0.1:9094 and the sidecar process runs locally and communicates over localhost. Now, with this change, the broker won't start because the listener is already registered. |
Just came here after debugging the same issue @jeckhart (assuming you got it from the gruntwork module). Can we make an exception for localhost or 127.0.0.1? |
Yes, I got it from the Gruntwork module. I think that proposal works for me. |
No description provided.