Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals #11312

Merged
merged 2 commits into from Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 18 additions & 10 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -1385,10 +1385,22 @@ object KafkaConfig {
}
if (maybeSensitive) Password.HIDDEN else value
}

def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment or docstring would be useful here explaining the motivation for this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

val output = new util.HashMap[Any, Any](input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to make a new map here? Are there any callers of KafkaConfig() which might expect to modify the props map after the object has been constructed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbstractConfig already copies the map which is passed to it.

val brokerId = output.get(KafkaConfig.BrokerIdProp)
val nodeId = output.get(KafkaConfig.NodeIdProp)
if (brokerId == null && nodeId != null) {
output.put(KafkaConfig.BrokerIdProp, nodeId)
} else if (brokerId != null && nodeId == null) {
output.put(KafkaConfig.NodeIdProp, brokerId)
}
output
}
}

class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {
extends AbstractConfig(KafkaConfig.configDef, KafkaConfig.populateSynonyms(props), doLog) with Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a little bit odd that we populate synonyms only in the reference that we're passing to AbstractConfig. The field KafkaConfig.props could still be accessed directly (maybe it should be private?). Would it make sense to move this to a factory method? For example:

object KafkaConfig {
  def apply(props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]): KafkaConfig = {
    new KafkaConfig(populateSynonyms(props), doLog, dynamicConfigOverride)
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good find. This is a case where Scala's behavior is kind of annoying. I wish there was a way to opt-out of the auto-initialization.

Anyway, I made the primary constructor private, and put a call to KafkaConfig#populateSynonyms in all the (publicly visible) secondary constructors. This should fix it...


def this(props: java.util.Map[_, _]) = this(props, true, None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
Expand Down Expand Up @@ -1516,15 +1528,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = {
val nodeId = getInt(KafkaConfig.NodeIdProp)
if (nodeId < 0) {
getInt(KafkaConfig.BrokerIdProp)
} else {
nodeId
}
}
val nodeId: Int = brokerId
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
Expand Down Expand Up @@ -1905,6 +1910,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

@nowarn("cat=deprecation")
private def validateValues(): Unit = {
if (nodeId != brokerId) {
throw new ConfigException(s"You must set `${KafkaConfig.NodeIdProp}` to the same value as `${KafkaConfig.BrokerIdProp}`.")
}
if (requiresZookeeper) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Expand Up @@ -1235,4 +1235,76 @@ class KafkaConfigTest {
assertEquals(dataDir1, config.metadataLogDir)
assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
}

@Test
def testPopulateSynonymsOnEmptyMap(): Unit = {
assertEquals(Collections.emptyMap(), KafkaConfig.populateSynonyms(Collections.emptyMap()))
}

@Test
def testPopulateSynonymsOnMapWithoutNodeId(): Unit = {
val input = new util.HashMap[String, String]()
input.put(KafkaConfig.BrokerIdProp, "4")
val expectedOutput = new util.HashMap[String, String]()
expectedOutput.put(KafkaConfig.BrokerIdProp, "4")
expectedOutput.put(KafkaConfig.NodeIdProp, "4")
assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input))
}

@Test
def testPopulateSynonymsOnMapWithoutBrokerId(): Unit = {
val input = new util.HashMap[String, String]()
input.put(KafkaConfig.NodeIdProp, "4")
val expectedOutput = new util.HashMap[String, String]()
expectedOutput.put(KafkaConfig.BrokerIdProp, "4")
expectedOutput.put(KafkaConfig.NodeIdProp, "4")
assertEquals(expectedOutput, KafkaConfig.populateSynonyms(input))
}

@Test
def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.NodeIdProp, "2")
assertEquals("You must set `node.id` to the same value as `broker.id`.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage())
}

@Test
def testNodeIdOrBrokerIdMustBeSetWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertEquals("Missing configuration `node.id` which is required when `process.roles` " +
"is defined (i.e. when running in KRaft mode).",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage())
}

@Test
def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.BrokerIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
assertEquals(3, config.nodeId)
val originals = config.originals()
assertEquals("3", originals.get(KafkaConfig.BrokerIdProp))
assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
}

@Test
def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
props.setProperty(KafkaConfig.NodeIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
assertEquals(3, config.nodeId)
val originals = config.originals()
assertEquals("3", originals.get(KafkaConfig.BrokerIdProp))
assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
}
}