From d43d05f8a39004daa9bbffbd0abd1b80418552a2 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 12 Jun 2017 10:38:33 -0700 Subject: [PATCH 1/7] KAFKA-5435: Ensure producer snapshot retained after truncation --- core/src/main/scala/kafka/log/Log.scala | 31 +++- .../src/main/scala/kafka/log/LogCleaner.scala | 2 +- .../kafka/log/ProducerStateManager.scala | 28 +++- .../scala/unit/kafka/log/LogManagerTest.scala | 4 +- .../test/scala/unit/kafka/log/LogTest.scala | 134 +++++++++++++----- .../kafka/log/ProducerStateManagerTest.scala | 61 +++++++- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- 7 files changed, 210 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 7a3bc947cb33..8a2f50ef9d8a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -432,7 +432,7 @@ class Log(@volatile var dir: File, info(s"Loading producer state from offset $lastOffset for partition $topicPartition") if (producerStateManager.latestSnapshotOffset.isEmpty) { - // if there are no snapshots to load producer state from, we assume that the brokers are + // If there are no snapshots to load producer state from, we assume that the brokers are // being upgraded, which means there would be no previous idempotent/transactional producers // to load state for. To avoid an expensive scan through all of the segments, we take // empty snapshots from the start of the last two segments and the last offset. The purpose @@ -445,10 +445,14 @@ class Log(@volatile var dir: File, producerStateManager.takeSnapshot() } } else { - val currentTimeMs = time.milliseconds - producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs) - - // only do the potentially expensive reloading of the last snapshot offset is lower than the + // Since the oldest snapshot will be removed after truncation, we need to take an empty snapshot + // at the log start offset to ensure that we cannot reach the optimization path above if the + // broker fails after out-of-range snapshots are removed below. Otherwise, we would incorrectly assume + // that no producer data exists in the log. + ensureSnapshotRetained(logStartOffset, lastOffset) + producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) + + // Only do the potentially expensive reloading of the last snapshot offset is lower than the // log end offset (which would be the case on first startup) and there are active producers. // if there are no active producers, then truncating shouldn't change that fact (although it // could cause a producerId to expire earlier than expected), so we can skip the loading. @@ -467,6 +471,11 @@ class Log(@volatile var dir: File, } } + private def ensureSnapshotRetained(logStartOffset: Long, logEndOffset: Long): Unit = { + if (!producerStateManager.hasSnapshotInRange(logStartOffset, logEndOffset)) + producerStateManager.takeEmptySnapshot(logStartOffset) + } + private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = { val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo] val completedTxns = ListBuffer.empty[CompletedTxn] @@ -478,7 +487,7 @@ class Log(@volatile var dir: File, completedTxns.foreach(producerStateManager.completeTxn) } - private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized { + private[log] def activeProducers: Map[Long, ProducerIdEntry] = lock synchronized { producerStateManager.activeProducers } @@ -1050,6 +1059,10 @@ class Log(@volatile var dir: File, deletable.foreach(deleteSegment) logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset) leaderEpochCache.clearAndFlushEarliest(logStartOffset) + + // Producer eviction can also result in snapshot deletion, so ensure ahead of time that + // there will still be at least one snapshot remaining in case we fail after the deletion + ensureSnapshotRetained(logStartOffset, logEndOffset) producerStateManager.evictUnretainedProducers(logStartOffset) updateFirstUnstableOffset() } @@ -1364,6 +1377,12 @@ class Log(@volatile var dir: File, producerStateManager.truncate() producerStateManager.updateMapEndOffset(newOffset) + + // Truncation results in all snapshot files being removed, so take a new snapshot now + // to ensure we won't incorrectly assume the upgrade path (and skip reloading of producer + // state) if the broker crashes after doing some appends following full truncation. + producerStateManager.takeSnapshot() + updateFirstUnstableOffset() this.recoveryPoint = math.min(newOffset, this.recoveryPoint) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 5aa86725ac4c..623586f64dc6 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -424,7 +424,7 @@ private[log] class Cleaner(val id: Int, info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." .format(startOffset, log.name, new Date(oldSegmentOpt.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) cleanInto(log.topicPartition, oldSegmentOpt, cleaned, map, retainDeletes, log.config.maxMessageSize, transactionMetadata, - log.activePids, stats) + log.activeProducers, stats) currentSegmentOpt = nextSegmentOpt } diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index e7910521ac86..180bed33925f 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -376,7 +376,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, private val validateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME private val producers = mutable.Map.empty[Long, ProducerIdEntry] private var lastMapOffset = 0L - private var lastSnapOffset = 0L + private var lastSnapOffset = -1L // ongoing transactions sorted by the first offset of the transaction private val ongoingTxns = new util.TreeMap[Long, TxnMetadata] @@ -444,7 +444,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, Files.deleteIfExists(file.toPath) } case None => - lastSnapOffset = logStartOffset + lastSnapOffset = -1 lastMapOffset = logStartOffset return } @@ -472,6 +472,11 @@ class ProducerStateManager(val topicPartition: TopicPartition, } } + private def isSnapshotInRange(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = { + val offset = offsetFromFilename(snapshotFile.getName) + offset >= startOffset && offset <= endOffset + } + /** * Truncate the producer id mapping to the given offset range and reload the entries from the most recent * snapshot in range (if there is one). Note that the log end offset is assumed to be less than @@ -480,8 +485,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) { // remove all out of range snapshots deleteSnapshotFiles { file => - val offset = offsetFromFilename(file.getName) - offset > logEndOffset || offset <= logStartOffset + !isSnapshotInRange(file, logStartOffset, logEndOffset) } if (logEndOffset != mapEndOffset) { @@ -541,6 +545,18 @@ class ProducerStateManager(val topicPartition: TopicPartition, } } + def hasSnapshotInRange(startOffset: Long, endOffset: Long): Boolean = { + listSnapshotFiles.exists { file => + isSnapshotInRange(file, startOffset, endOffset) + } + } + + def takeEmptySnapshot(offset: Long) = { + val snapshotFile = Log.producerSnapshotFile(logDir, offset) + debug(s"Writing empty producer snapshot for partition $topicPartition at offset $offset") + writeSnapshot(snapshotFile, mutable.Map.empty) + } + /** * Get the last offset (exclusive) of the latest snapshot file. */ @@ -563,7 +579,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, removeEvictedOngoingTransactions(evictedProducerIds) removeUnreplicatedTransactions(logStartOffset) - deleteSnapshotFiles(file => offsetFromFilename(file.getName) <= logStartOffset) + deleteSnapshotFiles(file => offsetFromFilename(file.getName) < logStartOffset) if (lastMapOffset < logStartOffset) lastMapOffset = logStartOffset lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset) @@ -596,7 +612,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, ongoingTxns.clear() unreplicatedTxns.clear() deleteSnapshotFiles() - lastSnapOffset = 0L + lastSnapOffset = -1L lastMapOffset = 0L } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 5b2947100c20..90e57facfadb 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -105,8 +105,8 @@ class LogManagerTest { assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) - // there should be a log file, two indexes, and the leader epoch checkpoint - assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length) + // there should be a log file, two indexes, one empty producer snapshot, and the leader epoch checkpoint + assertEquals("Files should have been deleted", log.numberOfSegments * 4 + 1, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset+1, 1024).records.sizeInBytes) try { diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 7b67857b1034..ddcd82bf7139 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -162,10 +162,10 @@ class LogTest { val pid = 1L val epoch: Short = 0 - val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 0) + val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, producerEpoch = epoch, sequence = 0) log.appendAsLeader(records, leaderEpoch = 0) - val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = epoch, sequence = 2) + val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, producerEpoch = epoch, sequence = 2) log.appendAsLeader(nextRecords, leaderEpoch = 0) } @@ -175,7 +175,6 @@ class LogTest { // snapshot files, and then reloading the log val log = createLog(64, messagesPerSegment = 10) - assertEquals(None, log.oldestProducerSnapshotOffset) for (i <- 0 to 100) { val record = new SimpleRecord(time.milliseconds, i.toString.getBytes) @@ -183,11 +182,11 @@ class LogTest { } assertTrue(log.logSegments.size >= 2) - log.close() - logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => Files.delete(file.toPath) } + assertEquals(None, log.oldestProducerSnapshotOffset) + log.close() val reloadedLog = createLog(64, messagesPerSegment = 10) val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset) @@ -214,7 +213,7 @@ class LogTest { val baseOffset = 23L // create a batch with a couple gaps to simulate compaction - val records = TestUtils.records(pid = pid, epoch = epoch, sequence = seq, baseOffset = baseOffset, records = List( + val records = TestUtils.records(pid = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List( new SimpleRecord(System.currentTimeMillis(), "a".getBytes), new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes), new SimpleRecord(System.currentTimeMillis(), "c".getBytes), @@ -239,10 +238,10 @@ class LogTest { log.truncateTo(baseOffset + 4) - val activePids = log.activePids - assertTrue(activePids.contains(pid)) + val activeProducers = log.activeProducers + assertTrue(activeProducers.contains(pid)) - val entry = activePids(pid) + val entry = activeProducers(pid) assertEquals(0, entry.firstSeq) assertEquals(baseOffset, entry.firstOffset) assertEquals(3, entry.lastSeq) @@ -258,7 +257,7 @@ class LogTest { val baseOffset = 23L // create a batch with a couple gaps to simulate compaction - val records = TestUtils.records(pid = pid, epoch = epoch, sequence = seq, baseOffset = baseOffset, records = List( + val records = TestUtils.records(pid = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List( new SimpleRecord(System.currentTimeMillis(), "a".getBytes), new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes), new SimpleRecord(System.currentTimeMillis(), "c".getBytes), @@ -273,10 +272,10 @@ class LogTest { val filteredRecords = MemoryRecords.readableRecords(filtered) log.appendAsFollower(filteredRecords) - val activePids = log.activePids - assertTrue(activePids.contains(pid)) + val activeProducers = log.activeProducers + assertTrue(activeProducers.contains(pid)) - val entry = activePids(pid) + val entry = activeProducers(pid) assertEquals(0, entry.firstSeq) assertEquals(baseOffset, entry.firstOffset) assertEquals(3, entry.lastSeq) @@ -298,10 +297,51 @@ class LogTest { assertEquals(2, log.latestProducerStateEndOffset) log.truncateTo(1) - assertEquals(None, log.latestProducerSnapshotOffset) + assertEquals(Some(0), log.latestProducerSnapshotOffset) assertEquals(1, log.latestProducerStateEndOffset) } + @Test + def testTruncateBeforeOldestProducerSnapshot(): Unit = { + val pid = 1L + val log = createLog(2048) + + // delete all snapshot files + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => + Files.delete(file.toPath) + } + assertEquals(None, log.oldestProducerSnapshotOffset) + + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), pid = pid, sequence = 0, producerEpoch = 0), leaderEpoch = 0) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), pid = pid, sequence = 1, producerEpoch = 0), leaderEpoch = 0) + log.takeProducerSnapshot() + + log.truncateTo(1) + assertEquals(Some(0), log.oldestProducerSnapshotOffset) + assertEquals(1, log.latestProducerStateEndOffset) + + assertEquals(1, log.activeProducers.size) + val entry = log.activeProducers(pid) + assertEquals(0, entry.lastSeq) + assertEquals(0, entry.lastOffset) + assertEquals(0, entry.producerEpoch) + } + + @Test + def testTakeEmptySnapshotAfterFullTruncation(): Unit = { + val pid = 1L + val log = createLog(2048) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), pid = pid, sequence = 0, producerEpoch = 0), leaderEpoch = 0) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), pid = pid, sequence = 1, producerEpoch = 0), leaderEpoch = 0) + log.takeProducerSnapshot() + + log.truncateFullyAndStartAt(1) + assertEquals(Some(1), log.oldestProducerSnapshotOffset) + assertEquals(Some(1), log.latestProducerSnapshotOffset) + assertEquals(1, log.latestProducerStateEndOffset) + assertTrue(log.activeProducers.isEmpty) + } + @Test def testPidMapTruncateFullyAndStartAt() { val records = TestUtils.singletonRecords("foo".getBytes) @@ -319,32 +359,62 @@ class LogTest { log.truncateFullyAndStartAt(29) assertEquals(1, log.logSegments.size) - assertEquals(None, log.latestProducerSnapshotOffset) + assertEquals(Some(29), log.latestProducerSnapshotOffset) assertEquals(29, log.latestProducerStateEndOffset) } @Test def testPidExpirationOnSegmentDeletion() { val pid1 = 1L - val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), pid = pid1, epoch = 0, sequence = 0) + val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), pid = pid1, producerEpoch = 0, sequence = 0) val log = createLog(records.sizeInBytes, messagesPerSegment = 1, retentionBytes = records.sizeInBytes * 2) log.appendAsLeader(records, leaderEpoch = 0) log.takeProducerSnapshot() val pid2 = 2L - log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("bar".getBytes)), pid = pid2, epoch = 0, sequence = 0), + log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("bar".getBytes)), pid = pid2, producerEpoch = 0, sequence = 0), leaderEpoch = 0) - log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("baz".getBytes)), pid = pid2, epoch = 0, sequence = 1), + log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("baz".getBytes)), pid = pid2, producerEpoch = 0, sequence = 1), leaderEpoch = 0) log.takeProducerSnapshot() assertEquals(3, log.logSegments.size) - assertEquals(Set(pid1, pid2), log.activePids.keySet) + assertEquals(Set(pid1, pid2), log.activeProducers.keySet) + + log.deleteOldSegments() + + assertEquals(2, log.logSegments.size) + assertEquals(Set(pid2), log.activeProducers.keySet) + } + + @Test + def testSnapshotRetainedAfterProducerIdExpiration() { + val pid1 = 1L + val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), pid = pid1, producerEpoch = 0, sequence = 0) + val log = createLog(records.sizeInBytes, messagesPerSegment = 1, retentionBytes = records.sizeInBytes * 2) + log.appendAsLeader(records, leaderEpoch = 0) + log.takeProducerSnapshot() + + val pid2 = 2L + log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("bar".getBytes)), pid = pid2, producerEpoch = 0, sequence = 0), + leaderEpoch = 0) + log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("baz".getBytes)), pid = pid2, producerEpoch = 0, sequence = 1), + leaderEpoch = 0) + + assertEquals(3, log.logSegments.size) + assertEquals(Set(pid1, pid2), log.activeProducers.keySet) + + // this is a bit contrived, but we force deletion of all snapshots to ensure that deleteOldSegments() will + // create a new snapshot if needed + logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file => + Files.delete(file.toPath) + } log.deleteOldSegments() assertEquals(2, log.logSegments.size) - assertEquals(Set(pid2), log.activePids.keySet) + assertEquals(Set(pid2), log.activeProducers.keySet) + assertEquals(Some(log.logStartOffset), log.oldestProducerSnapshotOffset) } @Test @@ -353,7 +423,7 @@ class LogTest { log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) log.roll(1L) assertEquals(Some(1L), log.latestProducerSnapshotOffset) - assertEquals(Some(1L), log.oldestProducerSnapshotOffset) + assertEquals(Some(0L), log.oldestProducerSnapshotOffset) log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 0) log.roll(2L) @@ -423,15 +493,15 @@ class LogTest { val log = createLog(2048, maxPidExpirationMs = maxPidExpirationMs, pidExpirationCheckIntervalMs = expirationCheckInterval) val records = Seq(new SimpleRecord(time.milliseconds(), "foo".getBytes)) - log.appendAsLeader(TestUtils.records(records, pid = pid, epoch = 0, sequence = 0), leaderEpoch = 0) + log.appendAsLeader(TestUtils.records(records, pid = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0) - assertEquals(Set(pid), log.activePids.keySet) + assertEquals(Set(pid), log.activeProducers.keySet) time.sleep(expirationCheckInterval) - assertEquals(Set(pid), log.activePids.keySet) + assertEquals(Set(pid), log.activeProducers.keySet) time.sleep(expirationCheckInterval) - assertEquals(Set(), log.activePids.keySet) + assertEquals(Set(), log.activeProducers.keySet) } @Test @@ -453,7 +523,7 @@ class LogTest { // Pad the beginning of the log. for (_ <- 0 to 5) { val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), - pid = pid, epoch = epoch, sequence = seq) + pid = pid, producerEpoch = epoch, sequence = seq) log.appendAsLeader(record, leaderEpoch = 0) seq = seq + 1 } @@ -462,7 +532,7 @@ class LogTest { new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes) - ), pid = pid, epoch = epoch, sequence = seq) + ), pid = pid, producerEpoch = epoch, sequence = seq) val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3) @@ -481,7 +551,7 @@ class LogTest { List( new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)), - pid = pid, epoch = epoch, sequence = seq - 2) + pid = pid, producerEpoch = epoch, sequence = seq - 2) log.appendAsLeader(records, leaderEpoch = 0) fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " + "in the middle of the log.") @@ -493,7 +563,7 @@ class LogTest { try { val records = TestUtils.records( List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)), - pid = pid, epoch = epoch, sequence = 1) + pid = pid, producerEpoch = epoch, sequence = 1) log.appendAsLeader(records, leaderEpoch = 0) fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " + "in the middle of the log.") @@ -503,7 +573,7 @@ class LogTest { // Append a duplicate entry with a single records at the tail of the log. This should return the appendInfo of the original entry. def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), - pid = pid, epoch = epoch, sequence = seq) + pid = pid, producerEpoch = epoch, sequence = seq) val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset, newAppendInfo.firstOffset) @@ -643,10 +713,10 @@ class LogTest { val newEpoch: Short = 1 val oldEpoch: Short = 0 - val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = newEpoch, sequence = 0) + val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, producerEpoch = newEpoch, sequence = 0) log.appendAsLeader(records, leaderEpoch = 0) - val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, epoch = oldEpoch, sequence = 0) + val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), pid = pid, producerEpoch = oldEpoch, sequence = 0) log.appendAsLeader(nextRecords, leaderEpoch = 0) } diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index ac1d62309d4f..74ff5d720057 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -266,14 +266,30 @@ class ProducerStateManagerTest extends JUnitSuite { append(stateManager, producerId, epoch, 4, 4L) stateManager.takeSnapshot() + assertFalse(stateManager.hasSnapshotInRange(0L, 0L)) + assertTrue(stateManager.hasSnapshotInRange(0L, 4L)) + assertTrue(stateManager.hasSnapshotInRange(1L, 1L)) + assertTrue(stateManager.hasSnapshotInRange(1L, 4L)) + assertTrue(stateManager.hasSnapshotInRange(0L, 3L)) + assertTrue(stateManager.hasSnapshotInRange(2L, 2L)) + assertTrue(stateManager.hasSnapshotInRange(4L, 5L)) + assertTrue(stateManager.hasSnapshotInRange(5L, 5L)) + assertFalse(stateManager.hasSnapshotInRange(6L, 6L)) + assertFalse(stateManager.hasSnapshotInRange(6L, 8L)) + stateManager.truncateAndReload(1L, 3L, time.milliseconds()) - assertEquals(Some(2L), stateManager.oldestSnapshotOffset) + assertEquals(Some(1L), stateManager.oldestSnapshotOffset) assertEquals(Some(3L), stateManager.latestSnapshotOffset) } @Test def testTakeSnapshot(): Unit = { + // Should be able to take a snapshot at offset 0 + stateManager.takeSnapshot() + assertEquals(Some(0), stateManager.oldestSnapshotOffset) + assertEquals(Some(0), stateManager.latestSnapshotOffset) + val epoch = 0.toShort append(stateManager, producerId, epoch, 0, 0L, 0L) append(stateManager, producerId, epoch, 1, 1L, 1L) @@ -282,8 +298,41 @@ class ProducerStateManagerTest extends JUnitSuite { stateManager.takeSnapshot() // Check that file exists and it is not empty - assertEquals("Directory doesn't contain a single file as expected", 1, logDir.list().length) - assertTrue("Snapshot file is empty", logDir.list().head.length > 0) + assertEquals(2, logDir.list().length) + assertEquals(Some(0), stateManager.oldestSnapshotOffset) + assertEquals(Some(2), stateManager.latestSnapshotOffset) + } + + @Test + def testTakeEmptySnapshot(): Unit = { + val epoch = 0.toShort + append(stateManager, producerId, epoch, seq = 0, 0L) + append(stateManager, producerId, epoch, seq = 1, 1L) + stateManager.takeSnapshot() + + stateManager.takeEmptySnapshot(0L) + assertEquals(Some(0L), stateManager.oldestSnapshotOffset) + + stateManager.truncateAndReload(0L, 1L, time.milliseconds()) + assertEquals(0, stateManager.activeProducers.size) + assertEquals(0, stateManager.mapEndOffset) + assertEquals(Some(0L), stateManager.oldestSnapshotOffset) + assertEquals(Some(0L), stateManager.latestSnapshotOffset) + } + + @Test + def testSnapshotAfterTruncation(): Unit = { + val epoch = 0.toShort + append(stateManager, producerId, epoch, 0, 0L) + append(stateManager, producerId, epoch, 1, 1L) + stateManager.takeSnapshot() + assertEquals(Some(2), stateManager.latestSnapshotOffset) + + stateManager.truncateAndReload(0L, 1L, time.milliseconds()) + assertEquals(None, stateManager.latestSnapshotOffset) + + stateManager.takeSnapshot() + assertEquals(Some(0), stateManager.latestSnapshotOffset) } @Test @@ -402,6 +451,8 @@ class ProducerStateManagerTest extends JUnitSuite { val epoch = 0.toShort append(stateManager, producerId, epoch, 0, 0L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 1, 1L) stateManager.takeSnapshot() @@ -409,10 +460,10 @@ class ProducerStateManagerTest extends JUnitSuite { append(stateManager, anotherPid, epoch, 0, 2L) append(stateManager, anotherPid, epoch, 1, 3L) stateManager.takeSnapshot() - assertEquals(Set(2, 4), currentSnapshotOffsets) + assertEquals(Set(1, 2, 4), currentSnapshotOffsets) stateManager.evictUnretainedProducers(2) - assertEquals(Set(4), currentSnapshotOffsets) + assertEquals(Set(2, 4), currentSnapshotOffsets) assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) assertEquals(None, stateManager.lastEntry(producerId)) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a2c9b05c570b..a38c75b2def0 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -353,12 +353,12 @@ object TestUtils extends Logging { magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE, codec: CompressionType = CompressionType.NONE, pid: Long = RecordBatch.NO_PRODUCER_ID, - epoch: Short = RecordBatch.NO_PRODUCER_EPOCH, + producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, sequence: Int = RecordBatch.NO_SEQUENCE, baseOffset: Long = 0L): MemoryRecords = { val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset, - System.currentTimeMillis, pid, epoch, sequence) + System.currentTimeMillis, pid, producerEpoch, sequence) records.foreach(builder.append) builder.build() } From 50c861ee7cd87472a1094451b53f7e4e63cb8d2c Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 13 Jun 2017 16:07:41 -0700 Subject: [PATCH 2/7] Maintain empty snapshot at log start offset --- core/src/main/scala/kafka/log/Log.scala | 63 +++++++++---------- .../kafka/log/ProducerStateManager.scala | 29 ++++++--- .../test/scala/unit/kafka/log/LogTest.scala | 25 +++++--- .../kafka/log/ProducerStateManagerTest.scala | 32 +++++----- 4 files changed, 80 insertions(+), 69 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8a2f50ef9d8a..2015f598913b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -431,25 +431,23 @@ class Log(@volatile var dir: File, private def loadProducerState(lastOffset: Long): Unit = lock synchronized { info(s"Loading producer state from offset $lastOffset for partition $topicPartition") + // To avoid expensive initialization when upgrading from older brokers, we skip loading producer + // state if no snapshot file is found. To ensure that we cannot hit this case after upgrading (which + // could cause us to lose producer state), we enforce the invariant that we always have an empty snapshot + // file at the log start offset. + if (producerStateManager.latestSnapshotOffset.isEmpty) { - // If there are no snapshots to load producer state from, we assume that the brokers are - // being upgraded, which means there would be no previous idempotent/transactional producers - // to load state for. To avoid an expensive scan through all of the segments, we take - // empty snapshots from the start of the last two segments and the last offset. The purpose - // of taking the segment snapshots is to avoid the full scan in the case that the log needs - // truncation. + // There are no snapshots so this is the upgrade path. In addition to taking a snapshot at the log start + // offset to enforce the invariant mentioned above, we take empty snapshots from the start of the last + // two segments and the last offset. The purpose of the additional snapshots is to avoid the full scan in + // the case that the log needs truncation. val nextLatestSegmentBaseOffset = Option(segments.lowerEntry(activeSegment.baseOffset)).map(_.getValue.baseOffset) - val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset)) - offsetsToSnapshot.flatten.foreach { offset => - producerStateManager.updateMapEndOffset(offset) - producerStateManager.takeSnapshot() - } + val offsetsToSnapshot = Seq(Some(logStartOffset), nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset)) + offsetsToSnapshot.flatten.foreach(producerStateManager.takeEmptySnapshot) } else { - // Since the oldest snapshot will be removed after truncation, we need to take an empty snapshot - // at the log start offset to ensure that we cannot reach the optimization path above if the - // broker fails after out-of-range snapshots are removed below. Otherwise, we would incorrectly assume - // that no producer data exists in the log. - ensureSnapshotRetained(logStartOffset, lastOffset) + // Ensure we have an empty snapshot at the log start offset to enforce the invariant mentioned above. + // This must be done prior to truncation in case of failure after previous snapshots are removed. + producerStateManager.takeEmptySnapshot(logStartOffset) producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) // Only do the potentially expensive reloading of the last snapshot offset is lower than the @@ -471,11 +469,6 @@ class Log(@volatile var dir: File, } } - private def ensureSnapshotRetained(logStartOffset: Long, logEndOffset: Long): Unit = { - if (!producerStateManager.hasSnapshotInRange(logStartOffset, logEndOffset)) - producerStateManager.takeEmptySnapshot(logStartOffset) - } - private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = { val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo] val completedTxns = ListBuffer.empty[CompletedTxn] @@ -715,6 +708,11 @@ class Log(@volatile var dir: File, lock synchronized { if (offset > logStartOffset) { logStartOffset = offset + + // Enforce the invariant that we have an empty snapshot at the log start offset to ensure + // proper loading of producer state upon recovery. + producerStateManager.takeEmptySnapshot(logStartOffset) + producerStateManager.deleteSnapshotsBefore(logStartOffset) } } } @@ -1060,10 +1058,10 @@ class Log(@volatile var dir: File, logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset) leaderEpochCache.clearAndFlushEarliest(logStartOffset) - // Producer eviction can also result in snapshot deletion, so ensure ahead of time that - // there will still be at least one snapshot remaining in case we fail after the deletion - ensureSnapshotRetained(logStartOffset, logEndOffset) - producerStateManager.evictUnretainedProducers(logStartOffset) + // Update the producer state with the new log start offset, which we cause any non-retained producers to + // be evicted. Enforce the invariant that we always have an empty snapshot at the log start offset. + producerStateManager.takeEmptySnapshot(logStartOffset) + producerStateManager.truncateHead(logStartOffset) updateFirstUnstableOffset() } } @@ -1268,10 +1266,10 @@ class Log(@volatile var dir: File, for(segment <- logSegments(this.recoveryPoint, offset)) segment.flush() - // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain - // the snapshots from the recent segments in case we need to truncate and rebuild the producer state. - // Otherwise, we would always need to rebuild from the earliest segment. - producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset)) + // Now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain the + // snapshots from the recent segments in case we need to truncate and rebuild the producer state. Note that + // we still retain the snapshot from the log start offset. + producerStateManager.deleteSnapshotsInRangeExclusive(logStartOffset, minSnapshotOffsetToRetain(offset)) lock synchronized { if(offset > this.recoveryPoint) { @@ -1378,10 +1376,9 @@ class Log(@volatile var dir: File, producerStateManager.truncate() producerStateManager.updateMapEndOffset(newOffset) - // Truncation results in all snapshot files being removed, so take a new snapshot now - // to ensure we won't incorrectly assume the upgrade path (and skip reloading of producer - // state) if the broker crashes after doing some appends following full truncation. - producerStateManager.takeSnapshot() + // Truncation results in all snapshot files being removed, so take an empty snapshot at the new offset + // to maintain the invariant that we always have a snapshot at the log start offset. + producerStateManager.takeEmptySnapshot(newOffset) updateFirstUnstableOffset() diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 180bed33925f..a8e0f0c27a88 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -472,11 +472,16 @@ class ProducerStateManager(val topicPartition: TopicPartition, } } - private def isSnapshotInRange(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = { + private def inRangeInclusive(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = { val offset = offsetFromFilename(snapshotFile.getName) offset >= startOffset && offset <= endOffset } + private def inRangeExclusive(snapshotFile: File, startOffset: Long, endOffset: Long): Boolean = { + val offset = offsetFromFilename(snapshotFile.getName) + offset > startOffset && offset < endOffset + } + /** * Truncate the producer id mapping to the given offset range and reload the entries from the most recent * snapshot in range (if there is one). Note that the log end offset is assumed to be less than @@ -484,9 +489,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, */ def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) { // remove all out of range snapshots - deleteSnapshotFiles { file => - !isSnapshotInRange(file, logStartOffset, logEndOffset) - } + deleteSnapshotFiles(!inRangeInclusive(_, logStartOffset, logEndOffset)) if (logEndOffset != mapEndOffset) { producers.clear() @@ -497,7 +500,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, unreplicatedTxns.clear() loadFromSnapshot(logStartOffset, currentTimeMs) } else { - evictUnretainedProducers(logStartOffset) + truncateHead(logStartOffset) } } @@ -545,9 +548,9 @@ class ProducerStateManager(val topicPartition: TopicPartition, } } - def hasSnapshotInRange(startOffset: Long, endOffset: Long): Boolean = { + def hasSnapshotInRangeInclusive(startOffset: Long, endOffset: Long): Boolean = { listSnapshotFiles.exists { file => - isSnapshotInRange(file, startOffset, endOffset) + inRangeInclusive(file, startOffset, endOffset) } } @@ -569,9 +572,10 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * When we remove the head of the log due to retention, we need to clean up the id map. This method takes - * the new start offset and removes all producerIds which have a smaller last written offset. + * the new start offset and removes all producerIds which have a smaller last written offset. Additionally, + * all snapshot files at offsets strictly lower than the log start offset will be removed. */ - def evictUnretainedProducers(logStartOffset: Long) { + def truncateHead(logStartOffset: Long) { val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset) val evictedProducerIds = evictedProducerEntries.keySet @@ -579,7 +583,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, removeEvictedOngoingTransactions(evictedProducerIds) removeUnreplicatedTransactions(logStartOffset) - deleteSnapshotFiles(file => offsetFromFilename(file.getName) < logStartOffset) + deleteSnapshotsBefore(logStartOffset) if (lastMapOffset < logStartOffset) lastMapOffset = logStartOffset lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset) @@ -636,6 +640,11 @@ class ProducerStateManager(val topicPartition: TopicPartition, deleteSnapshotFiles(file => offsetFromFilename(file.getName) < offset) } + @threadsafe + def deleteSnapshotsInRangeExclusive(startOffset: Long, endOffset: Long): Unit = { + deleteSnapshotFiles(inRangeExclusive(_, startOffset, endOffset)) + } + private def listSnapshotFiles: List[File] = { if (logDir.exists && logDir.isDirectory) logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index ddcd82bf7139..91baf9d99650 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -419,31 +419,36 @@ class LogTest { @Test def testTakeSnapshotOnRollAndDeleteSnapshotOnFlush() { + // roll triggers a flush at the starting offset of the new segment. we should + // retain the snapshots from the active segment, the previous segment, + // and the log start offset. Other snapshots should be removed. + val log = createLog(2048) log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) log.roll(1L) + assertTrue(Seq(0L, 1L).forall(Log.producerSnapshotFile(logDir, _).exists)) assertEquals(Some(1L), log.latestProducerSnapshotOffset) assertEquals(Some(0L), log.oldestProducerSnapshotOffset) log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 0) log.roll(2L) - assertEquals(Some(2L), log.latestProducerSnapshotOffset) - assertEquals(Some(1L), log.oldestProducerSnapshotOffset) + assertTrue(Seq(0L, 1L, 2L).forall(Log.producerSnapshotFile(logDir, _).exists)) + assertEquals(Some(0L), log.latestProducerSnapshotOffset) + assertEquals(Some(2L), log.oldestProducerSnapshotOffset) log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), leaderEpoch = 0) log.roll(3L) - assertEquals(Some(3L), log.latestProducerSnapshotOffset) + assertTrue(Seq(0L, 2L, 3L).forall(Log.producerSnapshotFile(logDir, _).exists)) + assertTrue(Seq(1L).forall(!Log.producerSnapshotFile(logDir, _).exists)) + assertEquals(Some(0L), log.latestProducerSnapshotOffset) + assertEquals(Some(3L), log.oldestProducerSnapshotOffset) - // roll triggers a flush at the starting offset of the new segment. we should - // retain the snapshots from the active segment and the previous segment, but - // the oldest one should be gone - assertEquals(Some(2L), log.oldestProducerSnapshotOffset) - - // even if we flush within the active segment, the snapshot should remain + // if we flush within the active segment, nothing changes log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), leaderEpoch = 0) log.flush(4L) + assertTrue(Seq(0L, 2L, 3L).forall(Log.producerSnapshotFile(logDir, _).exists)) assertEquals(Some(3L), log.latestProducerSnapshotOffset) - assertEquals(Some(2L), log.oldestProducerSnapshotOffset) + assertEquals(Some(0L), log.oldestProducerSnapshotOffset) } @Test diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 74ff5d720057..34808111ae0f 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -266,16 +266,16 @@ class ProducerStateManagerTest extends JUnitSuite { append(stateManager, producerId, epoch, 4, 4L) stateManager.takeSnapshot() - assertFalse(stateManager.hasSnapshotInRange(0L, 0L)) - assertTrue(stateManager.hasSnapshotInRange(0L, 4L)) - assertTrue(stateManager.hasSnapshotInRange(1L, 1L)) - assertTrue(stateManager.hasSnapshotInRange(1L, 4L)) - assertTrue(stateManager.hasSnapshotInRange(0L, 3L)) - assertTrue(stateManager.hasSnapshotInRange(2L, 2L)) - assertTrue(stateManager.hasSnapshotInRange(4L, 5L)) - assertTrue(stateManager.hasSnapshotInRange(5L, 5L)) - assertFalse(stateManager.hasSnapshotInRange(6L, 6L)) - assertFalse(stateManager.hasSnapshotInRange(6L, 8L)) + assertFalse(stateManager.hasSnapshotInRangeInclusive(0L, 0L)) + assertTrue(stateManager.hasSnapshotInRangeInclusive(0L, 4L)) + assertTrue(stateManager.hasSnapshotInRangeInclusive(1L, 1L)) + assertTrue(stateManager.hasSnapshotInRangeInclusive(1L, 4L)) + assertTrue(stateManager.hasSnapshotInRangeInclusive(0L, 3L)) + assertTrue(stateManager.hasSnapshotInRangeInclusive(2L, 2L)) + assertTrue(stateManager.hasSnapshotInRangeInclusive(4L, 5L)) + assertTrue(stateManager.hasSnapshotInRangeInclusive(5L, 5L)) + assertFalse(stateManager.hasSnapshotInRangeInclusive(6L, 6L)) + assertFalse(stateManager.hasSnapshotInRangeInclusive(6L, 8L)) stateManager.truncateAndReload(1L, 3L, time.milliseconds()) @@ -436,18 +436,18 @@ class ProducerStateManagerTest extends JUnitSuite { } @Test - def testFirstUnstableOffsetAfterEviction(): Unit = { + def testFirstUnstableOffsetAfterTruncateHead(): Unit = { val epoch = 0.toShort val sequence = 0 append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset)) append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true) - stateManager.evictUnretainedProducers(100) + stateManager.truncateHead(100) assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset)) } @Test - def testEvictUnretainedPids(): Unit = { + def testTruncateHead(): Unit = { val epoch = 0.toShort append(stateManager, producerId, epoch, 0, 0L) @@ -462,7 +462,7 @@ class ProducerStateManagerTest extends JUnitSuite { stateManager.takeSnapshot() assertEquals(Set(1, 2, 4), currentSnapshotOffsets) - stateManager.evictUnretainedProducers(2) + stateManager.truncateHead(2) assertEquals(Set(2, 4), currentSnapshotOffsets) assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) assertEquals(None, stateManager.lastEntry(producerId)) @@ -471,12 +471,12 @@ class ProducerStateManagerTest extends JUnitSuite { assertTrue(maybeEntry.isDefined) assertEquals(3L, maybeEntry.get.lastOffset) - stateManager.evictUnretainedProducers(3) + stateManager.truncateHead(3) assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) assertEquals(Set(4), currentSnapshotOffsets) assertEquals(4, stateManager.mapEndOffset) - stateManager.evictUnretainedProducers(5) + stateManager.truncateHead(5) assertEquals(Set(), stateManager.activeProducers.keySet) assertEquals(Set(), currentSnapshotOffsets) assertEquals(5, stateManager.mapEndOffset) From fb483ff2e34c3d3a8e1d0ff7fe09f9a397626619 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 13 Jun 2017 16:30:18 -0700 Subject: [PATCH 3/7] Remove unused method and add test case for deleteSnapshotsInRangeExclusive --- core/src/main/scala/kafka/log/Log.scala | 4 +- .../kafka/log/ProducerStateManager.scala | 6 --- .../kafka/log/ProducerStateManagerTest.scala | 41 ++++++++++++++----- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2015f598913b..7b673e183ad3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -46,8 +46,6 @@ import java.util.Map.{Entry => JEntry} import java.lang.{Long => JLong} import java.util.regex.Pattern -import org.apache.kafka.common.internals.Topic - object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) @@ -450,7 +448,7 @@ class Log(@volatile var dir: File, producerStateManager.takeEmptySnapshot(logStartOffset) producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) - // Only do the potentially expensive reloading of the last snapshot offset is lower than the + // Only do the potentially expensive reloading if the last snapshot offset is lower than the // log end offset (which would be the case on first startup) and there are active producers. // if there are no active producers, then truncating shouldn't change that fact (although it // could cause a producerId to expire earlier than expected), so we can skip the loading. diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index a8e0f0c27a88..f642f3868abc 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -548,12 +548,6 @@ class ProducerStateManager(val topicPartition: TopicPartition, } } - def hasSnapshotInRangeInclusive(startOffset: Long, endOffset: Long): Boolean = { - listSnapshotFiles.exists { file => - inRangeInclusive(file, startOffset, endOffset) - } - } - def takeEmptySnapshot(offset: Long) = { val snapshotFile = Log.producerSnapshotFile(logDir, offset) debug(s"Writing empty producer snapshot for partition $topicPartition at offset $offset") diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 34808111ae0f..f57001e8938b 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -266,17 +266,6 @@ class ProducerStateManagerTest extends JUnitSuite { append(stateManager, producerId, epoch, 4, 4L) stateManager.takeSnapshot() - assertFalse(stateManager.hasSnapshotInRangeInclusive(0L, 0L)) - assertTrue(stateManager.hasSnapshotInRangeInclusive(0L, 4L)) - assertTrue(stateManager.hasSnapshotInRangeInclusive(1L, 1L)) - assertTrue(stateManager.hasSnapshotInRangeInclusive(1L, 4L)) - assertTrue(stateManager.hasSnapshotInRangeInclusive(0L, 3L)) - assertTrue(stateManager.hasSnapshotInRangeInclusive(2L, 2L)) - assertTrue(stateManager.hasSnapshotInRangeInclusive(4L, 5L)) - assertTrue(stateManager.hasSnapshotInRangeInclusive(5L, 5L)) - assertFalse(stateManager.hasSnapshotInRangeInclusive(6L, 6L)) - assertFalse(stateManager.hasSnapshotInRangeInclusive(6L, 8L)) - stateManager.truncateAndReload(1L, 3L, time.milliseconds()) assertEquals(Some(1L), stateManager.oldestSnapshotOffset) @@ -387,6 +376,36 @@ class ProducerStateManagerTest extends JUnitSuite { assertEquals(Set(), currentSnapshotOffsets) } + @Test + def testDeleteSnapshotsInRangeExclusive(): Unit = { + val epoch = 0.toShort + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 0, 0L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 1, 1L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 2, 2L) + stateManager.takeSnapshot() + append(stateManager, producerId, epoch, 3, 3L) + stateManager.takeSnapshot() + assertEquals(Set(0, 1, 2, 3, 4), currentSnapshotOffsets) + + stateManager.deleteSnapshotsInRangeExclusive(3L, 4L) + assertEquals(Set(0, 1, 2, 3, 4), currentSnapshotOffsets) + + stateManager.deleteSnapshotsInRangeExclusive(0L, 0L) + assertEquals(Set(0, 1, 2, 3, 4), currentSnapshotOffsets) + + stateManager.deleteSnapshotsInRangeExclusive(0L, 2L) + assertEquals(Set(0, 2, 3, 4), currentSnapshotOffsets) + + stateManager.deleteSnapshotsInRangeExclusive(3L, 5L) + assertEquals(Set(0, 2, 3), currentSnapshotOffsets) + + stateManager.deleteSnapshotsInRangeExclusive(0L, 3L) + assertEquals(Set(0, 3), currentSnapshotOffsets) + } + @Test def testTruncate(): Unit = { val epoch = 0.toShort From 382e2f0aaa2b444e79b0e1633871b622fbd0ced1 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 13 Jun 2017 16:34:00 -0700 Subject: [PATCH 4/7] Fix LogManager test case --- core/src/test/scala/unit/kafka/log/LogManagerTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 90e57facfadb..b91683e74599 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -132,7 +132,7 @@ class LogManagerTest { val config = LogConfig.fromProps(logConfig.originals, logProps) logManager = createLogManager() - logManager.startup + logManager.startup() // create a log val log = logManager.createLog(new TopicPartition(name, 0), config) @@ -154,8 +154,8 @@ class LogManagerTest { time.sleep(log.config.fileDeleteDelayMs + 1) // there should be a log file, two indexes (the txn index is created lazily), - // the leader epoch checkpoint and two pid mapping files (one for the active and previous segments) - assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 3, log.dir.list.length) + // the leader epoch checkpoint and three pid mapping files (one for active and previous segments and the log start offset) + assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 4, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.readUncommitted(offset + 1, 1024).records.sizeInBytes) try { log.readUncommitted(0, 1024) From a3fef3e5acf16b83af09df995db1114fb688510f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 13 Jun 2017 16:39:24 -0700 Subject: [PATCH 5/7] Fix another broken test case --- core/src/test/scala/unit/kafka/log/LogTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 91baf9d99650..be194238f891 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -433,15 +433,15 @@ class LogTest { log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 0) log.roll(2L) assertTrue(Seq(0L, 1L, 2L).forall(Log.producerSnapshotFile(logDir, _).exists)) - assertEquals(Some(0L), log.latestProducerSnapshotOffset) - assertEquals(Some(2L), log.oldestProducerSnapshotOffset) + assertEquals(Some(0L), log.oldestProducerSnapshotOffset) + assertEquals(Some(2L), log.latestProducerSnapshotOffset) log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), leaderEpoch = 0) log.roll(3L) assertTrue(Seq(0L, 2L, 3L).forall(Log.producerSnapshotFile(logDir, _).exists)) assertTrue(Seq(1L).forall(!Log.producerSnapshotFile(logDir, _).exists)) - assertEquals(Some(0L), log.latestProducerSnapshotOffset) - assertEquals(Some(3L), log.oldestProducerSnapshotOffset) + assertEquals(Some(0L), log.oldestProducerSnapshotOffset) + assertEquals(Some(3L), log.latestProducerSnapshotOffset) // if we flush within the active segment, nothing changes log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), leaderEpoch = 0) From c9af50ff22e201750fee191e7b362a5fa367ef91 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 13 Jun 2017 16:58:20 -0700 Subject: [PATCH 6/7] Add a few assertions on snapshot existence for record deletion --- core/src/test/scala/unit/kafka/log/LogTest.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index be194238f891..af98bed58d29 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -1985,21 +1985,25 @@ class LogTest { log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("should have 3 segments", 3, log.numberOfSegments) assertEquals(log.logStartOffset, 0) + assertEquals(Some(0), log.oldestProducerSnapshotOffset) log.maybeIncrementLogStartOffset(1) log.deleteOldSegments() assertEquals("should have 3 segments", 3, log.numberOfSegments) assertEquals(log.logStartOffset, 1) + assertEquals(Some(1), log.oldestProducerSnapshotOffset) log.maybeIncrementLogStartOffset(6) log.deleteOldSegments() assertEquals("should have 2 segments", 2, log.numberOfSegments) assertEquals(log.logStartOffset, 6) + assertEquals(Some(6), log.oldestProducerSnapshotOffset) log.maybeIncrementLogStartOffset(15) log.deleteOldSegments() assertEquals("should have 1 segments", 1, log.numberOfSegments) assertEquals(log.logStartOffset, 15) + assertEquals(Some(15), log.oldestProducerSnapshotOffset) } def epochCache(log: Log): LeaderEpochFileCache = { @@ -2015,7 +2019,7 @@ class LogTest { for (_ <- 0 until 15) log.appendAsLeader(createRecords, leaderEpoch = 0) - log.deleteOldSegments + log.deleteOldSegments() assertEquals("should have 2 segments", 2,log.numberOfSegments) } @@ -2028,7 +2032,7 @@ class LogTest { for (_ <- 0 until 15) log.appendAsLeader(createRecords, leaderEpoch = 0) - log.deleteOldSegments + log.deleteOldSegments() assertEquals("should have 3 segments", 3,log.numberOfSegments) } From 96c79a857f95bd219224cb5a50f46f4b7ab180d2 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 15 Jun 2017 14:13:36 -0700 Subject: [PATCH 7/7] A few minor fixes --- core/src/main/scala/kafka/log/Log.scala | 27 ++++++++++--------- .../kafka/log/ProducerStateManager.scala | 2 +- .../test/scala/unit/kafka/log/LogTest.scala | 16 +++++++++++ .../kafka/log/ProducerStateManagerTest.scala | 2 ++ 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 7b673e183ad3..505d2f521122 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -443,17 +443,19 @@ class Log(@volatile var dir: File, val offsetsToSnapshot = Seq(Some(logStartOffset), nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset)) offsetsToSnapshot.flatten.foreach(producerStateManager.takeEmptySnapshot) } else { + val hasProducersBeforeTruncation = !producerStateManager.isEmpty + // Ensure we have an empty snapshot at the log start offset to enforce the invariant mentioned above. // This must be done prior to truncation in case of failure after previous snapshots are removed. producerStateManager.takeEmptySnapshot(logStartOffset) producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) // Only do the potentially expensive reloading if the last snapshot offset is lower than the - // log end offset (which would be the case on first startup) and there are active producers. - // if there are no active producers, then truncating shouldn't change that fact (although it - // could cause a producerId to expire earlier than expected), so we can skip the loading. + // log end offset (which would be the case on first startup) and there were active producers + // prior to truncation. If there weren't, then truncating shouldn't change that fact (although it + // could cause a producerId to expire earlier than expected), and we can skip the loading. // This is an optimization for users which are not yet using idempotent/transactional features yet. - if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) { + if (lastOffset > producerStateManager.mapEndOffset && hasProducersBeforeTruncation) { logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset) val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue) @@ -471,8 +473,10 @@ class Log(@volatile var dir: File, val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo] val completedTxns = ListBuffer.empty[CompletedTxn] records.batches.asScala.foreach { batch => - if (batch.hasProducerId) - updateProducers(batch, loadedProducers, completedTxns, loadingFromLog = true) + if (batch.hasProducerId) { + val maybeCompletedTxn = updateProducers(batch, loadedProducers, loadingFromLog = true) + maybeCompletedTxn.foreach(completedTxns += _) + } } loadedProducers.values.foreach(producerStateManager.update) completedTxns.foreach(producerStateManager.completeTxn) @@ -727,7 +731,8 @@ class Log(@volatile var dir: File, // the last appended entry to the client. if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch))) return (updatedProducers, completedTxns.toList, maybeLastEntry) - updateProducers(batch, updatedProducers, completedTxns, loadingFromLog = false) + val maybeCompletedTxn = updateProducers(batch, updatedProducers, loadingFromLog = false) + maybeCompletedTxn.foreach(completedTxns += _) } (updatedProducers, completedTxns.toList, None) } @@ -813,12 +818,10 @@ class Log(@volatile var dir: File, private def updateProducers(batch: RecordBatch, producers: mutable.Map[Long, ProducerAppendInfo], - completedTxns: ListBuffer[CompletedTxn], - loadingFromLog: Boolean): Unit = { + loadingFromLog: Boolean): Option[CompletedTxn] = { val producerId = batch.producerId val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, loadingFromLog)) - val maybeCompletedTxn = appendInfo.append(batch) - maybeCompletedTxn.foreach(completedTxns += _) + appendInfo.append(batch) } /** @@ -1056,7 +1059,7 @@ class Log(@volatile var dir: File, logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset) leaderEpochCache.clearAndFlushEarliest(logStartOffset) - // Update the producer state with the new log start offset, which we cause any non-retained producers to + // Update the producer state with the new log start offset, which would cause any non-retained producers to // be evicted. Enforce the invariant that we always have an empty snapshot at the log start offset. producerStateManager.takeEmptySnapshot(logStartOffset) producerStateManager.truncateHead(logStartOffset) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index f642f3868abc..c167d6137f6d 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -444,7 +444,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, Files.deleteIfExists(file.toPath) } case None => - lastSnapOffset = -1 + lastSnapOffset = -1L lastMapOffset = logStartOffset return } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index af98bed58d29..14d75625d448 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -301,6 +301,21 @@ class LogTest { assertEquals(1, log.latestProducerStateEndOffset) } + @Test + def testTruncateToWithEmptyProducerState() { + val log = createLog(2048) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes))), leaderEpoch = 0) + log.appendAsLeader(TestUtils.records(List(new SimpleRecord("d".getBytes))), leaderEpoch = 0) + assertTrue(log.activeProducers.isEmpty) + assertEquals(4L, log.latestProducerStateEndOffset) + + log.truncateTo(1L) + assertTrue(log.activeProducers.isEmpty) + assertEquals(1L, log.latestProducerStateEndOffset) + } + @Test def testTruncateBeforeOldestProducerSnapshot(): Unit = { val pid = 1L @@ -360,6 +375,7 @@ class LogTest { log.truncateFullyAndStartAt(29) assertEquals(1, log.logSegments.size) assertEquals(Some(29), log.latestProducerSnapshotOffset) + assertEquals(Some(29), log.oldestProducerSnapshotOffset) assertEquals(29, log.latestProducerStateEndOffset) } diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index f57001e8938b..fbec5f08c403 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -319,9 +319,11 @@ class ProducerStateManagerTest extends JUnitSuite { stateManager.truncateAndReload(0L, 1L, time.milliseconds()) assertEquals(None, stateManager.latestSnapshotOffset) + assertEquals(None, stateManager.oldestSnapshotOffset) stateManager.takeSnapshot() assertEquals(Some(0), stateManager.latestSnapshotOffset) + assertEquals(Some(0), stateManager.oldestSnapshotOffset) } @Test