KAFKA-3025 Added timestamp to Message and use relative offset. #764
Conversation
val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val MessageTimestampTypeProp = "messge.timestamp.type"
Typo in "messge"?
It wasn't clear to me from the KIP, but is this a broker-wide setting, or can it be overridden for each topic?
@hachikuji I was also thinking about that. Currently, all configurations in LogConfig are per-topic configurations, and the message timestamp type is a legitimate log config, so it is a per-topic configuration. I can see some benefit of doing it this way from a migration point of view: most topics are owned by some application, so we can start to use the new format once all clients of that topic have migrated. In the final state, we can choose to leave the topics whose owners are unable to migrate on the old format and still have zero-copy.
Thanks for the explanation. Makes sense to me. By the way, I've only done a quick pass on this patch so far, but I'm planning to spend a bit more time in the next couple days.
Force-pushed from 7eee789 to be41afe
@becketqin would you add the remaining KIP-31 and KIP-32 work to this patch (client-side work and timestamp in the produce response)? Or would that be a different patch?
@apovzner This patch contains all the features in KIP-31 and KIP-32. The rest of the work is probably adding integration tests. I have already added some unit tests, but we can also add more if needed.
@@ -12,6 +12,8 @@
 */
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.record.Record;

/**
 * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
 * partition number, and an optional key and value.
Since we are adding a timestamp field to ProducerRecord, I think we should add a comment to the ProducerRecord class description about the meaning of the timestamp, what happens if the user sets null, etc.
@becketqin My question about the remaining KIP-31 and KIP-32 work was based on outdated info -- I did not refresh my window and did not see that you added the client-side implementation + returning the timestamp in the produce response. I see now that you also updated the PR description, thanks!
@@ -494,6 +495,16 @@ private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedExce
    return time.milliseconds() - begin;
}

private long getTimestamp(String topic, Long timestamp) {
    // If log append time is used for the topic, we overwrite the timestamp to avoid server side re-compression.
    if (metadata.isUsingLogAppendTime(topic))
Related to my other comment about learning the timestamp type for a topic. So, the first set of produced messages will not have timestamp == INHERITED_TIMESTAMP if the timestamp type == LogAppendTime, right? If setting the timestamp to INHERITED_TIMESTAMP is required for compressed messages to work, does that mean we have a bug?
It is not required to be INHERITED_TIMESTAMP, but it is good for it to be.
Like I answered in your other comment, if a topic is using LogAppendTime and a broker receives a message whose timestamp is not INHERITED_TIMESTAMP, it will overwrite the timestamp and do the recompression. So the first batch from a new producer might cause recompression on the broker side, but after that, no recompression should be needed. I will add some comments to make this clearer.
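A minimal sketch of the producer-side decision described above. The names `INHERITED_TIMESTAMP`, `resolveTimestamp`, and the boolean flag are illustrative, not the actual Kafka client API:

```java
public class TimestampResolution {
    // Hypothetical sentinel telling the broker to stamp the message with its append time.
    static final long INHERITED_TIMESTAMP = -1L;

    // If the topic uses LogAppendTime, send the sentinel so the broker can stamp
    // the batch without recompressing it; otherwise use the user's timestamp,
    // falling back to the current time when the user did not set one.
    static long resolveTimestamp(boolean topicUsesLogAppendTime, Long userTimestamp, long nowMs) {
        if (topicUsesLogAppendTime)
            return INHERITED_TIMESTAMP;
        return userTimestamp != null ? userTimestamp : nowMs;
    }
}
```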
@becketqin Here are some high-level thoughts about the protocol:
I think we can add this information to the attribute field of the message, which currently uses only 2 bits for four different compression types. Instead, we can treat it as a bit mask: the first 3 bits (or 4, if we want to be safer) are reserved for indicating the compression codec, leaving us a total of 8 (or 16) supported compression types, and the fourth (fifth) bit indicates whether the wrapper timestamp (for LogAppendTime, hence overridden) or the inner timestamp (for LogCreationTime) should be used to set the consumer record's timestamp. With this, neither the producer nor the consumer needs to learn about this per-topic config from metadata responses, which makes the client change simpler and adoption by other languages easier.
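The bit-mask layout proposed above could look roughly like this. A minimal sketch: the class and method names are illustrative, not Kafka's actual message-format code; only the mask values (low 3 bits for codec, next bit for timestamp type) come from the discussion:

```java
public class MessageAttributes {
    static final byte COMPRESSION_CODEC_MASK = 0x07; // low 3 bits -> up to 8 codecs
    static final byte TIMESTAMP_TYPE_MASK = 0x08;    // 4th bit -> timestamp type

    // Extract the compression codec id from the attribute byte.
    static int compressionCodec(byte attributes) {
        return attributes & COMPRESSION_CODEC_MASK;
    }

    // True if the wrapper (LogAppendTime) timestamp should be used.
    static boolean isLogAppendTime(byte attributes) {
        return (attributes & TIMESTAMP_TYPE_MASK) != 0;
    }

    // Pack a codec id and a timestamp-type flag into one attribute byte.
    static byte encode(int codec, boolean logAppendTime) {
        byte attrs = (byte) (codec & COMPRESSION_CODEC_MASK);
        if (logAppendTime)
            attrs |= TIMESTAMP_TYPE_MASK;
        return attrs;
    }
}
```

Because existing messages never set the 4th bit, old attribute bytes decode as CreateTime, which keeps the change backward compatible.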
@guozhangwang Using attribute field is a good approach. It also lets consumers know the timestamp type. To make sure I understand your suggestion correctly:
Another thing is that we still need to decompress the entire compressed message, for the reason I mentioned in one of the comments. Given the stream compression used by the producer, we will not have an actual "relative offset compared with the last message" until we close the batch; instead, we only have the "relative offset compared with the first message" when we write a message into a batch. Because the outer message only has the absolute offset of the last message, in order to get the absolute offset of an inner message we have to decompress the entire compressed message to find the "relative offset compared with the last message", then compute the absolute offset. I feel this is fine for the new consumer because we are delivering messages in batches to the user anyway.
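To make the offset arithmetic above concrete, here is a small illustrative sketch (not Kafka's actual code) of recovering absolute offsets when inner messages store offsets relative to the first message and the wrapper carries the absolute offset of the last inner message:

```java
public class RelativeOffsets {
    // relative: inner-message offsets relative to the FIRST message in the set.
    // wrapperOffset: absolute offset of the LAST inner message (stored on the wrapper).
    static long[] absoluteOffsets(long[] relative, long wrapperOffset) {
        // The last relative offset is only known after decompressing the whole set,
        // which is why a full decompression is required.
        long base = wrapperOffset - relative[relative.length - 1]; // absolute offset of first message
        long[] absolute = new long[relative.length];
        for (int i = 0; i < relative.length; i++)
            absolute[i] = base + relative[i];
        return absolute;
    }
}
```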
BTW, currently the CompressionCodecMask is set to 0x7, so it is 3 bits. Changing that to 4 bits is backward compatible, so that should be fine.
@becketqin yes, your understanding is correct. I was initially thinking about the possibility of NOT decompressing the whole message when we add the memory management feature in the future, so that we could choose to buffer fewer decompressed messages. But it seems that's not possible now, which may still be fine for us, so let's forget about it.
@apovzner @guozhangwang I updated the patch with Guozhang's proposal. I will add an integration test in a separate PR.
@becketqin You can do end-to-end compatibility testing with system tests. Take a look at compatibility_test.py. It currently tests the 0.9.X Java producer against 0.8.X brokers and the 0.8.X consumer against 0.9.X brokers. Both succeed on expected failure. You can add a couple more system tests to that to test newer brokers with older producers and/or consumers. Note that you would need to update vagrant/base.sh to get Kafka release 0.9.0.0.
int partition,
long offset,
long timestamp,
Record.TimestampType timestampType,
If we are exposing timestamp type in ConsumerRecord, should we declare TimestampType outside of Record?
It is in KafkaProducer line 437. We just need a one-liner now.
@becketqin Maybe I missed it, but I don't see where the producer assigns timestamps if the user does not specify the timestamp in ProducerRecord. The code was there before, but maybe it got accidentally removed by recent changes?
@apovzner Thanks for the direction on the compatibility test. Extracting the timestamp type makes sense, given we already did that for CompressionType. I will change the server side as well.
 * 2. If the message is using log append time and is an uncompressed message, this method will overwrite the
 * timestamp of the message.
 */
private def validateTimestamp(message: Message,
The comment above the method does not match the implementation anymore -- we now only check the acceptable range for CreateTime timestamps.
@becketqin I reviewed the KIP-32 part of the patch (I did not go into detail on the KIP-31 related changes). Using the timestamp type in attributes made the producer/consumer code much cleaner! I made minor comments; otherwise it looks good to me.
@apovzner Thanks for the review. @guozhangwang @junrao Will you help take a look at the patch? Thanks.
The test failure is intermittent and is not related to this change.
<p><b>For a rolling upgrade:</b></p>

<ol>
<li> Update the server.properties file on all brokers and add the following properties: inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0.0), message.format.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0.0) </li>
The message.format.version change is an optimization, so it's not really required. We can probably just cover it in the section on performance impact.
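For reference, the server.properties fragment being discussed might look like this (the version values are illustrative, taken from the examples above):

```properties
# Step 1 of the rolling upgrade: pin both versions to the current release.
inter.broker.protocol.version=0.9.0.0
message.format.version=0.9.0.0
```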
Thanks for the patch. Looks good overall. Just left a few minor comments. Also, in TopicCommand, when listing the available config options, could we add a description that messageFormat will be ignored if it's not consistent with the inter-broker protocol setting?
} catch {
  case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
appendInfo.lastOffset = offset.get - 1
// If log append time is used, we put the timestamp assigned to the messages in the append info.
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
  appendInfo.timestamp = now
Is there a reason why we don't pass the timestamp as a parameter to analyzeAndValidateMessageSet? That would mean that timestamp could be a val instead of a var. It's a straightforward change, but it would mean that we read config.messageTimestampType outside the synchronized block. Is that a problem?
@ijuma If we do that, it seems possible to cause an inconsistent ordering of message offsets and timestamps. For example, message A comes in and is stamped t1 by the broker, but before it is appended to the log, message B comes in, is stamped t2 (t2 > t1), and gets appended to the log. After that, message A is appended. In this case, message A will have a smaller timestamp but a larger offset than message B, which is a bit confusing.
We can put everything in the synchronized block, but it seems not worth doing if we only want to change a var to a val.
Makes sense, thanks.
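A toy model of the ordering argument above (illustrative, not Kafka's Log class): assigning the timestamp and the offset under the same lock guarantees timestamps are non-decreasing in offset order. A counter stands in for the wall clock so the behavior is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

class LogModel {
    private long nextOffset = 0;
    private final AtomicLong clock = new AtomicLong(0); // stand-in for the wall clock

    // Returns {offset, timestamp}. Reading "now" and assigning the offset
    // happen under one lock, so a later offset never gets an earlier timestamp.
    synchronized long[] append() {
        long now = clock.incrementAndGet();
        return new long[] { nextOffset++, now };
    }
}
```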
@junrao Thanks for the patient review. I think I have addressed the previous comments. Could you take another look?
convertedMessageSet = fileMessageSet.toMessageFormat(Message.MagicValue_V1)
verifyConvertedMessageSet(convertedMessageSet, Message.MagicValue_V1)

def verifyConvertedMessageSet(convertedMessageSet: MessageSet, magicByte: Byte) {
Can this be private?
It seems we cannot add a scope modifier to a code block. The compiler gives the following error:
/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala:260: illegal start of statement (no modifiers allowed here)
private def verifyConvertedMessageSet(convertedMessageSet: MessageSet, magicByte: Byte) {
verifyConvertedMessageSet itself seems private by nature and is only accessible in testMessageFormatConversion.
@becketqin : Thanks for the latest patch. It looks good to me. Once you address the last few minor comments, I can merge this in.
@becketqin : Thanks a lot for working on the patch! LGTM
Nice work @becketqin. And the reviewers too. :)
@junrao @becketqin Some of the streams tests were incorrect when adding the timestamp. For example in ProcessorStateManagerTest:
Actually, I'm wondering whether it would hurt to keep the old constructor for ConsumerRecord with default values of 0L and TimestampType.CREATE_TIME, and revert all the changes in the streams tests? That way we can avoid dealing with the metadata timestamp until it is supported.
@guozhangwang Well spotted. I actually wanted to suggest moving I would prefer if we don't add the old
@ijuma Makes sense. The only place streams use
@guozhangwang I just mean a method like
@ijuma Sounds good.
See KIP-31 and KIP-32 for details.
A few notes on the patch: