From 2947de844c0ad288603244325f0c44fb2b3a5744 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Tue, 20 Jun 2017 11:40:22 +0100 Subject: [PATCH] KAFKA-4260: Check for nonroutable address is advertised.listeners --- core/src/main/scala/kafka/server/KafkaConfig.scala | 8 ++++++-- .../test/scala/unit/kafka/server/KafkaConfigTest.scala | 8 ++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index fe47fd0f8cb86..6c107cb5b4c8f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -449,9 +449,10 @@ object KafkaConfig { "The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " + "need to be different from the port to which the broker binds. If this is not set, " + "it will publish the same port that the broker binds to." - val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." + + val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the `listeners` config property." + " In IaaS environments, this may need to be different from the interface to which the broker binds." + - " If this is not set, the value for `listeners` will be used." + " If this is not set, the value for `listeners` will be used." + + " Unlike `listeners` it is not valid to advertise the 0.0.0.0 meta-address." val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " + "the same security protocol to be usable in more than one port or IP. For example, we can separate internal and " + "external traffic even if SSL is required for both. Concretely, we could define listeners with names INTERNAL " + @@ -1184,6 +1185,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + s"are ${listenerNames.map(_.value).mkString(",")}" ) + require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"), + s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+ + s"Use a routable IP address.") require(interBrokerProtocolVersion >= logMessageFormatVersion, s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index df8a6d71fa1ee..f0ece310b4c30 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -717,6 +717,14 @@ class KafkaConfigTest { assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel) } + @Test + def testNonroutableAdvertisedListeners() { + val props = new Properties() + props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092") + assertFalse(isValidKafkaConfig(props)) + } + private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) { values.foreach((value) => { val props = validRequiredProps