KAFKA-7030: Add configuration to disable message down-conversion (KIP-283)#5192
Conversation
7ae2ec2 to
1622969
Compare
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, left a few comments.
| val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024, | ||
| topics)).build(1) | ||
| val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest) | ||
| topics.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, fetchResponse.responseData().get(tp).error)) |
There was a problem hiding this comment.
This assertion is satisfied even if topics doesn't contain all the expected topics. Similarly below.
| "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; | ||
|
|
||
| public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable"; | ||
| public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether down-conversion of message formats is enabled to satisfy consume requests."; |
There was a problem hiding this comment.
We should mention the impact of this config on old clients.
| // For fetch requests from clients, check if down-conversion is disabled for the particular partition | ||
| if (!fetchRequest.isFromFollower && downConvertMagic.isDefined && !replicaManager.messageDownConversionEnabled(tp).getOrElse(true)) { | ||
| trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.") | ||
| new FetchResponse.PartitionData(Errors.UNSUPPORTED_VERSION, FetchResponse.INVALID_HIGHWATERMARK, |
There was a problem hiding this comment.
Could we use errorResponse?
There was a problem hiding this comment.
Unfortunately, no. errorResponse returns FetchResponse.PartitionData[Records] but we want FetchResponse.PartitionData[BaseRecords] here.
There was a problem hiding this comment.
Found the incantation to make this work:
def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
}This says that T is a supertype of MemoryRecords and a subtype of BaseRecords. Since Records and BaseRecords both satisfy this bound, it works. I already did this locally, so I'll just push to this branch and merge.
| } | ||
| }.getOrElse(unconvertedRecords) | ||
| // For fetch requests from clients, check if down-conversion is disabled for the particular partition | ||
| if (!fetchRequest.isFromFollower && downConvertMagic.isDefined && !replicaManager.messageDownConversionEnabled(tp).getOrElse(true)) { |
There was a problem hiding this comment.
It's a little vexing we have to hit ReplicaManager a second time and repeat the same logic to extract the config. What if we just exposed an API to get the LogConfig directly for a partition?
| } | ||
|
|
||
| def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = { | ||
| def convertFetchedData(tp: TopicPartition, |
There was a problem hiding this comment.
Perhaps we should call this maybeConvertFetchedData since the conversion may or may not happen depending on whether it is needed and permitted.
| } | ||
| } | ||
|
|
||
| def convertRecords(tp: TopicPartition, unconvertedRecords: Records, downConvertMagic: Option[Byte]): BaseRecords = { |
There was a problem hiding this comment.
nit: This method is a little odd since the conversion only happens if downConvertMagic is not None. It seems a little more natural to move the map to the caller. Since we're not hiding much, I'd probably just suggest inlining it below.
| }.toMap | ||
| } | ||
|
|
||
| private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition], |
There was a problem hiding this comment.
Seems like we're borrowing some code from FetchTest. Would it make sense to extend it?
There was a problem hiding this comment.
Probably wouldn't make sense to extend it. I tried extracting some logic into a utility class but looks like it needs some amount of refactoring.
|
|
||
| @Test | ||
| def testV1FetchWithDownConversionDisabled(): Unit = { | ||
| initProducer() |
There was a problem hiding this comment.
You can do this in a @Before method.
2e62790 to
2f98d35
Compare
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, left a couple additional comments.
| "implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface." | ||
| val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " + | ||
| "implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface." | ||
| val LogMessageDownConversionEnableDoc = "This configuration controls whether down-conversion of message formats is enabled to satisfy consume requests." |
There was a problem hiding this comment.
Seems not ideal to have this documentation in two places. I know you are just following the current pattern, but if we don't actually have any difference to communicate, perhaps we should just refer to the documentation in TopicConfig?
| } | ||
|
|
||
| @Test | ||
| def testV1FetchFromReplica(): Unit = { |
There was a problem hiding this comment.
I think the point of this test is to ensure that the downconversion config does not apply to replicas? Probably worth spelling this out. Also, I'm wondering if we should mention this in the config documentation?
| }.getOrElse(unconvertedRecords) | ||
| // For fetch requests from clients, check if down-conversion is disabled for the particular partition | ||
| if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && | ||
| !logConfig.map(_.messageDownConversionEnable.booleanValue()).getOrElse(true)) { |
There was a problem hiding this comment.
I think this can be simplified: !logConfig.forall(_.messageDownConversionEnable)?
72d8db0 to
248d13e
Compare
hachikuji
left a comment
There was a problem hiding this comment.
LGTM, thanks for the patch!
…-283) (#5192) Add support for the topic-level `message.downconversion.enable` config as part of KIP-283.
…-283) (apache#5192) Add support for the topic-level `message.downconversion.enable` config as part of KIP-283.
Committer Checklist (excluded from commit message)