Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5435: Ensure producer snapshot retained after truncation #3306

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 36 additions & 22 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ import java.util.Map.{Entry => JEntry}
import java.lang.{Long => JLong}
import java.util.regex.Pattern

import org.apache.kafka.common.internals.Topic

object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
Expand Down Expand Up @@ -431,24 +429,26 @@ class Log(@volatile var dir: File,
private def loadProducerState(lastOffset: Long): Unit = lock synchronized {
info(s"Loading producer state from offset $lastOffset for partition $topicPartition")

// To avoid expensive initialization when upgrading from older brokers, we skip loading producer
// state if no snapshot file is found. To ensure that we cannot hit this case after upgrading (which
// could cause us to lose producer state), we enforce the invariant that we always have an empty snapshot
// file at the log start offset.

if (producerStateManager.latestSnapshotOffset.isEmpty) {
// if there are no snapshots to load producer state from, we assume that the brokers are
// being upgraded, which means there would be no previous idempotent/transactional producers
// to load state for. To avoid an expensive scan through all of the segments, we take
// empty snapshots from the start of the last two segments and the last offset. The purpose
// of taking the segment snapshots is to avoid the full scan in the case that the log needs
// truncation.
// There are no snapshots so this is the upgrade path. In addition to taking a snapshot at the log start
// offset to enforce the invariant mentioned above, we take empty snapshots from the start of the last
// two segments and the last offset. The purpose of the additional snapshots is to avoid the full scan in
// the case that the log needs truncation.
val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset)
val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
offsetsToSnapshot.flatten.foreach { offset =>
producerStateManager.updateMapEndOffset(offset)
producerStateManager.takeSnapshot()
}
val offsetsToSnapshot = Seq(Some(logStartOffset), nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
offsetsToSnapshot.flatten.foreach(producerStateManager.takeEmptySnapshot)
} else {
val currentTimeMs = time.milliseconds
producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
// Ensure we have an empty snapshot at the log start offset to enforce the invariant mentioned above.
// This must be done prior to truncation in case of failure after previous snapshots are removed.
producerStateManager.takeEmptySnapshot(logStartOffset)
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

// only do the potentially expensive reloading of the last snapshot offset is lower than the
// Only do the potentially expensive reloading if the last snapshot offset is lower than the
// log end offset (which would be the case on first startup) and there are active producers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description says that we do the reloading if both conditions are true. However, the if statement check succeeds if either of the checks is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment. Looking at this check, it seems a little wrong. The idea was to skip loading if there were no producers at the end of the log, but that is only useful if you consider the state of the producers prior to truncation. Let me try to sort this out.

// if there are no active producers, then truncating shouldn't change that fact (although it
// could cause a producerId to expire earlier than expected), so we can skip the loading.
Expand Down Expand Up @@ -478,7 +478,7 @@ class Log(@volatile var dir: File,
completedTxns.foreach(producerStateManager.completeTxn)
}

private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized {
private[log] def activeProducers: Map[Long, ProducerIdEntry] = lock synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not from this PR, but it seems like updateProducers should just return the result from appendInfo.append(batch) instead of taking a list to append to (from a simplicity perspective).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's fair.

producerStateManager.activeProducers
}

Expand Down Expand Up @@ -706,6 +706,11 @@ class Log(@volatile var dir: File,
lock synchronized {
if (offset > logStartOffset) {
logStartOffset = offset

// Enforce the invariant that we have an empty snapshot at the log start offset to ensure
// proper loading of producer state upon recovery.
producerStateManager.takeEmptySnapshot(logStartOffset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized another potential issue with this. If we delete a log up to an offset in the middle of a segment, we create an empty snapshot on logStartOffset. In loadProducerState(), if we need to rebuild producer state from this snapshot, currently there seems to be 2 issues: (1) logSegments() doesn't seem to take logStartOffset into consideration. So, we may miss the first log segment. (2) If the segment is selected, we should be processing message from logStartOffset and up.

producerStateManager.deleteSnapshotsBefore(logStartOffset)
}
}
}
Expand Down Expand Up @@ -1050,7 +1055,11 @@ class Log(@volatile var dir: File,
deletable.foreach(deleteSegment)
logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
leaderEpochCache.clearAndFlushEarliest(logStartOffset)
producerStateManager.evictUnretainedProducers(logStartOffset)

// Update the producer state with the new log start offset, which we cause any non-retained producers to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we delete a segment first and then delete the snapshot with the start offset of that segment later in producerStateManager.truncateHead(). I am wondering if we need to tighten up ProducerStateManager.truncateAndReload() a bit. In that method, if the latest snapshot's offset doesn't have a corresponding log segment, using that snapshot could lead to incorrect producer state after recovery.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. In truncateAndReload, we first delete any snapshots which are out of range. Is there a case where the start and end offsets passed to truncateAndReload would be out of sync with the log segments?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, missed that. So this should work then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"which we cause" -> "which would cause"?

// be evicted. Enforce the invariant that we always have an empty snapshot at the log start offset.
producerStateManager.takeEmptySnapshot(logStartOffset)
producerStateManager.truncateHead(logStartOffset)
updateFirstUnstableOffset()
}
}
Expand Down Expand Up @@ -1255,10 +1264,10 @@ class Log(@volatile var dir: File,
for(segment <- logSegments(this.recoveryPoint, offset))
segment.flush()

// now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain
// the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
// Otherwise, we would always need to rebuild from the earliest segment.
producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset))
// Now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain the
// snapshots from the recent segments in case we need to truncate and rebuild the producer state. Note that
// we still retain the snapshot from the log start offset.
producerStateManager.deleteSnapshotsInRangeExclusive(logStartOffset, minSnapshotOffsetToRetain(offset))

lock synchronized {
if(offset > this.recoveryPoint) {
Expand Down Expand Up @@ -1364,6 +1373,11 @@ class Log(@volatile var dir: File,

producerStateManager.truncate()
producerStateManager.updateMapEndOffset(newOffset)

// Truncation results in all snapshot files being removed, so take an empty snapshot at the new offset
// to maintain the invariant that we always have a snapshot at the log start offset.
producerStateManager.takeEmptySnapshot(newOffset)

updateFirstUnstableOffset()

this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private[log] class Cleaner(val id: Int,
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(startOffset, log.name, new Date(oldSegmentOpt.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
cleanInto(log.topicPartition, oldSegmentOpt, cleaned, map, retainDeletes, log.config.maxMessageSize, transactionMetadata,
log.activePids, stats)
log.activeProducers, stats)

currentSegmentOpt = nextSegmentOpt
}
Expand Down
41 changes: 30 additions & 11 deletions core/src/main/scala/kafka/log/ProducerStateManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
private val producers = mutable.Map.empty[Long, ProducerIdEntry]
private var lastMapOffset = 0L
private var lastSnapOffset = 0L
private var lastSnapOffset = -1L

// ongoing transactions sorted by the first offset of the transaction
private val ongoingTxns = new util.TreeMap[Long, TxnMetadata]
Expand Down Expand Up @@ -444,7 +444,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
Files.deleteIfExists(file.toPath)
}
case None =>
lastSnapOffset = logStartOffset
lastSnapOffset = -1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 => -1L ?

lastMapOffset = logStartOffset
return
}
Expand Down Expand Up @@ -472,17 +472,24 @@ class ProducerStateManager(val topicPartition: TopicPartition,
}
}

private def inRangeInclusive(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = {
val offset = offsetFromFilename(snapshotFile.getName)
offset >= startOffset && offset <= endOffset
}

private def inRangeExclusive(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps inRangeExclusive and inRangeInclusive could just be outOfRange and inRange?

val offset = offsetFromFilename(snapshotFile.getName)
offset > startOffset && offset < endOffset
}

/**
* Truncate the producer id mapping to the given offset range and reload the entries from the most recent
* snapshot in range (if there is one). Note that the log end offset is assumed to be less than
* or equal to the high watermark.
*/
def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
// remove all out of range snapshots
deleteSnapshotFiles { file =>
val offset = offsetFromFilename(file.getName)
offset > logEndOffset || offset <= logStartOffset
}
deleteSnapshotFiles(!inRangeInclusive(_, logStartOffset, logEndOffset))

if (logEndOffset != mapEndOffset) {
producers.clear()
Expand All @@ -493,7 +500,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
unreplicatedTxns.clear()
loadFromSnapshot(logStartOffset, currentTimeMs)
} else {
evictUnretainedProducers(logStartOffset)
truncateHead(logStartOffset)
}
}

Expand Down Expand Up @@ -541,6 +548,12 @@ class ProducerStateManager(val topicPartition: TopicPartition,
}
}

def takeEmptySnapshot(offset: Long) = {
val snapshotFile = Log.producerSnapshotFile(logDir, offset)
debug(s"Writing empty producer snapshot for partition $topicPartition at offset $offset")
writeSnapshot(snapshotFile, mutable.Map.empty)
}

/**
* Get the last offset (exclusive) of the latest snapshot file.
*/
Expand All @@ -553,17 +566,18 @@ class ProducerStateManager(val topicPartition: TopicPartition,

/**
* When we remove the head of the log due to retention, we need to clean up the id map. This method takes
* the new start offset and removes all producerIds which have a smaller last written offset.
* the new start offset and removes all producerIds which have a smaller last written offset. Additionally,
* all snapshot files at offsets strictly lower than the log start offset will be removed.
*/
def evictUnretainedProducers(logStartOffset: Long) {
def truncateHead(logStartOffset: Long) {
val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset)
val evictedProducerIds = evictedProducerEntries.keySet

producers --= evictedProducerIds
removeEvictedOngoingTransactions(evictedProducerIds)
removeUnreplicatedTransactions(logStartOffset)

deleteSnapshotFiles(file => offsetFromFilename(file.getName) <= logStartOffset)
deleteSnapshotsBefore(logStartOffset)
if (lastMapOffset < logStartOffset)
lastMapOffset = logStartOffset
lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
Expand Down Expand Up @@ -596,7 +610,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
ongoingTxns.clear()
unreplicatedTxns.clear()
deleteSnapshotFiles()
lastSnapOffset = 0L
lastSnapOffset = -1L
lastMapOffset = 0L
}

Expand All @@ -620,6 +634,11 @@ class ProducerStateManager(val topicPartition: TopicPartition,
deleteSnapshotFiles(file => offsetFromFilename(file.getName) < offset)
}

@threadsafe
def deleteSnapshotsInRangeExclusive(startOffset: Long, endOffset: Long): Unit = {
deleteSnapshotFiles(inRangeExclusive(_, startOffset, endOffset))
}

private def listSnapshotFiles: List[File] = {
if (logDir.exists && logDir.isDirectory)
logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList
Expand Down
10 changes: 5 additions & 5 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class LogManagerTest {
assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)

// there should be a log file, two indexes, and the leader epoch checkpoint
assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
// there should be a log file, two indexes, one empty producer snapshot, and the leader epoch checkpoint
assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset+1, 1024).records.sizeInBytes)

try {
Expand All @@ -132,7 +132,7 @@ class LogManagerTest {
val config = LogConfig.fromProps(logConfig.originals, logProps)

logManager = createLogManager()
logManager.startup
logManager.startup()

// create a log
val log = logManager.createLog(new TopicPartition(name, 0), config)
Expand All @@ -154,8 +154,8 @@ class LogManagerTest {
time.sleep(log.config.fileDeleteDelayMs + 1)

// there should be a log file, two indexes (the txn index is created lazily),
// the leader epoch checkpoint and two pid mapping files (one for the active and previous segments)
assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length)
// the leader epoch checkpoint and three pid mapping files (one for active and previous segments and the log start offset)
assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 4, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes)
try {
log.readUncommitted(0, 1024)
Expand Down