core/src/main/scala/kafka/log/LogCleaner.scala (16 changes: 12 additions & 4 deletions)
@@ -624,7 +624,7 @@ private[log] class Cleaner(val id: Int,
  val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
  log.config.maxIndexSize, cleanable.firstUncleanableOffset)
  for (group <- groupedSegments)
- cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs)
+ cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset)

  // record buffer utilization
  stats.bufferUtilization = offsetMap.utilization
@@ -645,14 +645,16 @@ private[log] class Cleaner(val id: Int,
  * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
  * of the grouped segments
  * @param legacyDeleteHorizonMs The delete horizon used for tombstones whose version is less than 2
+ * @param upperBoundOffsetOfCleaningRound The upper bound offset of this round of cleaning
  */
  private[log] def cleanSegments(log: UnifiedLog,
  segments: Seq[LogSegment],
  map: OffsetMap,
  currentTime: Long,
  stats: CleanerStats,
  transactionMetadata: CleanedTransactionMetadata,
- legacyDeleteHorizonMs: Long): Unit = {
+ legacyDeleteHorizonMs: Long,
+ upperBoundOffsetOfCleaningRound: Long): Unit = {
  // create a new segment with a suffix appended to the name of the log and indexes
  val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
  transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
@@ -682,7 +684,7 @@ private[log] class Cleaner(val id: Int,

  try {
  cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainLegacyDeletesAndTxnMarkers, log.config.deleteRetentionMs,
- log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, stats, currentTime = currentTime)
+ log.config.maxMessageSize, transactionMetadata, lastOffsetOfActiveProducers, upperBoundOffsetOfCleaningRound, stats, currentTime = currentTime)
  } catch {
  case e: LogSegmentOffsetOverflowException =>
  // Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
@@ -728,6 +730,7 @@ private[log] class Cleaner(val id: Int,
  * @param maxLogMessageSize The maximum message size of the corresponding topic
  * @param transactionMetadata The state of ongoing transactions which is carried between the cleaning of the grouped segments
  * @param lastRecordsOfActiveProducers The active producers and its last data offset
+ * @param upperBoundOffsetOfCleaningRound Next offset of the last batch in the source segment
  * @param stats Collector for cleaning statistics
  * @param currentTime The time at which the clean was initiated
  */
@@ -740,6 +743,7 @@ private[log] class Cleaner(val id: Int,
  maxLogMessageSize: Int,
  transactionMetadata: CleanedTransactionMetadata,
  lastRecordsOfActiveProducers: mutable.Map[Long, LastRecord],
+ upperBoundOffsetOfCleaningRound: Long,
  stats: CleanerStats,
  currentTime: Long): Unit = {
  val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, deleteRetentionMs) {
Expand Down Expand Up @@ -774,7 +778,11 @@ private[log] class Cleaner(val id: Int,
val batchRetention: BatchRetention =
if (batch.hasProducerId && isBatchLastRecordOfProducer)
BatchRetention.RETAIN_EMPTY
else if (discardBatchRecords)
else if (batch.nextOffset == upperBoundOffsetOfCleaningRound) {
// retain the last batch of the cleaning round, even if it's empty, so that last offset information
// is not lost after cleaning.
BatchRetention.RETAIN_EMPTY
} else if (discardBatchRecords)
BatchRetention.DELETE
else
BatchRetention.DELETE_EMPTY
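For readers skimming the diff: the last hunk above changes the BatchRetention decision so that the batch whose nextOffset equals the upper bound of the cleaning round is kept even when every record in it is filtered out, which preserves the last offset (and hence the log end offset of the cleaned segments) across a cleaning pass. Below is a minimal, self-contained sketch of that decision logic, not the real LogCleaner code; BatchInfo and the Retention values are hypothetical stand-ins for Kafka's internal types.

object BatchRetentionSketch {
  sealed trait Retention
  case object RetainEmpty extends Retention  // keep the batch even if all of its records are filtered out
  case object Delete extends Retention       // drop the batch entirely
  case object DeleteEmpty extends Retention  // drop the batch only if it ends up empty

  // Hypothetical stand-in for the fields the real decision reads from a record batch.
  final case class BatchInfo(hasProducerId: Boolean,
                             isLastRecordOfProducer: Boolean,
                             nextOffset: Long)

  def decide(batch: BatchInfo,
             discardBatchRecords: Boolean,
             upperBoundOffsetOfCleaningRound: Long): Retention =
    if (batch.hasProducerId && batch.isLastRecordOfProducer)
      RetainEmpty  // preserve producer state, as before this change
    else if (batch.nextOffset == upperBoundOffsetOfCleaningRound)
      RetainEmpty  // new case: keep the final batch of the round so its last offset survives cleaning
    else if (discardBatchRecords)
      Delete
    else
      DeleteEmpty

  def main(args: Array[String]): Unit = {
    // The last batch of the round (nextOffset == upper bound) is retained even when fully discarded.
    println(decide(BatchInfo(hasProducerId = false, isLastRecordOfProducer = false, nextOffset = 100L),
      discardBatchRecords = true, upperBoundOffsetOfCleaningRound = 100L)) // RetainEmpty
    // An earlier fully-discarded batch is still deleted, as before.
    println(decide(BatchInfo(hasProducerId = false, isLastRecordOfProducer = false, nextOffset = 50L),
      discardBatchRecords = true, upperBoundOffsetOfCleaningRound = 100L)) // Delete
  }
}

Note that the new branch sits before the discardBatchRecords check, so a fully discarded final batch is rewritten as an empty batch carrying the original last offset rather than disappearing from the cleaned segment.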
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (67 changes: 50 additions & 17 deletions)
@@ -168,7 +168,7 @@ class LogCleanerTest extends Logging {
  val stats = new CleanerStats()
  val expectedBytesRead = segments.map(_.size).sum
  val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
- cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
  assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
  assertEquals(expectedBytesRead, stats.bytesRead)
  }
@@ -256,7 +256,7 @@ class LogCleanerTest extends Logging {
  val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
  val stats = new CleanerStats()
  cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
- cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)

  // Validate based on the file name that log segment file is renamed exactly once for async deletion
  assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -423,7 +423,7 @@ class LogCleanerTest extends Logging {
  val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
  val stats = new CleanerStats(time)
  cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, offsetMap, stats)
- cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
+ cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue, segments.last.readNextOffset)
  dirtyOffset = offsetMap.latestOffset + 1
  }

@@ -925,7 +925,7 @@ class LogCleanerTest extends Logging {

  // clean the log
  val stats = new CleanerStats()
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
  val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
  assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
  }
@@ -938,7 +938,7 @@ class LogCleanerTest extends Logging {
  val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)

  val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
  val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
  assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
  }
@@ -957,7 +957,7 @@ class LogCleanerTest extends Logging {

  val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
  assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
  )
  }

@@ -974,7 +974,7 @@ class LogCleanerTest extends Logging {

  val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
  assertThrows(classOf[CorruptRecordException], () =>
- cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
  )
  }

@@ -1356,12 +1356,42 @@ class LogCleanerTest extends Logging {
  val keys = LogTestUtils.keysInLog(log)
  val map = new FakeOffsetMap(Int.MaxValue)
  keys.foreach(k => map.put(key(k), Long.MaxValue))
+ val segments = log.logSegments.asScala.take(3).toSeq
  assertThrows(classOf[LogCleaningAbortedException], () =>
- cleaner.cleanSegments(log, log.logSegments.asScala.take(3).toSeq, map, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ cleaner.cleanSegments(log, segments, map, 0L, new CleanerStats(),
+ new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
  )
  }

+ @Test
+ def testCleanSegmentsRetainingLastEmptyBatch(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ // append messages to the log until we have four segments
+ while (log.numberOfSegments < 4)
+ log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
+ val keysFound = LogTestUtils.keysInLog(log)
+ assertEquals(0L until log.logEndOffset, keysFound)
+
+ // pretend all keys are deleted
+ val map = new FakeOffsetMap(Int.MaxValue)
+ keysFound.foreach(k => map.put(key(k), Long.MaxValue))
+
+ // clean the log
+ val segments = log.logSegments.asScala.take(3).toSeq
+ val stats = new CleanerStats()
+ cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1, segments.last.readNextOffset)
+ assertEquals(2, log.logSegments.size)
+ assertEquals(1, log.logSegments.asScala.head.log.batches.asScala.size, "one batch should be retained in the cleaned segment")
+ val retainedBatch = log.logSegments.asScala.head.log.batches.asScala.head
+ assertEquals(log.logSegments.asScala.last.baseOffset - 1, retainedBatch.lastOffset, "the retained batch should be the last batch")
+ assertFalse(retainedBatch.iterator.hasNext, "the retained batch should be an empty batch")
+ }
+
  /**
  * Validate the logic for grouping log segments together for cleaning
  */
@@ -1618,16 +1648,17 @@ class LogCleanerTest extends Logging {
  // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
  assertThrows(classOf[LogCleaningAbortedException], () =>
  cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
  )
  assertEquals(numSegmentsInitial + 1, log.logSegments.size)
  assertEquals(allKeys, LogTestUtils.keysInLog(log))
  assertFalse(LogTestUtils.hasOffsetOverflow(log))

  // Clean each segment now that split is complete.
+ val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
  for (segmentToClean <- log.logSegments.asScala)
  cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
  assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
  assertFalse(LogTestUtils.hasOffsetOverflow(log))
  log.close()
@@ -1666,9 +1697,11 @@ class LogCleanerTest extends Logging {
  for (k <- 1 until messageCount by 2)
  offsetMap.put(key(k), Long.MaxValue)

+ val upperBoundOffset = log.activeSegment.baseOffset
+
  // clean the log
  cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
  // clear scheduler so that async deletes don't run
  time.scheduler.clear()
  var cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1684,7 +1717,7 @@ class LogCleanerTest extends Logging {

  // clean again
  cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
  // clear scheduler so that async deletes don't run
  time.scheduler.clear()
  cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1701,7 +1734,7 @@ class LogCleanerTest extends Logging {

  // clean again
  cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
  // clear scheduler so that async deletes don't run
  time.scheduler.clear()
  cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1723,7 +1756,7 @@ class LogCleanerTest extends Logging {
  for (k <- 1 until messageCount by 2)
  offsetMap.put(key(k), Long.MaxValue)
  cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
  // clear scheduler so that async deletes don't run
  time.scheduler.clear()
  cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1741,7 +1774,7 @@ class LogCleanerTest extends Logging {
  for (k <- 1 until messageCount by 2)
  offsetMap.put(key(k), Long.MaxValue)
  cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
  // clear scheduler so that async deletes don't run
  time.scheduler.clear()
  cleanedKeys = LogTestUtils.keysInLog(log)
@@ -1759,7 +1792,7 @@ class LogCleanerTest extends Logging {
  for (k <- 1 until messageCount by 2)
  offsetMap.put(key(k), Long.MaxValue)
  cleaner.cleanSegments(log, log.logSegments.asScala.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
- new CleanedTransactionMetadata, -1)
+ new CleanedTransactionMetadata, -1, upperBoundOffset)
  // clear scheduler so that async deletes don't run
  time.scheduler.clear()
  cleanedKeys = LogTestUtils.keysInLog(log)
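A brief note on the calling convention visible in the updated tests: callers pass, as the new last argument, the next offset after the final batch of the group being cleaned, usually derived from the last segment in the group. The sketch below restates that derivation under the assumption (shown in the tests) that readNextOffset is the offset following the segment's last batch; LogSegmentLike is a hypothetical stand-in for kafka.log.LogSegment, not the real class.

object UpperBoundOffsetSketch {
  // Simplified stand-in: only the fields needed to compute readNextOffset.
  final case class LogSegmentLike(baseOffset: Long, lastBatchOffset: Option[Long]) {
    // next offset after the last batch in this segment (baseOffset when the segment is empty)
    def readNextOffset: Long = lastBatchOffset.map(_ + 1).getOrElse(baseOffset)
  }

  // Mirrors `segments.last.readNextOffset` as used when calling cleanSegments in the tests.
  def upperBoundOffsetOfCleaningRound(group: Seq[LogSegmentLike]): Long =
    group.last.readNextOffset

  def main(args: Array[String]): Unit = {
    // Two segments whose final batch ends at offset 41 give an upper bound of 42,
    // so the batch ending at 41 is the one retained (possibly empty) after cleaning.
    val group = Seq(LogSegmentLike(0L, Some(19L)), LogSegmentLike(20L, Some(41L)))
    println(upperBoundOffsetOfCleaningRound(group)) // 42
  }
}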