Skip to content

Commit

Permalink
KAFKA-5435: Ensure producer snapshot retained after truncation
Browse files Browse the repository at this point in the history
  • Loading branch information
hachikuji committed Jun 13, 2017
1 parent aea5310 commit d43d05f
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 54 deletions.
31 changes: 25 additions & 6 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ class Log(@volatile var dir: File,
info(s"Loading producer state from offset $lastOffset for partition $topicPartition")

if (producerStateManager.latestSnapshotOffset.isEmpty) {
// if there are no snapshots to load producer state from, we assume that the brokers are
// If there are no snapshots to load producer state from, we assume that the brokers are
// being upgraded, which means there would be no previous idempotent/transactional producers
// to load state for. To avoid an expensive scan through all of the segments, we take
// empty snapshots from the start of the last two segments and the last offset. The purpose
Expand All @@ -445,10 +445,14 @@ class Log(@volatile var dir: File,
producerStateManager.takeSnapshot()
}
} else {
val currentTimeMs = time.milliseconds
producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)

// only do the potentially expensive reloading of the last snapshot offset is lower than the
// Since the oldest snapshot will be removed after truncation, we need to take an empty snapshot
// at the log start offset to ensure that we cannot reach the optimization path above if the
// broker fails after out-of-range snapshots are removed below. Otherwise, we would incorrectly assume
// that no producer data exists in the log.
ensureSnapshotRetained(logStartOffset, lastOffset)
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

// Only do the potentially expensive reloading if the last snapshot offset is lower than the
// log end offset (which would be the case on first startup) and there are active producers.
// if there are no active producers, then truncating shouldn't change that fact (although it
// could cause a producerId to expire earlier than expected), so we can skip the loading.
Expand All @@ -467,6 +471,11 @@ class Log(@volatile var dir: File,
}
}

/**
 * Make sure a producer snapshot exists within the given offset range.
 *
 * Asks the producer state manager whether any snapshot falls between `logStartOffset`
 * and `logEndOffset`; if none does, an empty snapshot is written at `logStartOffset`.
 *
 * @param logStartOffset lower bound of the range, and the offset used for the empty snapshot
 * @param logEndOffset   upper bound of the range
 */
private def ensureSnapshotRetained(logStartOffset: Long, logEndOffset: Long): Unit = {
  val snapshotPresent = producerStateManager.hasSnapshotInRange(logStartOffset, logEndOffset)
  if (!snapshotPresent) {
    // Nothing in range would remain, so leave an empty marker snapshot behind.
    producerStateManager.takeEmptySnapshot(logStartOffset)
  }
}

private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
Expand All @@ -478,7 +487,7 @@ class Log(@volatile var dir: File,
completedTxns.foreach(producerStateManager.completeTxn)
}

private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized {
private[log] def activeProducers: Map[Long, ProducerIdEntry] = lock synchronized {
producerStateManager.activeProducers
}

Expand Down Expand Up @@ -1050,6 +1059,10 @@ class Log(@volatile var dir: File,
deletable.foreach(deleteSegment)
logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
leaderEpochCache.clearAndFlushEarliest(logStartOffset)

// Producer eviction can also result in snapshot deletion, so ensure ahead of time that
// there will still be at least one snapshot remaining in case we fail after the deletion
ensureSnapshotRetained(logStartOffset, logEndOffset)
producerStateManager.evictUnretainedProducers(logStartOffset)
updateFirstUnstableOffset()
}
Expand Down Expand Up @@ -1364,6 +1377,12 @@ class Log(@volatile var dir: File,

producerStateManager.truncate()
producerStateManager.updateMapEndOffset(newOffset)

// Truncation results in all snapshot files being removed, so take a new snapshot now
// to ensure we won't incorrectly assume the upgrade path (and skip reloading of producer
// state) if the broker crashes after doing some appends following full truncation.
producerStateManager.takeSnapshot()

updateFirstUnstableOffset()

this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ private[log] class Cleaner(val id: Int,
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
.format(startOffset, log.name, new Date(oldSegmentOpt.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
cleanInto(log.topicPartition, oldSegmentOpt, cleaned, map, retainDeletes, log.config.maxMessageSize, transactionMetadata,
log.activePids, stats)
log.activeProducers, stats)

currentSegmentOpt = nextSegmentOpt
}
Expand Down
28 changes: 22 additions & 6 deletions core/src/main/scala/kafka/log/ProducerStateManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME
private val producers = mutable.Map.empty[Long, ProducerIdEntry]
private var lastMapOffset = 0L
private var lastSnapOffset = 0L
private var lastSnapOffset = -1L

// ongoing transactions sorted by the first offset of the transaction
private val ongoingTxns = new util.TreeMap[Long, TxnMetadata]
Expand Down Expand Up @@ -444,7 +444,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
Files.deleteIfExists(file.toPath)
}
case None =>
lastSnapOffset = logStartOffset
lastSnapOffset = -1
lastMapOffset = logStartOffset
return
}
Expand Down Expand Up @@ -472,6 +472,11 @@ class ProducerStateManager(val topicPartition: TopicPartition,
}
}

/**
 * Check whether the offset parsed from a snapshot file's name lies in the closed
 * interval [startOffset, endOffset] (both bounds are included).
 *
 * @param snapshotFile the snapshot file whose name carries the offset
 * @param startOffset  inclusive lower bound
 * @param endOffset    inclusive upper bound
 * @return true when the file's offset is within the bounds
 */
private def isSnapshotInRange(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = {
  val snapshotOffset = offsetFromFilename(snapshotFile.getName)
  startOffset <= snapshotOffset && snapshotOffset <= endOffset
}

/**
* Truncate the producer id mapping to the given offset range and reload the entries from the most recent
* snapshot in range (if there is one). Note that the log end offset is assumed to be less than
Expand All @@ -480,8 +485,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
// remove all out of range snapshots
deleteSnapshotFiles { file =>
val offset = offsetFromFilename(file.getName)
offset > logEndOffset || offset <= logStartOffset
!isSnapshotInRange(file, logStartOffset, logEndOffset)
}

if (logEndOffset != mapEndOffset) {
Expand Down Expand Up @@ -541,6 +545,18 @@ class ProducerStateManager(val topicPartition: TopicPartition,
}
}

/**
 * Report whether at least one of the files returned by `listSnapshotFiles` is judged
 * by `isSnapshotInRange` to fall between `startOffset` and `endOffset`.
 *
 * @param startOffset lower bound passed to the range check
 * @param endOffset   upper bound passed to the range check
 * @return true if any listed snapshot file is in range, false otherwise
 */
def hasSnapshotInRange(startOffset: Long, endOffset: Long): Boolean =
  listSnapshotFiles.exists(isSnapshotInRange(_, startOffset, endOffset))

/**
 * Write a producer snapshot with no entries for the given offset.
 *
 * The target file is obtained from `Log.producerSnapshotFile` using `logDir` and
 * `offset`; a debug message is logged and an empty map is handed to `writeSnapshot`.
 *
 * @param offset the offset used to name the snapshot file
 */
def takeEmptySnapshot(offset: Long) = {
  val emptySnapshotFile = Log.producerSnapshotFile(logDir, offset)
  debug(s"Writing empty producer snapshot for partition $topicPartition at offset $offset")
  // An empty map means the snapshot records no producer entries.
  writeSnapshot(emptySnapshotFile, mutable.Map.empty)
}

/**
* Get the last offset (exclusive) of the latest snapshot file.
*/
Expand All @@ -563,7 +579,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
removeEvictedOngoingTransactions(evictedProducerIds)
removeUnreplicatedTransactions(logStartOffset)

deleteSnapshotFiles(file => offsetFromFilename(file.getName) <= logStartOffset)
deleteSnapshotFiles(file => offsetFromFilename(file.getName) < logStartOffset)
if (lastMapOffset < logStartOffset)
lastMapOffset = logStartOffset
lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
Expand Down Expand Up @@ -596,7 +612,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
ongoingTxns.clear()
unreplicatedTxns.clear()
deleteSnapshotFiles()
lastSnapOffset = 0L
lastSnapOffset = -1L
lastMapOffset = 0L
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class LogManagerTest {
assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
time.sleep(log.config.fileDeleteDelayMs + 1)

// there should be a log file, two indexes, and the leader epoch checkpoint
assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length)
// there should be a log file, two indexes, one empty producer snapshot, and the leader epoch checkpoint
assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset+1, 1024).records.sizeInBytes)

try {
Expand Down

0 comments on commit d43d05f

Please sign in to comment.