
KAFKA-9157: Avoid generating empty segments if all records are deleted after cleaning #7711

Open · wants to merge 1 commit into trunk

Conversation

@huxihx (Contributor) commented Nov 19, 2019

https://issues.apache.org/jira/browse/KAFKA-9157

If all records are deleted after cleaning, we should avoid generating empty log segments.


Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@huxihx (Contributor, author) commented Nov 20, 2019

@junrao Please review this patch. Thanks.

@junrao (Contributor) left a comment

@huxihx : Thanks for the PR. Left a few comments below.

log.appendAsLeader(record(key = 0, value = log.logEndOffset.toInt), leaderEpoch = 0)

cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
assertEquals("Empty segment(s) should not be created after cleaning.", 2, log.numberOfSegments)
@junrao (Contributor):

Not sure if this test exposes the existing issue. Even without this patch, the cleaner will collapse the first 3 segments into one segment (with 1 record in it). To expose this issue, we have to force multiple log segments to be generated in the old logic. One way to do that is to generate log segments that are more than 2 billion apart in offsets.
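For illustration, a hedged sketch of one way to force such segments in a test, assuming the Log and MemoryRecords APIs available to the cleaner tests (appendAsFollower permits arbitrary offsets; the keys and values are placeholders):

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

// Hedged sketch: append as a follower so arbitrary offsets can be chosen, and
// place the second batch more than Int.MaxValue (~2.1 billion) beyond the first.
// Relative offsets within a segment must fit in a 4-byte delta from the base
// offset, so the cleaner cannot group these segments into one.
log.appendAsFollower(MemoryRecords.withRecords(0L, CompressionType.NONE,
  new SimpleRecord("k".getBytes, "v0".getBytes)))
log.roll() // close the first segment
log.appendAsFollower(MemoryRecords.withRecords(Int.MaxValue.toLong + 1L, CompressionType.NONE,
  new SimpleRecord("k".getBytes, "v1".getBytes)))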

@huxihx (Contributor, author):

Without this patch, this test generates 4 segments: the first two are empty and the latter two are non-empty.

  log.replaceSegments(List(), segments)
} else {
  info(s"Swapping in cleaned segment ${cleanedOpt.get} for segment(s) $segments in log $log")
  log.replaceSegments(List(cleanedOpt.get), segments)
@junrao (Contributor):

Hmm, the changes here are more significant than I expected. I am wondering if they are truly necessary. For example, could we just keep the existing logic and check the size of cleaned segment? If the cleaned segment has size of 0, then we can replace the existing segments with an empty list and delete cleaned.
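A hedged sketch of that alternative in the cleaner's swap path, reusing the names from the snippet above (whether the empty file is removed via deleteIfExists() or some other path is an assumption):

// Hedged sketch: keep the existing cleaning logic, then special-case a
// zero-sized cleaned segment instead of restructuring the cleaner.
val cleaned = cleanedOpt.get
if (cleaned.size == 0) {
  // All records were removed: swap in nothing and drop the empty .cleaned file.
  log.replaceSegments(List.empty, segments)
  cleaned.deleteIfExists()
} else {
  info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
  log.replaceSegments(List(cleaned), segments)
}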

@@ -2269,7 +2269,7 @@ class Log(@volatile var dir: File,
     // delete the old files
     for (seg <- sortedOldSegments) {
       // remove the index entry
-      if (seg.baseOffset != sortedNewSegments.head.baseOffset)
+      if (sortedNewSegments.isEmpty || seg.baseOffset != sortedNewSegments.head.baseOffset)
@junrao (Contributor):

With this change, we may need to advance logStartOffset.

@huxihx (Contributor, author) commented Dec 4, 2019

@junrao Thanks for the response. I was considering the option of not creating empty cleaned segments at all, rather than first creating them and then deleting them. If you prefer the latter approach, that works for me as well.

@huxihx (Contributor, author) commented Dec 5, 2019

retest this please.

@junrao (Contributor) left a comment

@huxihx : Thanks for the updated PR. A few more comments below.

@@ -2269,7 +2269,7 @@ class Log(@volatile var dir: File,
     // delete the old files
     for (seg <- sortedOldSegments) {
       // remove the index entry
-      if (seg.baseOffset != sortedNewSegments.head.baseOffset)
+      if (sortedNewSegments.isEmpty || seg.baseOffset != sortedNewSegments.head.baseOffset)
@junrao (Contributor):

The first check seems unnecessary now since sortedNewSegments is never empty.

@huxihx (Contributor, author):

It's more of a defensive coding pattern to avoid a NoSuchElementException. Do you think it's worth keeping here?

log.activeSegment.baseOffset else allNonEmptyCleanedSegments.head.baseOffset
log.maybeIncrementHighWatermark(LogOffsetMetadata(baseOffsetOfFirstNonEmptySegment))
log.maybeIncrementLogStartOffset(baseOffsetOfFirstNonEmptySegment)
log.removeAndDeleteSegments(log.nonActiveLogSegmentsFrom(-1L).filter(_.size == 0), true)
@junrao (Contributor):

It's better to do the above logic while holding a lock of the Log to prevent any concurrent log segment changes. Could we move the above logic inside Log.replaceSegments()?
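A hedged sketch of what that move might look like (the placement and the elided swap logic are assumptions; the names mirror the snippet above):

// Hedged sketch: do the swap and the start-offset bookkeeping under the
// Log's lock so no concurrent segment change can interleave.
private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = {
  lock synchronized {
    // ... existing swap of oldSegments for newSegments ...
    val firstNonEmptyBase = newSegments.find(_.size > 0)
      .map(_.baseOffset)
      .getOrElse(activeSegment.baseOffset)
    maybeIncrementLogStartOffset(firstNonEmptyBase)
    removeAndDeleteSegments(nonActiveLogSegmentsFrom(-1L).filter(_.size == 0), asyncDelete = true)
  }
}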


assertTrue(distinctValuesBySegmentAfterClean.size == distinctValuesBySegmentBeforeClean.size - 1)
distinctValuesBySegmentAfterClean = 0 +: distinctValuesBySegmentAfterClean
@junrao (Contributor):

I understand the need to patch the logic here. However, the way it's done seems a bit ad hoc. Could we just change the logic to get the distinct values for a particular offset range?
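For instance, a hedged sketch of a range-based helper (distinctValuesInRange and the value-decoding details are assumptions, not the test's actual helpers):

import scala.collection.JavaConverters._

// Hedged sketch: count distinct record values within an explicit offset range,
// rather than patching the per-segment counts after cleaning.
def distinctValuesInRange(log: Log, from: Long, to: Long): Int =
  log.logSegments(from, to)
    .flatMap(_.log.records.asScala)          // all records in each segment
    .map(r => TestUtils.readString(r.value)) // decode the value payload
    .toSet
    .size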

@huxihx (Contributor, author) commented Dec 17, 2019

retest this please.

@junrao (Contributor) left a comment

@huxihx : Thanks for the updated patch. A couple more suggestions below.


val (emptyCleanedSegments, nonEmptyCleanedSegments) = sortedNewSegments.partition(_.size == 0)

if (emptyCleanedSegments.nonEmpty) {
@junrao (Contributor):

Currently, we expect that if there is an empty segment, it would be the only segment in newSegments. Perhaps we could throw an IllegalArgumentException if that's not the case.

if (emptyCleanedSegments.head.baseOffset == baseOffset) {
  val newLogStartOffset = Math.max(logStartOffset, segments.higherKey(baseOffset))
  if (highWatermark < newLogStartOffset)
    maybeIncrementHighWatermark(LogOffsetMetadata(newLogStartOffset))
@junrao (Contributor):

We probably don't need this. Currently, the log cleaner never cleans beyond the HWM. If a bug causes the logStartOffset to be larger than HWM, we can just rely on the assertion in maybeIncrementLogStartOffset.

@huxihx (Contributor, author):

Several test cases did not bump the HWM when appending records, so this patch makes them fail. That's why I also update the HWM here. Would it be okay to refine those test cases instead?

@junrao (Contributor):

Hmm, in those test cases, should the HWM be higher to start with? Or is there a legitimate case that the HWM needs to be advanced when log segments are swapped? If it's the former, we will want to patch the test cases.

@huxihx (Contributor, author):

Some tests update the HWM to the LEO when they append new records, and some don't. Prior to this patch, the HWM didn't matter since the log start offset never advanced after cleaning. With this patch applied, that changes. I think it would be better to update the HWM to the LEO in those failing tests as well. Does that make sense?

@junrao (Contributor):

@huxihx : Thanks for the reply. In general, HWM is supposed to be changed only at the ReplicaManager level since it can only advance if all in-sync replicas have received the records. At the Log layer, we only know the local replica and shouldn't be advancing the HWM directly. That's why I was suggesting to fix the tests instead of changing HWM here so that the tests could pass.

@huxihx (Contributor, author) commented Dec 20, 2019

retest this please.

@huxihx (Contributor, author) commented Dec 23, 2019

retest this please.

@huxihx (Contributor, author) commented Dec 30, 2019

@junrao Please review again. Thanks.

@huxihx (Contributor, author) commented Dec 30, 2019

retest this please.

@junrao (Contributor) left a comment

@huxihx : Sorry for the delay. Thanks for the new update. Added a few more comments below.

@@ -2232,6 +2232,8 @@ class Log(@volatile var dir: File,
 * <li> Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments().
 *      If broker crashes at this point, the clean-and-swap operation is aborted and
 *      the .cleaned files are deleted on recovery in loadSegments().
+* <li> If the new cleaned segments contain empty(zero-sized) ones, replaceSegments() tries to update high watermark and log start offset,
@junrao (Contributor):

The comment needs to be adjusted since we are no longer updating the high watermark.

@@ -2254,7 +2256,21 @@
    */
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
     lock synchronized {
+      if (newSegments.size > 1 && newSegments.exists(_.size == 0))
+        throw new IllegalArgumentException(s"Expected only one cleaned segment should be created after cleaning, but we got $newSegments")
@junrao (Contributor):

replaceSegments() is called during log recovery too. So, we want to write the error message in a more general way. Something like "There should be only one new log segment if any of the new segments is empty, but we got $newSegments instead".

if (emptyCleanedSegments.nonEmpty) {
  val baseOffset = segments.firstKey()
  if (emptyCleanedSegments.head.baseOffset == baseOffset) {
    val newLogStartOffset = Math.max(logStartOffset, segments.higherKey(baseOffset))
@junrao (Contributor):

Hmm, this doesn't seem quite right. There could be multiple segments in oldSegments that the new segment is trying to replace. In that case, we want to get the higher key of the last old segment.
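For example (a hedged sketch, using sortedOldSegments as named in replaceSegments()):

// Hedged sketch: advance past the entire replaced range by keying off the
// last old segment's base offset rather than the first key in the map.
val newLogStartOffset = Math.max(logStartOffset, segments.higherKey(sortedOldSegments.last.baseOffset))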

@huxihx (Contributor, author):

@junrao Sorry for the long delay. Will take care of the comments ASAP.

@huxihx (Contributor, author) commented Feb 11, 2020

@junrao Correct me if I am wrong. The key question here is whether we should always update the log start offset when an empty cleaned segment is generated. IMO, the update should be conditional; in some cases we should not do it.

Say we have three sorted segments A, B and C, where A is not empty after cleaning. The cleaner then cleans B, and the cleaned segment generated for B is empty. In that case, we should not update the log start offset, since A is not empty. That's actually the case LogCleanerParameterizedIntegrationTest.cleanerTest wants to cover. Does that make sense?

@junrao (Contributor) commented Feb 11, 2020

@huxihx : I was thinking about the following case. Suppose that we have 4 segments A, B, C and D with starting offset 100, 200, 300 and 400. The firstDirty offset is at 400. We load segment D to build the offset map. During cleaning, segment A, B and C are grouped together and are cleaned into segment A', which is empty. After replacing segment A, B and C with segment A', it seems that the log start offset should be 400, instead of 200?

@huxihx (Contributor, author) commented Feb 14, 2020

retest this please.

@junrao (Contributor) commented Feb 20, 2020

ok to test

@junrao (Contributor) commented Feb 20, 2020

retest this please.

@junrao (Contributor) commented Feb 20, 2020

test this please

@junrao (Contributor) commented Feb 20, 2020

retest this please

@junrao (Contributor) commented Feb 21, 2020

@huxihx : The following test failure seems related to this PR.

14:24:07 kafka.log.LogCleanerIntegrationTest > testMaxLogCompactionLag FAILED
14:24:07     java.lang.AssertionError: log cleaner should have processed at least to offset 294, but lastCleaned=0
14:24:07         at org.junit.Assert.fail(Assert.java:89)
14:24:07         at org.junit.Assert.assertTrue(Assert.java:42)
14:24:07         at kafka.log.LogCleanerIntegrationTest.testMaxLogCompactionLag(LogCleanerIntegrationTest.scala:162)

@huxihx (Contributor, author) commented Jul 7, 2020

@junrao Sorry for the long delay. Please review this patch again.

@huxihx (Contributor, author) commented Jul 24, 2020

retest this please

@huxihx (Contributor, author) commented Aug 17, 2020

@junrao Could you take some time to review this patch? Thanks :)
