KAFKA-5435: Ensure producer snapshot retained after truncation #3306
Changes from all commits: d43d05f, 50c861e, fb483ff, 382e2f0, a3fef3e, c9af50f, 96c79a8
**core/src/main/scala/kafka/log/Log.scala**
```diff
@@ -46,8 +46,6 @@ import java.util.Map.{Entry => JEntry}
 import java.lang.{Long => JLong}
 import java.util.regex.Pattern
 
 import org.apache.kafka.common.internals.Topic
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
     NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
```
```diff
@@ -431,29 +429,33 @@ class Log(@volatile var dir: File,
   private def loadProducerState(lastOffset: Long): Unit = lock synchronized {
     info(s"Loading producer state from offset $lastOffset for partition $topicPartition")
 
+    // To avoid expensive initialization when upgrading from older brokers, we skip loading producer
+    // state if no snapshot file is found. To ensure that we cannot hit this case after upgrading (which
+    // could cause us to lose producer state), we enforce the invariant that we always have an empty snapshot
+    // file at the log start offset.
     if (producerStateManager.latestSnapshotOffset.isEmpty) {
-      // if there are no snapshots to load producer state from, we assume that the brokers are
-      // being upgraded, which means there would be no previous idempotent/transactional producers
-      // to load state for. To avoid an expensive scan through all of the segments, we take
-      // empty snapshots from the start of the last two segments and the last offset. The purpose
-      // of taking the segment snapshots is to avoid the full scan in the case that the log needs
-      // truncation.
+      // There are no snapshots so this is the upgrade path. In addition to taking a snapshot at the log start
+      // offset to enforce the invariant mentioned above, we take empty snapshots from the start of the last
+      // two segments and the last offset. The purpose of the additional snapshots is to avoid the full scan in
+      // the case that the log needs truncation.
       val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset)
-      val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
-      offsetsToSnapshot.flatten.foreach { offset =>
-        producerStateManager.updateMapEndOffset(offset)
-        producerStateManager.takeSnapshot()
-      }
+      val offsetsToSnapshot = Seq(Some(logStartOffset), nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
+      offsetsToSnapshot.flatten.foreach(producerStateManager.takeEmptySnapshot)
     } else {
-      val currentTimeMs = time.milliseconds
-      producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
+      val hasProducersBeforeTruncation = !producerStateManager.isEmpty
 
-      // only do the potentially expensive reloading of the last snapshot offset is lower than the
-      // log end offset (which would be the case on first startup) and there are active producers.
-      // if there are no active producers, then truncating shouldn't change that fact (although it
-      // could cause a producerId to expire earlier than expected), so we can skip the loading.
-      if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) {
+      // Ensure we have an empty snapshot at the log start offset to enforce the invariant mentioned above.
+      // This must be done prior to truncation in case of failure after previous snapshots are removed.
+      producerStateManager.takeEmptySnapshot(logStartOffset)
+      producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
+
+      // Only do the potentially expensive reloading if the last snapshot offset is lower than the
+      // log end offset (which would be the case on first startup) and there were active producers
+      // prior to truncation. If there weren't, then truncating shouldn't change that fact (although it
+      // could cause a producerId to expire earlier than expected), and we can skip the loading.
+      // This is an optimization for users who are not yet using idempotent/transactional features.
+      if (lastOffset > producerStateManager.mapEndOffset && hasProducersBeforeTruncation) {
         logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
           val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset)
           val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
```
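To make the invariant concrete: if a broker restarts and finds no producer snapshots at all, it assumes the upgrade path and skips rebuilding producer state. Here is a minimal, self-contained sketch of that decision (hypothetical helper names, not Kafka's API):

```scala
object SnapshotInvariantSketch extends App {
  // Mirrors the branch above: no snapshot at all is treated as the upgrade path,
  // in which producer state is deliberately not rebuilt from the log.
  def loadDecision(snapshotOffsets: List[Long]): String =
    snapshotOffsets.sorted.lastOption match {
      case None         => "upgrade path: skip rebuilding producer state (existing state would be lost!)"
      case Some(latest) => s"load snapshot at offset $latest and replay the log from there"
    }

  // Without the invariant, deleting every snapshot (e.g. during truncation) makes the
  // next restart indistinguishable from an upgrade:
  println(loadDecision(Nil))
  // With an empty snapshot pinned at the log start offset, restart takes the safe branch:
  println(loadDecision(List(100L)))
}
```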
```diff
@@ -471,14 +473,16 @@ class Log(@volatile var dir: File,
     val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
     val completedTxns = ListBuffer.empty[CompletedTxn]
     records.batches.asScala.foreach { batch =>
-      if (batch.hasProducerId)
-        updateProducers(batch, loadedProducers, completedTxns, loadingFromLog = true)
+      if (batch.hasProducerId) {
+        val maybeCompletedTxn = updateProducers(batch, loadedProducers, loadingFromLog = true)
+        maybeCompletedTxn.foreach(completedTxns += _)
+      }
     }
     loadedProducers.values.foreach(producerStateManager.update)
     completedTxns.foreach(producerStateManager.completeTxn)
   }
 
-  private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized {
+  private[log] def activeProducers: Map[Long, ProducerIdEntry] = lock synchronized {
     producerStateManager.activeProducers
   }
```
```diff
@@ -706,6 +710,11 @@ class Log(@volatile var dir: File,
     lock synchronized {
       if (offset > logStartOffset) {
         logStartOffset = offset
+
+        // Enforce the invariant that we have an empty snapshot at the log start offset to ensure
+        // proper loading of producer state upon recovery.
+        producerStateManager.takeEmptySnapshot(logStartOffset)
+
         producerStateManager.deleteSnapshotsBefore(logStartOffset)
       }
     }
   }
```

> **Review comment** (on `producerStateManager.takeEmptySnapshot(logStartOffset)`): Just realized another potential issue with this. If we delete a log up to an offset in the middle of a segment, we create an empty snapshot at `logStartOffset`. In `loadProducerState()`, if we need to rebuild producer state from this snapshot, there seem to be two issues: (1) `logSegments()` doesn't take `logStartOffset` into consideration, so we may miss the first log segment. (2) If the segment is selected, we should be processing messages from `logStartOffset` and up.
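One possible direction for point (2) of the comment above, sketched against the replay loop shown earlier in `loadProducerState` (hypothetical and untested, not part of this PR; assumes `logStartOffset` is in scope), is to clamp the scan's start offset so no batch below the log start offset is processed:

```scala
// Hypothetical adjustment: never rebuild producer state from offsets below
// logStartOffset, even when the loaded snapshot sits mid-segment.
logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
  val startOffset = Seq(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset).max
  val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
  // ... load producers from fetchDataInfo.records as before ...
}
```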
```diff
@@ -722,7 +731,8 @@ class Log(@volatile var dir: File,
         // the last appended entry to the client.
         if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
           return (updatedProducers, completedTxns.toList, maybeLastEntry)
-        updateProducers(batch, updatedProducers, completedTxns, loadingFromLog = false)
+        val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false)
+        maybeCompletedTxn.foreach(completedTxns += _)
       }
       (updatedProducers, completedTxns.toList, None)
     }
```
```diff
@@ -808,12 +818,10 @@ class Log(@volatile var dir: File,
 
   private def updateProducers(batch: RecordBatch,
                               producers: mutable.Map[Long, ProducerAppendInfo],
-                              completedTxns: ListBuffer[CompletedTxn],
-                              loadingFromLog: Boolean): Unit = {
+                              loadingFromLog: Boolean): Option[CompletedTxn] = {
     val producerId = batch.producerId
     val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, loadingFromLog))
-    val maybeCompletedTxn = appendInfo.append(batch)
-    maybeCompletedTxn.foreach(completedTxns += _)
+    appendInfo.append(batch)
   }
 
   /**
```
```diff
@@ -1050,7 +1058,11 @@ class Log(@volatile var dir: File,
       deletable.foreach(deleteSegment)
       logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
       leaderEpochCache.clearAndFlushEarliest(logStartOffset)
-      producerStateManager.evictUnretainedProducers(logStartOffset)
+
+      // Update the producer state with the new log start offset, which would cause any non-retained producers to
+      // be evicted. Enforce the invariant that we always have an empty snapshot at the log start offset.
+      producerStateManager.takeEmptySnapshot(logStartOffset)
+      producerStateManager.truncateHead(logStartOffset)
       updateFirstUnstableOffset()
     }
   }
```
```diff
@@ -1255,10 +1267,10 @@ class Log(@volatile var dir: File,
     for(segment <- logSegments(this.recoveryPoint, offset))
       segment.flush()
 
-    // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain
-    // the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
-    // Otherwise, we would always need to rebuild from the earliest segment.
-    producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset))
+    // Now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain the
+    // snapshots from the recent segments in case we need to truncate and rebuild the producer state. Note that
+    // we still retain the snapshot from the log start offset.
+    producerStateManager.deleteSnapshotsInRangeExclusive(logStartOffset, minSnapshotOffsetToRetain(offset))
 
     lock synchronized {
       if(offset > this.recoveryPoint) {
```
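A worked example of the new exclusive-range cleanup, with illustrative offsets only (`minOffsetToRetain` stands in for the result of `minSnapshotOffsetToRetain(offset)`):

```scala
object FlushRetentionExample extends App {
  val snapshotOffsets = List(100L, 250L, 400L, 550L, 700L)
  val logStartOffset = 100L
  val minOffsetToRetain = 550L // hypothetical value of minSnapshotOffsetToRetain(offset)

  // deleteSnapshotsInRangeExclusive removes offsets strictly between the two bounds,
  // so the log start snapshot (100) and the recent snapshots (550, 700) survive.
  val retained = snapshotOffsets.filterNot(o => o > logStartOffset && o < minOffsetToRetain)
  println(retained) // List(100, 550, 700)
}
```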
```diff
@@ -1364,6 +1376,11 @@ class Log(@volatile var dir: File,
 
       producerStateManager.truncate()
       producerStateManager.updateMapEndOffset(newOffset)
+
+      // Truncation results in all snapshot files being removed, so take an empty snapshot at the new offset
+      // to maintain the invariant that we always have a snapshot at the log start offset.
+      producerStateManager.takeEmptySnapshot(newOffset)
 
       updateFirstUnstableOffset()
 
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
```
**core/src/main/scala/kafka/log/ProducerStateManager.scala**
```diff
@@ -376,7 +376,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
   private val producers = mutable.Map.empty[Long, ProducerIdEntry]
   private var lastMapOffset = 0L
-  private var lastSnapOffset = 0L
+  private var lastSnapOffset = -1L
 
   // ongoing transactions sorted by the first offset of the transaction
   private val ongoingTxns = new util.TreeMap[Long, TxnMetadata]
```
```diff
@@ -444,7 +444,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
           Files.deleteIfExists(file.toPath)
         }
       case None =>
-        lastSnapOffset = logStartOffset
+        lastSnapOffset = -1L
         lastMapOffset = logStartOffset
         return
     }
```
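The new `-1L` default is a sentinel fix: with `0L`, "no snapshot yet" was indistinguishable from "a snapshot taken at offset 0". A tiny illustration (hypothetical predicate, not Kafka code):

```scala
object SnapshotSentinelExample extends App {
  // -1L can never be a real snapshot offset, so it cleanly encodes "no snapshot yet".
  def hasSnapshot(lastSnapOffset: Long): Boolean = lastSnapOffset >= 0L

  println(hasSnapshot(-1L)) // false: no snapshot has been written
  println(hasSnapshot(0L))  // true: a genuine snapshot at offset 0
}
```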
```diff
@@ -472,17 +472,24 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
+  private def inRangeInclusive(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = {
+    val offset = offsetFromFilename(snapshotFile.getName)
+    offset >= startOffset && offset <= endOffset
+  }
+
+  private def inRangeExclusive(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = {
+    val offset = offsetFromFilename(snapshotFile.getName)
+    offset > startOffset && offset < endOffset
+  }
+
   /**
    * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
    * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
    * or equal to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
-    // remove all out of range snapshots
-    deleteSnapshotFiles { file =>
-      val offset = offsetFromFilename(file.getName)
-      offset > logEndOffset || offset <= logStartOffset
-    }
+    deleteSnapshotFiles(file => !inRangeInclusive(file, logStartOffset, logEndOffset))
 
     if (logEndOffset != mapEndOffset) {
       producers.clear()
```

> **Review comment** (on `inRangeExclusive`): Perhaps `inRangeExclusive` and `inRangeInclusive` could just be `outOfRange` and `inRange`?
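To pin down the boundary semantics of the two predicates, here is a standalone sketch over plain offsets rather than snapshot `File`s (illustrative values only):

```scala
object RangePredicateExample extends App {
  def inRangeInclusive(offset: Long, start: Long, end: Long): Boolean =
    offset >= start && offset <= end
  def inRangeExclusive(offset: Long, start: Long, end: Long): Boolean =
    offset > start && offset < end

  // truncateAndReload keeps snapshots in [logStartOffset, logEndOffset], so a snapshot
  // sitting exactly at the log start offset is retained:
  println(inRangeInclusive(100L, 100L, 500L)) // true  -> kept
  // an exclusive-range delete never touches the boundary snapshots:
  println(inRangeExclusive(100L, 100L, 500L)) // false -> not deleted
}
```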
```diff
@@ -493,7 +500,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
     } else {
-      evictUnretainedProducers(logStartOffset)
+      truncateHead(logStartOffset)
     }
   }
```
```diff
@@ -541,6 +548,12 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     }
   }
 
+  def takeEmptySnapshot(offset: Long) = {
+    val snapshotFile = Log.producerSnapshotFile(logDir, offset)
+    debug(s"Writing empty producer snapshot for partition $topicPartition at offset $offset")
+    writeSnapshot(snapshotFile, mutable.Map.empty)
+  }
+
   /**
    * Get the last offset (exclusive) of the latest snapshot file.
    */
```
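A sketch of the file-naming convention behind `Log.producerSnapshotFile`: the offset zero-padded to 20 digits with a `.snapshot` suffix, mirroring Kafka's segment file naming. The helper below is illustrative, not the real one:

```scala
import java.io.File

object SnapshotFileNameSketch extends App {
  // Mimics the naming convention; the real helper lives in Log.scala.
  def producerSnapshotFile(logDir: File, offset: Long): File =
    new File(logDir, f"$offset%020d.snapshot")

  println(producerSnapshotFile(new File("/tmp/my-topic-0"), 100L).getName)
  // 00000000000000000100.snapshot
}
```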
```diff
@@ -553,17 +566,18 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   /**
    * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
-   * the new start offset and removes all producerIds which have a smaller last written offset.
+   * the new start offset and removes all producerIds which have a smaller last written offset. Additionally,
+   * all snapshot files at offsets strictly lower than the log start offset will be removed.
    */
-  def evictUnretainedProducers(logStartOffset: Long) {
+  def truncateHead(logStartOffset: Long) {
     val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset)
     val evictedProducerIds = evictedProducerEntries.keySet
 
     producers --= evictedProducerIds
     removeEvictedOngoingTransactions(evictedProducerIds)
     removeUnreplicatedTransactions(logStartOffset)
 
-    deleteSnapshotFiles(file => offsetFromFilename(file.getName) <= logStartOffset)
+    deleteSnapshotsBefore(logStartOffset)
     if (lastMapOffset < logStartOffset)
       lastMapOffset = logStartOffset
     lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
```
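A worked example of the eviction rule in `truncateHead`, with illustrative producer ids and offsets: any producer whose last written offset is below the new log start offset is dropped.

```scala
object TruncateHeadExample extends App {
  val logStartOffset = 300L
  // producerId -> last written offset
  val producers = Map(1L -> 250L, 2L -> 300L, 3L -> 450L)

  // Mirrors producers.filter(_._2.lastOffset < logStartOffset) followed by `producers --= ...`
  val retained = producers.filter { case (_, lastOffset) => lastOffset >= logStartOffset }
  println(retained) // Map(2 -> 300, 3 -> 450): producer 1 is evicted
}
```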
```diff
@@ -596,7 +610,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     ongoingTxns.clear()
     unreplicatedTxns.clear()
     deleteSnapshotFiles()
-    lastSnapOffset = 0L
+    lastSnapOffset = -1L
     lastMapOffset = 0L
   }
```
```diff
@@ -620,6 +634,11 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     deleteSnapshotFiles(file => offsetFromFilename(file.getName) < offset)
   }
 
+  @threadsafe
+  def deleteSnapshotsInRangeExclusive(startOffset: Long, endOffset: Long): Unit = {
+    deleteSnapshotFiles(inRangeExclusive(_, startOffset, endOffset))
+  }
+
   private def listSnapshotFiles: List[File] = {
     if (logDir.exists && logDir.isDirectory)
       logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList
```
> **Review comment:** Not from this PR, but it seems like `updateProducers` should just return the result from `appendInfo.append(batch)` instead of taking a list to append to (from a simplicity perspective).
>
> **Reply:** Yes, that's fair.