Skip to content

Commit

Permalink
KAFKA-6394; Add a check to prevent misconfiguration of advertised lis…
Browse files Browse the repository at this point in the history
…teners (#4897)

Do not allow server startup if one of its configured advertised listeners has already been registered by another broker.
  • Loading branch information
omkreddy authored and hachikuji committed May 11, 2018
1 parent 6eb7cf1 commit ec7ba32
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.quota.ClientQuotaCallback

import scala.collection.JavaConverters._
import scala.collection.Map
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internal.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
Expand Down Expand Up @@ -378,6 +377,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}

private[server] def createBrokerInfo: BrokerInfo = {
val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
s" advertised listeners are already registered by broker ${broker.id}")
}

val listeners = config.advertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
// Ensure connections are made to brokers before external listener is made inaccessible
describeConfig(externalAdminClient)

// Update broker keystore for external listener to use invalid listener address
// Update broker external listener to use invalid listener address
// any address other than localhost is sufficient to fail (either connection or host name verification failure)
val invalidHost = "192.168.0.1"
alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
Expand Down
49 changes: 49 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.junit.Test

class KafkaServerTest extends ZooKeeperTestHarness {

@Test
def testAlreadyRegisteredAdvertisedListeners() {
//start a server with a advertised listener
val server1 = createServer(1, "myhost", TestUtils.RandomPort)

//start a server with same advertised listener
intercept[IllegalArgumentException] {
createServer(2, "myhost", TestUtils.boundPort(server1))
}

//start a server with same host but with different port
val server2 = createServer(2, "myhost", TestUtils.RandomPort)

TestUtils.shutdownServers(Seq(server1, server2))
}

def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")
val kafkaConfig = KafkaConfig.fromProps(props)
TestUtils.createServer(kafkaConfig)
}

}

0 comments on commit ec7ba32

Please sign in to comment.