Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals #11312

Merged
merged 2 commits into from Sep 13, 2021

Conversation

cmccabe
Copy link
Contributor

@cmccabe cmccabe commented Sep 8, 2021

Some plugins make use of KafkaConfig#originals rather than the
KafkaConfig object. We should ensure that these plugins see the
correct value for broker.id if the broker is running in KRaft mode and
node.id has been configured, but not broker.id.

This PR does this by ensuring that both node.id and broker.id are set in
the orignals map if either one is set. We also check that they are set
to the same value in KafkaConfig#validateValues.

Co-author: Ron Dagostino rdagostino@confluent.io

Some plugins make use of KafkaConfig#originals rather than the
KafkaConfig object. We should ensure that these plugins see the
correct value for broker.id if the broker is running in KRaft mode and
node.id has been configured, but not broker.id.

This PR does this by ensuring that both node.id and broker.id are set in
the orignals map if either one is set.  We also check that they are set
to the same value in KafkaConfig#validateValues.

Co-author: Ron Dagostino <rdagostino@confluent.io>
Copy link
Contributor

@mumrah mumrah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cmccabe (and @rondagostino). Two small things inline

@@ -1385,10 +1385,22 @@ object KafkaConfig {
}
if (maybeSensitive) Password.HIDDEN else value
}

def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment or docstring would be useful here explaining the motivation for this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

@@ -1385,10 +1385,22 @@ object KafkaConfig {
}
if (maybeSensitive) Password.HIDDEN else value
}

def populateSynonyms(input: util.Map[_, _]): util.Map[Any, Any] = {
val output = new util.HashMap[Any, Any](input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to make a new map here? Are there any callers of KafkaConfig() which might expect to modify the props map after the object has been constructed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbstractConfig already copies the map which is passed to it.

}

class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {
extends AbstractConfig(KafkaConfig.configDef, KafkaConfig.populateSynonyms(props), doLog) with Logging {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a little bit odd that we populate synonyms only in the reference that we're passing to AbstractConfig. The field KafkaConfig.props could still be accessed directly (maybe it should be private?). Would it make sense to move this to a factory method? For example:

object KafkaConfig {
  def apply(props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig]): KafkaConfig = {
    new KafkaConfig(populateSynonyms(props), doLog, dynamicConfigOverride)
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good find. This is a case where Scala's behavior is kind of annoying. I wish there was a way to opt-out of the auto-initialization.

Anyway, I made the primary constructor private, and put a call to KafkaConfig#populateSynonyms in all the (publicly visible) secondary constructors. This should fix it...

Copy link
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cmccabe cmccabe merged commit 0786dc8 into apache:trunk Sep 13, 2021
@cmccabe cmccabe deleted the KAFKA-13224 branch September 13, 2021 17:14
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…pache#11312)

Some plugins make use of KafkaConfig#originals rather than the
KafkaConfig object. We should ensure that these plugins see the
correct value for broker.id if the broker is running in KRaft mode and
node.id has been configured, but not broker.id.

This PR does this by ensuring that both node.id and broker.id are set in
the originals map if either one is set.  We also check that they are set
to the same value in KafkaConfig#validateValues.

Co-author: Ron Dagostino <rdagostino@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants