KAFKA-1981 Make log compaction point configurable #1794
@junrao could you please take a look at this new PR (I closed the prior one)? I switched to using the time index to determine the last message time in the LogSegments, as we discussed.
* "dirty" section that has not yet been cleaned. The active log segment is always excluded from cleaning.
* "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section.
* The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a
* compaction lag time set, segments whose last modification date is within the compaction lag time of the cleaning operation are also uncleanable.
This is no longer accurate. It should be "segments whose largest timestamp".
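The doc comment under review splits a log into a clean section, a cleanable dirty section, and an uncleanable dirty section. A minimal sketch of that split, at segment granularity, might look like the following. `Seg`, `LogSectionsSketch`, and `sections` are hypothetical names for illustration and are not part of the patch; the two offsets are assumed to have been computed elsewhere.

```scala
// Hypothetical stand-in for a log segment; only the base offset matters here.
final case class Seg(baseOffset: Long)

object LogSectionsSketch {
  /** Splits segments into (clean, cleanable, uncleanable) by base offset.
    * Assumes firstDirtyOffset <= firstUncleanableOffset.
    */
  def sections(segments: Seq[Seg],
               firstDirtyOffset: Long,
               firstUncleanableOffset: Long): (Seq[Seg], Seq[Seg], Seq[Seg]) = {
    // Everything before the first dirty offset has already been cleaned.
    val (clean, dirty) = segments.partition(_.baseOffset < firstDirtyOffset)
    // The dirty part is cleanable up to the first uncleanable offset.
    val (cleanable, uncleanable) = dirty.partition(_.baseOffset < firstUncleanableOffset)
    (clean, cleanable, uncleanable)
  }
}
```

The active segment always ends up in the third group as long as `firstUncleanableOffset` is no greater than its base offset.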
@ewasserman : Sorry for the late review. I made a pass and left some comments.
4d72464 to e05b636
@junrao Thanks for the review. Hope this addresses all the issues you found.
}

// dirty log segments
val dirtySegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray :+ log.activeSegment
Do we need to add log.activeSegment here since we already consider log.activeSegment when computing firstUncleanableDirtyOffset?
Changed to only include non-active dirty segments.
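The exchange above rests on the active segment's base offset already bounding the cleanable range, so the active segment does not need to appear in the dirty-segment list. A rough sketch of that bound, assuming the lag check yields an optional offset, is shown here. `UncleanableBoundSketch` and its parameter names are hypothetical and are not taken from the patch.

```scala
object UncleanableBoundSketch {
  /** The first uncleanable dirty offset is the smaller of the active segment's
    * base offset and the first offset still inside the compaction lag, if any.
    */
  def firstUncleanableDirtyOffset(activeBaseOffset: Long,
                                  firstLaggingOffset: Option[Long]): Long =
    // With no lagging segment, the active segment alone sets the bound.
    firstLaggingOffset.fold(activeBaseOffset)(o => math.min(activeBaseOffset, o))
}
```

Because the active segment always contributes a candidate, the result is defined even when no compaction lag is configured.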
}

/* create a cleaner instance and logs with the given parameters */
def makeCleaner(parts: Int,
Can this be private?
@ewasserman : Thanks for the updated patch. Left a few more minor comments.
e05b636 to 7cd9d9d
7cd9d9d to 72f18c0
@ewasserman : Thanks for the patch. LGTM.
    debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
    isUncleanable
  } map(_.baseOffset)
} else None
I know this has already been merged, but this code seems a bit too dense. Maybe we could extract some of it to an inner def in a clean-up PR?
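One possible shape for the suggested clean-up is sketched below, with the predicate pulled into an inner def. This is an illustration under assumptions and not the merged code: `SegmentInfo` is a hypothetical stand-in for the real segment type, the debug logging is left out, and the comparison `largestTimestamp > now - compactionLagMs` is inferred from the logged values in the snippet above.

```scala
// Hypothetical stand-in for a log segment, reduced to the two fields used here.
final case class SegmentInfo(baseOffset: Long, largestTimestamp: Long)

object InnerDefSketch {
  /** Base offset of the first non-active dirty segment still within the compaction lag. */
  def firstSegmentWithinLag(dirtyNonActive: Seq[SegmentInfo],
                            now: Long,
                            compactionLagMs: Long): Option[Long] = {
    // Inner def: a segment is uncleanable while its newest message is younger than the lag.
    def isUncleanable(s: SegmentInfo): Boolean =
      s.largestTimestamp > now - compactionLagMs

    if (compactionLagMs > 0) dirtyNonActive.find(isUncleanable).map(_.baseOffset)
    else None
  }
}
```

Naming the predicate keeps the `find`/`map` chain on one line and leaves a single place to attach logging.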
Now uses LogSegment.largestTimestamp to determine age of segment's messages.