KAFKA-3163: Add time based index to Kafka. #1215
Conversation
It seems this conflicts with KAFKA-3510. I'll do the rebase.
cc @junrao. Will you be able to help review this patch? Thanks!
    val segment = new LogSegment(dir = dir,
                                 startOffset = start,
                                 indexIntervalBytes = config.indexInterval,
                                 maxIndexSize = config.maxIndexSize,
                                 rollJitterMs = config.randomSegmentJitter,
                                 time = time,
                                 fileAlreadyExists = true)

-   if(indexFile.exists()) {
+   if (indexFile.exists() && timeIndexFile.exists()) {
I have a question regarding the code here. On line 199 of this patch (line 192 of the original code), we check whether the index file exists. However, right before this we have already created the log segment, which creates the index file if it does not exist. So it seems the check here can never be false?
That's a good point. It seems that we should check the existence of the index file before creating LogSegment.
If we check the existence of the index files before creating the log segment, wouldn't it be a little difficult to distinguish between 1) the upgrade case and 2) the case where the time index file is really missing? In (1), we want to just create an empty time index without rebuilding it. In (2), we want to rebuild the entire time index.
I am wondering in which cases we would be missing an index. Is it only when the index is deleted manually?
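For illustration only, a minimal sketch of the distinction discussed above, with assumed (simplified) file naming and a hypothetical helper; this is not the actual Log.loadSegments code:

    import java.io.File

    // Hypothetical sketch: capture index existence *before* the LogSegment
    // constructor creates any missing files, so the upgrade case (offset index
    // present, time index absent) can be told apart from a genuinely missing
    // offset index that requires a full rebuild. File naming is simplified.
    def indexRecoveryPlan(dir: File, baseOffset: Long): String = {
      val offsetIndexFile = new File(dir, s"$baseOffset.index")     // assumed naming
      val timeIndexFile   = new File(dir, s"$baseOffset.timeindex") // assumed naming
      (offsetIndexFile.exists(), timeIndexFile.exists()) match {
        case (true, true)  => "sanity-check both indexes"
        case (true, false) => "upgrade case: create an empty time index, do not rebuild"
        case (false, _)    => "rebuild the indexes from the log segment"
      }
    }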
@becketqin : Thanks. Could you confirm the number of log dirs you have and the number of disks per dir? Intuitively, if there are multiple disks per dir, using more than one recovery thread should lead to better performance.
@junrao We only have one log directory on a RAID 10 containing 10 disks. So in our case, multiple threads might not help. In a JBOD environment, I agree that multiple threads would probably help.
@becketqin : Interesting. In theory, using more than one thread should still help, since those threads can drive the I/Os on different disks in parallel.
@junrao I am not sure. One thing I noticed is that when there is one thread, the log loading time of each partition is pretty much linear in the number of log segments. But when multiple threads are used, that linearity goes away. For example, the following logs are from running the code with 10 log recovery threads and without the time index. For TOPIC1, it only took ~400 ms to load about 2000 log segments, and at about the same time (300 ms earlier) TOPIC2 took 22 ms to load ~70 log segments. However, later in the same run (3 seconds later), some other partitions of TOPIC2 with about the same number of segments took much longer (~400 ms) to load. This is why I suspected that the later mmap() calls are more expensive, which also seems to be supported by profiling, i.e. measuring the time spent in mmap calls in different buckets. I am not sure why this happens, but I did not see this issue when there is one log loading thread.
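As a rough illustration of the kind of measurement described above, a sketch that runs partition loading on a fixed-size recovery pool and records per-partition load times so early and late loads can be compared; loadPartition is a hypothetical placeholder, not a Kafka API:

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
    import scala.collection.JavaConverters._

    // Hypothetical sketch: run partition loading on a fixed-size pool and record
    // how long each partition takes, so loads that start early in the run can be
    // compared with loads that start later. loadPartition is an assumed placeholder.
    def timePartitionLoads(partitions: Seq[String], numThreads: Int)
                          (loadPartition: String => Unit): Map[String, Long] = {
      val pool = Executors.newFixedThreadPool(numThreads)
      val loadTimesMs = new ConcurrentHashMap[String, java.lang.Long]()
      partitions.foreach { partition =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            val startNs = System.nanoTime()
            loadPartition(partition)
            loadTimesMs.put(partition, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs))
          }
        })
      }
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.HOURS)
      loadTimesMs.asScala.map { case (p, ms) => p -> ms.longValue() }.toMap
    }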
    <h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5>
    <ul>
        <li> The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.</li>
        <li> The log rolling time is no longer depending on log segment create time. Instead it is now based on the timestamp of the first message in a log segment. i.e. if the timestmp of the first message in the segment is T, the log will be rolled out at T + log.roll.ms </li>
typo timestmp
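For readers skimming the upgrade note above, a minimal sketch of the new time-based retention and rolling behavior it describes; SegmentTimestamps, isRetentionExpired and shouldRollByTime are made-up names for illustration, not the actual broker code:

    // Hypothetical sketch of the behavior described in the upgrade note:
    // retention is driven by the largest message timestamp in a segment, and
    // rolling by the timestamp of the segment's first message (roll at T + log.roll.ms).
    case class SegmentTimestamps(firstMessageTimestampMs: Long, largestTimestampMs: Long)

    def isRetentionExpired(seg: SegmentTimestamps, nowMs: Long, retentionMs: Long): Boolean =
      nowMs - seg.largestTimestampMs > retentionMs

    def shouldRollByTime(seg: SegmentTimestamps, nowMs: Long, logRollMs: Long): Boolean =
      nowMs - seg.firstMessageTimestampMs >= logRollMs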
@becketqin : Thanks for the latest patch. I made another pass and only had a few minor comments. Once those are addressed, I can merge the patch.
@junrao Thanks a lot for all the patient reviews. I updated the patch to address your latest comments and added a few unit tests. Thanks again.
Thanks for the patch. LGTM
    if (iter.hasNext)
      rollingBasedTimestamp = Some(iter.next.message.timestamp)
    else
      // If the log is empty, we return 0 as time waited.
The comment is no longer valid. We can probably just remove it.
@becketqin : Thanks a lot for working on this diligently. A couple of follow-ups.
@junrao Thanks a lot for the patient review. I will submit a follow-up PR soon. And yes, I will start working on the KIP to add the seekToTimestamp() API now and post the wiki this week.
@becketqin, it would be good to update the Kafka documentation to mention the time index. Maybe an easy way is to search it for the offset index and see if it makes sense to mention the time index in those places as well. Would you be willing to file a JIRA and take this on as well?
This patch is for KIP-33.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index