Skip to content

Commit

Permalink
KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
Browse files Browse the repository at this point in the history
Some plugins make use of KafkaConfig#originals rather than the
KafkaConfig object. We should ensure that these plugins see the
correct value for broker.id if the broker is running in KRaft mode and
node.id has been configured, but not broker.id.

This PR does this by ensuring that both node.id and broker.id are set in
the orignals map if either one is set.  We also check that they are set
to the same value in KafkaConfig#validateValues.

Co-author: Ron Dagostino <rdagostino@confluent.io>
  • Loading branch information
cmccabe committed Sep 8, 2021
1 parent d30b4e5 commit 49569bc
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 10 deletions.
28 changes: 18 additions & 10 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -1385,10 +1385,22 @@ object KafkaConfig {
}
if (maybeSensitive) Password.HIDDEN else value
}

def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
val output = new util.HashMap[Any, Any](input)
val brokerId = output.get(KafkaConfig.BrokerIdProp)
val nodeId = output.get(KafkaConfig.NodeIdProp)
if (brokerId == null && nodeId != null) {
output.put(KafkaConfig.BrokerIdProp, nodeId)
} else if (brokerId != null && nodeId == null) {
output.put(KafkaConfig.NodeIdProp, brokerId)
}
output
}
}

class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {
extends AbstractConfig(KafkaConfig.configDef, KafkaConfig.populateSynonyms(props), doLog) with Logging {

def this(props: java.util.Map[_, _]) = this(props, true, None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
Expand Down Expand Up @@ -1516,15 +1528,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = {
val nodeId = getInt(KafkaConfig.NodeIdProp)
if (nodeId < 0) {
getInt(KafkaConfig.BrokerIdProp)
} else {
nodeId
}
}
val nodeId: Int = brokerId
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
Expand Down Expand Up @@ -1905,6 +1910,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

@nowarn("cat=deprecation")
private def validateValues(): Unit = {
if (nodeId != brokerId) {
throw new ConfigException(s"You must set `${KafkaConfig.NodeIdProp}` to the same value as `${KafkaConfig.BrokerIdProp}`.")
}
if (requiresZookeeper) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Expand Up @@ -1235,4 +1235,76 @@ class KafkaConfigTest {
assertEquals(dataDir1, config.metadataLogDir)
assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
}

@Test
def testPopulateSynonymsOnEmptyMap(): Unit = {
assertEquals(Collections.emptyMap(), KafkaConfig.populateSynonyms(Collections.emptyMap()))
}

@Test
def testPopulateSynonymsOnMapWithoutNodeId(): Unit = {
val input = new util.HashMap[String, String]()
input.put(KafkaConfig.BrokerIdProp, "4")
val expectedOutput = new util.HashMap[String, String]()
expectedOutput.put(KafkaConfig.BrokerIdProp, "4")
expectedOutput.put(KafkaConfig.NodeIdProp, "4")
assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input))
}

@Test
def testPopulateSynonymsOnMapWithoutBrokerId(): Unit = {
val input = new util.HashMap[String, String]()
input.put(KafkaConfig.NodeIdProp, "4")
val expectedOutput = new util.HashMap[String, String]()
expectedOutput.put(KafkaConfig.BrokerIdProp, "4")
expectedOutput.put(KafkaConfig.NodeIdProp, "4")
assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input))
}

@Test
def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.NodeIdProp, "2")
assertEquals("You must set `node.id` to the same value as `broker.id`.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage())
}

@Test
def testNodeIdOrBrokerIdMustBeSetWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertEquals("Missing configuration `node.id` which is required when `process.roles` " +
"is defined (i.e. when running in KRaft mode).",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage())
}

@Test
def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.BrokerIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
assertEquals(3, config.nodeId)
val originals = config.originals()
assertEquals("3", originals.get(KafkaConfig.BrokerIdProp))
assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
}

@Test
def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.NodeIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
assertEquals(3, config.nodeId)
val originals = config.originals()
assertEquals("3", originals.get(KafkaConfig.BrokerIdProp))
assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
}
}

0 comments on commit 49569bc

Please sign in to comment.