-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-7409: Validate topic configs prior to topic creation #5651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
https://issues.apache.org/jira/browse/KAFKA-7409 Values for `message.format.version` and `log.message.format.version` should be verfied before topic creation or config change.
hachikuji
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @huxihx for picking this up so quickly. Left a few minor comments.
| try { | ||
| val createOpts = new TopicCommandOptions( | ||
| Array("--partitions", "1", "--replication-factor", "1", "--topic", "test", | ||
| "--config", "message.message.version=boom")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The config should be message.format.version.
| KAFKA_2_1_IV1 | ||
| ) | ||
|
|
||
| val allVersionsStr = (allVersions.map(_.toString) ++ allVersions.map(_.shortVersion)).toArray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. couldn't we use something like versionMap.keys.toSeq?
omkreddy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
One minor caveat is, when using with zk based topic creation, version info will be taken from the jar available on the client machines. The clients may be using different version of Kafka. In this case, there is a chance to create topic with invalid versions.
| info(s"Error processing create topic request for topic $topic with arguments $arguments", e) | ||
| CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause))) | ||
| case e: Throwable => | ||
| e.printStackTrace() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unintended print?
hachikuji
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. A few more small comments.
| val AutoCreateTopicsEnable = true | ||
| val MinInSyncReplicas = 1 | ||
| val MessageDownConversionEnable = true | ||
| val validApiVersions = ApiVersion.allVersionsArray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we reference the ApiVersion.allVersionsArray directly below? This is not really a default, but a constraint.
| private val versionMap: Map[String, ApiVersion] = | ||
| allVersions.map(v => v.version -> v).toMap ++ allVersions.groupBy(_.shortVersion).map { case (k, v) => k -> v.last } | ||
|
|
||
| val allVersionsArray = versionMap.keys.seq.toArray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this might be more generally useful if exposed as a Set. We can convert it to a seq or an array when we pass it to in in the config definition.
| props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0") | ||
| // We need to set the message format version to make the configuration valid. | ||
| props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0") | ||
| props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What was the reason for changing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a valid value actually, instead 0.8.2 is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha, good point. Shall we change the InterBrokerProtocolVersion above as well?
|
@hachikuji Please review again. Thanks. |
| assertEquals(KAFKA_0_8_2, conf2.interBrokerProtocolVersion) | ||
|
|
||
| // check that 0.8.2.0 is the same as 0.8.2.1 | ||
| props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this not allowed anymore? That would be a breaking changes, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, @ijuma is right. We have to account for the logic inside ApiVersion.apply in order to validate the version. A custom config validator might be one option.
| .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc, | ||
| KafkaConfig.LogPreAllocateProp) | ||
| .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, in(ApiVersion.allValidVersions.toArray:_*), MEDIUM, MessageFormatVersionDoc, | ||
| .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, ValidPrefixString.in(ApiVersion.allValidVersions.toArray:_*), MEDIUM, MessageFormatVersionDoc, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we only have on use case for this at the moment for validating ApiVersion, I wonder if it would be simpler to have a custom ApiVersion.Validator which invokes ApiVersion.apply directly? My concern is that we have two separate paths for validation. The logic seems consistent as far as I can tell, but we may change the validation logic in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct me if I am wrong. Currently, Validator and its implementations are in clients codebase which does not see any classes from within the core. It cannot invoke ApiVersion.apply. Right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, I am suggesting that we add a custom Validator implementation inside ApiVersion. We need not put it in the clients module since ApiVersion is in core.
|
@hachikuji Please review again. Thanks. |
hachikuji
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @huxihx!
… altering configs (apache#5651) Values for `message.format.version` and `log.message.format.version` should be verified before topic creation or config change. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
KAFKA-7409: Validate topic configs prior to topic creation
https://issues.apache.org/jira/browse/KAFKA-7409
Values for
message.format.versionandlog.message.format.versionshould be verfied before topic creation or config change.More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)