-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6238; Fix inter-broker protocol message format compatibility check #4583
Conversation
lgtm Please correct the JIRA number in the subject |
case 1 => "0.10.0" | ||
case 2 => "0.11.0" | ||
case _ => throw new IllegalArgumentException(s"Invalid message format version $messageFormatVersion") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not figure out these numbers automatically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to avoid the internal version numbers.
intercept[IllegalArgumentException] { | ||
KafkaConfig.fromProps(props) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I wonder if these 2 methods should be inner defs. You could also potentially have another inner def like:
def kafkaConfig(interBrokerProtocol: ApiVersion, messageFormat: ApiVersion): KafkaConfig = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
KafkaConfig.fromProps(props)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, LGTM.
@ijuma Thanks for the review. I've renamed the enum to the more concise |
This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2. I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version. Author: Jason Gustafson <jason@confluent.io> Reviewers: Ismael Juma <ismael@juma.me.uk> Closes #4583 from hachikuji/KAFKA-6328-REOPENED (cherry picked from commit 660c0c0) Signed-off-by: Damian Guy <damian.guy@gmail.com>
Merged to trunk and 1.1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Left some minor comments.
Also worth filing a JIRA for making terminology consistent across the board for magic
, message format version
, record format/version
. Or maybe there is a JIRA for that already.
} | ||
|
||
def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = { | ||
messageFormatVersion match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace all messageFormatVersion
with recordFormat
?
*/ | ||
package org.apache.kafka.common.record; | ||
|
||
public enum RecordFormat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering between RecordFormat
and RecordVersion
. I assume you think the former is clearer?
Also, maybe we want to add brief javadoc explaining what this represents (particularly given the fact that configs still talk about message format version).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's compromise and go with RecordMagic
😜
I think I do prefer RecordVersion
after thinking about it. The current name suggests that it has something to do with the schema.
this.value = (byte) value; | ||
} | ||
|
||
public static RecordFormat lookup(byte version) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this parameter name be consistent with the field name?
@@ -1002,7 +1002,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
} | |||
|
|||
def getMagic(topicPartition: TopicPartition): Option[Byte] = | |||
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion)) | |||
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's kind of weird that we have the double messageFormatVersion
, but I guess that's unavoidable given the (IMHO mistaken) choice to couple message format versions with api versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it would have been simpler and bugs like this wouldn't have been possible if we exposed the message format version directly in the config.
case 1: return V1; | ||
case 2: return V2; | ||
default: throw new IllegalArgumentException("Unknown format version: " + version); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could just do values()[version]
after the appropriate bounds check at the start of the method. So that this doesn't have to change as we add versions. One downside is that values
causes a copy. So we could save a copy as a private field instead (this is sadly a common pattern with enums).
@ijuma Thanks for the comments. I'll submit a minor follow-up. |
This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2. I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version. Author: Jason Gustafson <jason@confluent.io> Reviewers: Ismael Juma <ismael@juma.me.uk> Closes #4583 from hachikuji/KAFKA-6328-REOPENED
Also include a few clean-ups: * Method/variable/parameter renames to make them consistent with the class name * Return `ApiVersion` from `minSupportedFor` * Use `values` to remove some code duplication * Reduce duplication in `ApiVersion` by introducing the `shortVersion` method and building the versions map programatically * Avoid unnecessary `regex` in `ApiVersion.apply` * Added scaladoc to a few methods Some of these were originally discussed in: #4583 (review) Added a test for `ApiVersion.shortVersion`. Relying on existing tests for the rest since there is no change in behaviour. Reviewers: Jason Gustafson <jason@confluent.io>
Also include a few clean-ups: * Method/variable/parameter renames to make them consistent with the class name * Return `ApiVersion` from `minSupportedFor` * Use `values` to remove some code duplication * Reduce duplication in `ApiVersion` by introducing the `shortVersion` method and building the versions map programatically * Avoid unnecessary `regex` in `ApiVersion.apply` * Added scaladoc to a few methods Some of these were originally discussed in: apache#4583 (review) Added a test for `ApiVersion.shortVersion`. Relying on existing tests for the rest since there is no change in behaviour. Reviewers: Jason Gustafson <jason@confluent.io>
This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2.
I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version.
Committer Checklist (excluded from commit message)