From 5f320b8578f8b29e1a3a26e1f55ad07970789053 Mon Sep 17 00:00:00 2001
From: TaiJu Wu
Date: Sat, 21 Mar 2026 12:12:28 +0800
Subject: [PATCH 1/3] migrate

---
 .../storage/internals/log/LogTestUtils.java   |   8 +
 .../storage/internals/log/UnifiedLogTest.java | 675 ++++++++++++++++++
 2 files changed, 683 insertions(+)

diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 92a1e8cf0d900..b560e211bd9c1 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -196,6 +196,14 @@ public static MemoryRecords records(List<SimpleRecord> records, long baseOffset,
         return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
                 RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
     }
 
+    public static List<AbortedTxn> allAbortedTransactions(UnifiedLog log) {
+        List<AbortedTxn> result = new ArrayList<>();
+        for (LogSegment segment : log.logSegments()) {
+            result.addAll(segment.txnIndex().allAbortedTxns());
+        }
+        return result;
+    }
+
     public static void deleteProducerSnapshotFiles(File logDir) {
         Stream.of(logDir.listFiles())
                 .filter(f -> f.isFile() && f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index a0a207c512a55..591afe9040934 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.AbstractConfig;
@@ -29,11 +30,14 @@
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
 import org.apache.kafka.common.message.DescribeProducersResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.DefaultRecordBatch;
+import org.apache.kafka.common.record.internal.EndTransactionMarker;
 import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.record.internal.InvalidMemoryRecordsProvider;
 import org.apache.kafka.common.record.internal.MemoryRecords;
@@ -47,6 +51,7 @@
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.RequestLocal;
 import org.apache.kafka.server.common.TransactionVersion;
 import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
@@ -102,6 +107,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -3445,4 +3451,673 @@ private Object yammerMetricValue(String name) {
             .getValue();
         return gauge.value();
     }
+
+    @Test
+    public void testTransactionIndexUpdatedThroughReplication() throws IOException {
+        short epoch = 0;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+        long pid1 = 1L;
+        long pid2 = 2L;
+        long pid3 = 3L;
+        long pid4 = 4L;
+
+        BiConsumer<Long, Integer> appendPid1 = appendTransactionalToBuffer(buffer, pid1, epoch);
+        BiConsumer<Long, Integer> appendPid2 = appendTransactionalToBuffer(buffer, pid2, epoch);
+        BiConsumer<Long, Integer> appendPid3 = appendTransactionalToBuffer(buffer, pid3, epoch);
+        BiConsumer<Long, Integer> appendPid4 = appendTransactionalToBuffer(buffer, pid4, epoch);
+
+        appendPid1.accept(0L, 5);
+        appendNonTransactionalToBuffer(buffer, 5L, 3);
+        appendPid2.accept(8L, 2);
+        appendPid1.accept(10L, 4);
+        appendPid3.accept(14L, 3);
+        appendNonTransactionalToBuffer(buffer, 17L, 2);
+        appendPid1.accept(19L, 10);
+        appendEndTxnMarkerToBuffer(buffer, pid1, epoch, 29L, ControlRecordType.ABORT);
+        appendPid2.accept(30L, 6);
+        appendPid4.accept(36L, 3);
+        appendNonTransactionalToBuffer(buffer, 39L, 10);
+        appendPid3.accept(49L, 9);
+        appendEndTxnMarkerToBuffer(buffer, pid3, epoch, 58L, ControlRecordType.COMMIT);
+        appendPid4.accept(59L, 8);
+        appendPid2.accept(67L, 7);
+        appendEndTxnMarkerToBuffer(buffer, pid2, epoch, 74L, ControlRecordType.ABORT);
+        appendNonTransactionalToBuffer(buffer, 75L, 10);
+        appendPid4.accept(85L, 4);
+        appendEndTxnMarkerToBuffer(buffer, pid4, epoch, 89L, ControlRecordType.COMMIT);
+
+        buffer.flip();
+
+        appendAsFollower(log, MemoryRecords.readableRecords(buffer), epoch);
+
+        List<AbortedTxn> abortedTransactions = LogTestUtils.allAbortedTransactions(log);
+        List<AbortedTxn> expectedTransactions = List.of(
+            new AbortedTxn(pid1, 0L, 29L, 8L),
+            new AbortedTxn(pid2, 8L, 74L, 36L)
+        );
+
+        assertEquals(expectedTransactions, abortedTransactions);
+
+        // Verify caching of the segment position of the first unstable offset
+        log.updateHighWatermark(30L);
+        assertCachedFirstUnstableOffset(log, 8L);
+
+        log.updateHighWatermark(75L);
+        assertCachedFirstUnstableOffset(log, 36L);
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testZombieCoordinatorFenced() throws IOException {
+        long pid = 1L;
+        short epoch = 0;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        Consumer<Integer> append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+        append.accept(10);
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
+            1, 0, TransactionVersion.TV_0.featureLevel());
+
+        append.accept(5);
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds(),
+            2, 0, TransactionVersion.TV_0.featureLevel());
+
+        assertThrows(TransactionCoordinatorFencedException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 0, TransactionVersion.TV_0.featureLevel()));
+    }
+
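+    // The variant below covers an "empty" transaction: the records and COMMIT marker arrive via
+    // replication, then the coordinator aborts at a higher coordinator epoch with no records in
+    // between. A marker from the older (zombie) coordinator epoch must still be fenced.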
+    @Test
+    public void testZombieCoordinatorFencedEmptyTransaction() throws IOException {
+        long pid = 1L;
+        short epoch = 0;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        BiConsumer<Long, Integer> append = appendTransactionalToBuffer(buffer, pid, epoch, 1);
+        append.accept(0L, 10);
+        appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, ControlRecordType.COMMIT, 1);
+
+        buffer.flip();
+        log.appendAsFollower(MemoryRecords.readableRecords(buffer), epoch);
+
+        LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
+            2, 1, TransactionVersion.TV_0.featureLevel());
+        assertThrows(TransactionCoordinatorFencedException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 1, TransactionVersion.TV_0.featureLevel()));
+    }
+
+    @ParameterizedTest(name = "testEndTxnWithFencedProducerEpoch with transactionVersion={0}")
+    @ValueSource(shorts = {0, 1, 2})
+    public void testEndTxnWithFencedProducerEpoch(short transactionVersion) throws IOException {
+        long producerId = 1L;
+        short epoch = 5;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // First, write some transactional records to establish the current epoch
+        MemoryRecords records = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(records, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion);
+
+        // Test 1: Old epoch (epoch - 1) should be rejected for both TV0/TV1 and TV2
+        // TV0/TV1: markerEpoch < currentEpoch is rejected
+        // TV2: markerEpoch <= currentEpoch is rejected (requires strict >)
+        assertThrows(InvalidProducerEpochException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, (short) (epoch - 1),
+                ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0, transactionVersion));
+
+        // Test 2: Same epoch behavior differs between TV0/TV1 and TV2
+        // TV0/TV1: same epoch is allowed (markerEpoch >= currentEpoch)
+        // TV2: same epoch is rejected (requires strict >, markerEpoch > currentEpoch)
+        if (transactionVersion >= 2) {
+            // TV2: same epoch should be rejected
+            assertThrows(InvalidProducerEpochException.class,
+                () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch,
+                    ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0, transactionVersion));
+        } else {
+            // TV0/TV1: same epoch should be allowed
+            assertDoesNotThrow(() -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch,
+                ControlRecordType.ABORT, mockTime.milliseconds(), 1, 0, transactionVersion));
+        }
+    }
+
+    @Test
+    public void testTV2MarkerWithBumpedEpochSucceeds() throws IOException {
+        // Test that TV2 markers with bumped epochs (epoch + 1) are accepted (positive case)
+        // TV2 (KIP-890): Coordinator bumps epoch before writing marker, so markerEpoch = currentEpoch + 1
+        short transactionVersion = 2;
+        long producerId = 1L;
+        short epoch = 5;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // First, write some transactional records to establish the current epoch
+        MemoryRecords records = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(records, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion);
+
+        // TV2: Verify that bumped epoch (epoch + 1) is accepted
+        short bumpedEpoch = (short) (epoch + 1);
+        assertDoesNotThrow(() -> LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, bumpedEpoch,
+            ControlRecordType.COMMIT, mockTime.milliseconds(), 1,
+            0, TransactionVersion.TV_2.featureLevel()));
+
+        // Verify the marker was successfully appended by checking producer state
+        ProducerStateEntry producerState = log.producerStateManager().activeProducers().get(producerId);
+        assertNotNull(producerState);
+        // After a commit marker, the producer epoch should be updated to the bumped epoch for TV2
+        assertEquals(bumpedEpoch, producerState.producerEpoch());
+    }
+
+    @Test
+    public void testReplicationWithTVUnknownAllowed() throws IOException {
+        // Test that TV_UNKNOWN is allowed for replication (REPLICATION origin) and uses TV_0 validation
+        // This simulates the scenario where:
+        // 1. Leader receives WriteTxnMarkersRequest with transactionVersion=2 and validates with strict TV2 rules
+        // 2. Leader writes MemoryRecords to log (transactionVersion is not stored in MemoryRecords)
+        // 3. Follower receives MemoryRecords via replication (without transactionVersion metadata)
+        // 4. Follower uses TV_UNKNOWN which defaults to TV_0 validation (more permissive, safe because leader already validated)
+        long producerId = 1L;
+        short epoch = 5;
+        int coordinatorEpoch = 1;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // Step 1: Write transactional records as leader to establish current epoch
+        MemoryRecords transactionalRecords = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_2.featureLevel());
+
+        // Step 2: Simulate leader writing TV2 marker with bumped epoch (epoch + 1)
+        // This is what happens at the leader when WriteTxnMarkersRequest is received
+        short bumpedEpoch = (short) (epoch + 1);
+        MemoryRecords leaderMarker = MemoryRecords.withEndTransactionMarker(
+            mockTime.milliseconds(),
+            producerId,
+            bumpedEpoch,
+            new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
+        );
+        // Leader validates with TV2 (strict: markerEpoch > currentEpoch)
+        log.appendAsLeader(leaderMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_2.featureLevel());
+
+        // Verify leader state
+        ProducerStateEntry leaderProducerState = log.producerStateManager().activeProducers().get(producerId);
+        assertNotNull(leaderProducerState);
+        assertEquals(bumpedEpoch, leaderProducerState.producerEpoch());
+
+        // Step 3: Create a new log to simulate a follower
+        File followerLogDir = TestUtils.randomPartitionLogDir(tmpDir);
+        UnifiedLog followerLog = createLog(followerLogDir, logConfig);
+
+        // Step 4: Follower replicates transactional records first
+        MemoryRecords followerTransactionalRecords = MemoryRecords.withTransactionalRecords(
+            0L, Compression.NONE, producerId, epoch, 0, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        followerLog.appendAsFollower(followerTransactionalRecords, 0);
+
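+        // (Note: the withTransactionalRecords overload taking an explicit initial offset is used above
+        // so the follower's copy of the batch lands at offset 0, matching the leader's log exactly.)
+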
+        // Step 5: Follower replicates the marker (appendAsFollower uses TV_UNKNOWN internally)
+        // This should succeed because TV_UNKNOWN is allowed for REPLICATION origin
+        // and defaults to TV_0 validation (markerEpoch >= currentEpoch), which is more permissive
+        // The marker should be at offset 1 (after the transactional record at offset 0)
+        MemoryRecords followerMarker = MemoryRecords.withEndTransactionMarker(
+            1L, // offset after the transactional record
+            mockTime.milliseconds(),
+            0, // partition leader epoch
+            producerId,
+            bumpedEpoch,
+            new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
+        );
+
+        // This should not throw an exception - TV_UNKNOWN is allowed for replication
+        assertDoesNotThrow(() -> followerLog.appendAsFollower(followerMarker, 0));
+
+        // Verify follower state matches leader state
+        ProducerStateEntry followerProducerState = followerLog.producerStateManager().activeProducers().get(producerId);
+        assertNotNull(followerProducerState);
+        assertEquals(bumpedEpoch, followerProducerState.producerEpoch());
+        assertEquals(coordinatorEpoch, followerProducerState.coordinatorEpoch());
+
+        // Verify the marker was written to the follower log
+        assertEquals(2L, followerLog.logEndOffset()); // 1 transactional record + 1 marker
+    }
+
+    @Test
+    public void testLeaderRejectsTVUnknownForTransactionMarker() throws IOException {
+        // Test that TV_UNKNOWN is rejected for COORDINATOR origin (leader writing transaction markers)
+        // TV_UNKNOWN is only allowed for REPLICATION origin (followers)
+        long producerId = 1L;
+        short epoch = 5;
+        int coordinatorEpoch = 1;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // Write transactional records as leader to establish current epoch
+        MemoryRecords transactionalRecords = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, producerId, epoch, 0,
+            new SimpleRecord("key".getBytes(), "value".getBytes())
+        );
+        log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_2.featureLevel());
+
+        // Attempt to write a transaction marker with TV_UNKNOWN as COORDINATOR (leader)
+        // This should throw IllegalArgumentException because TV_UNKNOWN is not allowed for COORDINATOR origin
+        MemoryRecords marker = MemoryRecords.withEndTransactionMarker(
+            mockTime.milliseconds(),
+            producerId,
+            (short) (epoch + 1), // bumped epoch for TV2
+            new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
+        );
+
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () ->
+            log.appendAsLeader(marker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_UNKNOWN));
+
+        assertTrue(exception.getMessage().contains("transactionVersion must be explicitly specified"));
+        assertTrue(exception.getMessage().contains("TV_UNKNOWN"));
+        assertTrue(exception.getMessage().contains("COORDINATOR"));
+    }
+
+    @Test
+    public void testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+        short epoch = 0;
+        long pid = 1L;
+        Consumer<Integer> appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+        appendPid.accept(5);
+        LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+        assertEquals(8L, log.logEndOffset());
+
+        log.roll();
+        assertEquals(2, log.logSegments().size());
+        appendPid.accept(5);
+
+        assertEquals(Optional.of(0L), log.firstUnstableOffset());
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(5L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+        // the first unstable offset should be lower bounded by the log start offset
+        assertEquals(Optional.of(5L), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+        short epoch = 0;
+        long pid = 1L;
+        Consumer<Integer> appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+
+        appendPid.accept(5);
+        LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+        assertEquals(8L, log.logEndOffset());
+
+        log.roll();
+        assertEquals(2, log.logSegments().size());
+        appendPid.accept(5);
+
+        assertEquals(Optional.of(0L), log.firstUnstableOffset());
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(8L, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
+        assertEquals(1, log.logSegments().size());
+
+        // the first unstable offset should be lower bounded by the log start offset
+        assertEquals(Optional.of(8L), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testAppendToTransactionIndexFailure() throws IOException {
+        long pid = 1L;
+        short epoch = 0;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        Consumer<Integer> append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime);
+        append.accept(10);
+
+        // Kind of a hack, but renaming the index to a directory ensures that the append
+        // to the index will fail.
+        log.activeSegment().txnIndex().renameTo(log.dir());
+
+        // The append will be written to the log successfully, but the write to the index will fail
+        assertThrows(KafkaStorageException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 0, TransactionVersion.TV_0.featureLevel()));
+        assertEquals(11L, log.logEndOffset());
+        assertEquals(0L, log.lastStableOffset());
+
+        // Try the append a second time. The appended offset in the log should not increase
+        // because the log dir is marked as failed. Nor will there be a write to the transaction
+        // index.
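+        // (The first attempt left the log end offset at 11, but the abort never reached the txn index,
+        // so the transaction that began at offset 0 is still treated as open and holds the LSO at 0.)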
+        assertThrows(KafkaStorageException.class,
+            () -> LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
+                1, 0, TransactionVersion.TV_0.featureLevel()));
+        assertEquals(11L, log.logEndOffset());
+        assertEquals(0L, log.lastStableOffset());
+
+        // Even if the high watermark is updated, the first unstable offset does not move
+        log.updateHighWatermark(12L);
+        assertEquals(0L, log.lastStableOffset());
+
+        assertThrows(KafkaStorageException.class, () -> log.close());
+        UnifiedLog reopenedLog = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats, mockTime.scheduler, mockTime,
+            producerStateManagerConfig, false, Optional.empty(), false);
+        assertEquals(11L, reopenedLog.logEndOffset());
+        assertEquals(1, reopenedLog.activeSegment().txnIndex().allAbortedTxns().size());
+        reopenedLog.updateHighWatermark(12L);
+        assertEquals(Optional.empty(), reopenedLog.firstUnstableOffset());
+    }
+
+    @Test
+    public void testOffsetSnapshot() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // append a few records
+        appendAsFollower(
+            log,
+            MemoryRecords.withRecords(
+                Compression.NONE,
+                new SimpleRecord("a".getBytes()),
+                new SimpleRecord("b".getBytes()),
+                new SimpleRecord("c".getBytes())
+            ),
+            5
+        );
+
+        log.updateHighWatermark(3L);
+        LogOffsetSnapshot offsets = log.fetchOffsetSnapshot();
+        assertEquals(3L, offsets.highWatermark().messageOffset);
+        assertFalse(offsets.highWatermark().messageOffsetOnly());
+
+        offsets = log.fetchOffsetSnapshot();
+        assertEquals(3L, offsets.highWatermark().messageOffset);
+        assertFalse(offsets.highWatermark().messageOffsetOnly());
+    }
+
+    @Test
+    public void testLastStableOffsetWithMixedProducerData() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        log = createLog(logDir, logConfig);
+
+        // for convenience, both producers share the same epoch
+        short epoch = 5;
+
+        long pid1 = 137L;
+        int seq1 = 0;
+        long pid2 = 983L;
+        int seq2 = 0;
+
+        // add some transactional records
+        LogAppendInfo firstAppendInfo = log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid1, epoch, seq1,
+            new SimpleRecord("a".getBytes()),
+            new SimpleRecord("b".getBytes()),
+            new SimpleRecord("c".getBytes())), 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+        // mix in some non-transactional data
+        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+            new SimpleRecord("g".getBytes()),
+            new SimpleRecord("h".getBytes()),
+            new SimpleRecord("i".getBytes())), 0);
+
+        // append data from a second transactional producer
+        LogAppendInfo secondAppendInfo = log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid2, epoch, seq2,
+            new SimpleRecord("d".getBytes()),
+            new SimpleRecord("e".getBytes()),
+            new SimpleRecord("f".getBytes())), 0);
+
+        // LSO should not have changed
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+        // now first producer's transaction is aborted
+        LogAppendInfo abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel());
+        log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+
+        // LSO should now point to one less than the first offset of the second transaction
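+        // (pid2's transaction is still open, so its first record becomes the first unstable offset.)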
+        assertEquals(Optional.of(secondAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+        // commit the second transaction
+        LogAppendInfo commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.COMMIT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel());
+        log.updateHighWatermark(commitAppendInfo.lastOffset() + 1);
+
+        // now there should be no first unstable offset
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testAbortedTransactionSpanningMultipleSegments() throws IOException {
+        long pid = 137L;
+        short epoch = 5;
+        int seq = 0;
+
+        MemoryRecords records = MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+            new SimpleRecord("a".getBytes()),
+            new SimpleRecord("b".getBytes()),
+            new SimpleRecord("c".getBytes()));
+
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(records.sizeInBytes()).build();
+        log = createLog(logDir, logConfig);
+
+        LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+        // this write should spill to the second segment
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, 3,
+            new SimpleRecord("d".getBytes()),
+            new SimpleRecord("e".getBytes()),
+            new SimpleRecord("f".getBytes())), 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+        assertEquals(3L, log.logEndOffsetMetadata().segmentBaseOffset);
+
+        // now abort the transaction
+        LogAppendInfo abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT,
+            mockTime.milliseconds(), 0, 0, TransactionVersion.TV_0.featureLevel());
+        log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+
+        // now check that a fetch includes the aborted transaction
+        FetchDataInfo fetchDataInfo = log.read(0L, 2048, FetchIsolation.TXN_COMMITTED, true);
+
+        assertTrue(fetchDataInfo.abortedTransactions.isPresent());
+        assertEquals(1, fetchDataInfo.abortedTransactions.get().size());
+        assertEquals(new FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), fetchDataInfo.abortedTransactions.get().get(0));
+    }
+
+    @Test
+    public void testLoadPartitionDirWithNoSegmentsShouldNotThrow() throws IOException {
+        String dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 3));
+        File testLogDir = new File(tmpDir, dirName);
+        testLogDir.mkdirs();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().build();
+        UnifiedLog testLog = createLog(testLogDir, logConfig);
+        assertEquals(1, testLog.numberOfSegments());
+    }
+
+    @Test
+    public void testSegmentDeletionWithHighWatermarkInitialization() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).retentionMs(999).build();
+        log = createLog(logDir, logConfig);
+
+        long expiredTimestamp = mockTime.milliseconds() - 1000;
+        for (int i = 0; i < 100; i++) {
+            MemoryRecords records = LogTestUtils.singletonRecords(("test" + i).getBytes(), Compression.NONE, null, expiredTimestamp);
+            log.appendAsLeader(records, 0);
+        }
+
+        long initialHighWatermark = log.updateHighWatermark(25L);
+        assertEquals(25L, initialHighWatermark);
+
+        int initialNumSegments = log.numberOfSegments();
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should be deleted");
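+        // Retention may remove expired segments, but only those lying entirely below the high
+        // watermark (25 here), so the log start offset can advance at most to that point.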
+        assertTrue(log.numberOfSegments() < initialNumSegments);
+        assertTrue(log.logStartOffset() <= initialHighWatermark);
+    }
+
+    @Test
+    public void testCannotDeleteSegmentsAtOrAboveHighWatermark() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).retentionMs(999).build();
+        log = createLog(logDir, logConfig);
+
+        long expiredTimestamp = mockTime.milliseconds() - 1000;
+        for (int i = 0; i < 100; i++) {
+            MemoryRecords records = LogTestUtils.singletonRecords(("test" + i).getBytes(), Compression.NONE, null, expiredTimestamp);
+            log.appendAsLeader(records, 0);
+        }
+
+        // ensure we have at least a few segments so the test case is not trivial
+        assertTrue(log.numberOfSegments() > 5);
+        assertEquals(0L, log.highWatermark());
+        assertEquals(0L, log.logStartOffset());
+        assertEquals(100L, log.logEndOffset());
+
+        for (int hw = 0; hw <= 100; hw++) {
+            log.updateHighWatermark(hw);
+            assertEquals(hw, log.highWatermark());
+            log.deleteOldSegments();
+            assertTrue(log.logStartOffset() <= hw);
+
+            // verify that all segments up to the high watermark have been deleted
+            List<LogSegment> segments = log.logSegments();
+            if (!segments.isEmpty()) {
+                assertTrue(segments.get(0).baseOffset() <= hw);
+                assertTrue(segments.get(0).baseOffset() >= log.logStartOffset());
+            }
+            for (int i = 1; i < segments.size(); i++) {
+                assertTrue(segments.get(i).baseOffset() > hw);
+                assertTrue(segments.get(i).baseOffset() >= log.logStartOffset());
+            }
+        }
+
+        assertEquals(100L, log.logStartOffset());
+        assertEquals(1, log.numberOfSegments());
+        assertEquals(0, log.activeSegment().size());
+    }
+
+    @Test
+    public void testCannotIncrementLogStartOffsetPastHighWatermark() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(512).segmentIndexBytes(1000).build();
+        log = createLog(logDir, logConfig);
+
+        for (int i = 0; i < 100; i++) {
+            MemoryRecords records = LogTestUtils.singletonRecords(("test" + i).getBytes(), null);
+            log.appendAsLeader(records, 0);
+        }
+
+        log.updateHighWatermark(25L);
+        assertThrows(OffsetOutOfRangeException.class,
+            () -> log.maybeIncrementLogStartOffset(26L, LogStartOffsetIncrementReason.ClientRecordDeletion));
+    }
+
+    @Test
+    public void testBackgroundDeletionWithIOException() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024).build();
+        log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "The number of segments should be 1");
+
+        // Delete the underlying directory to trigger a KafkaStorageException
+        File dir = log.dir();
+        Utils.delete(dir);
+        Files.createFile(dir.toPath());
+
+        assertThrows(KafkaStorageException.class, () -> log.delete());
+        assertTrue(log.logDirFailureChannel().hasOfflineLogDir(tmpDir.toString()));
+    }
+
+    /**
+     * test renaming a log's dir without reinitialization, which is the case during topic deletion
+     */
+    @Test
+    public void testRenamingDirWithoutReinitialization() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024).build();
+        log = createLog(logDir, logConfig);
+        assertEquals(1, log.numberOfSegments(), "The number of segments should be 1");
+
+        File newDir = TestUtils.randomPartitionLogDir(tmpDir);
+        assertTrue(newDir.exists());
+
+        log.renameDir(newDir.getName(), false);
+        assertFalse(log.leaderEpochCache().nonEmpty());
+        assertTrue(log.partitionMetadataFile().isEmpty());
+        assertEquals(0, log.logEndOffset());
+
+        // verify that the background deletion can succeed
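+        // (Passing false to renameDir skips reinitialization: topic deletion renames the directory
+        // first and then relies on this asynchronous delete to remove it.)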
+        log.delete();
+        assertEquals(0, log.numberOfSegments(), "The number of segments should be 0");
+        assertFalse(newDir.exists());
+    }
+
+    private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer buffer, long producerId, short producerEpoch) {
+        return appendTransactionalToBuffer(buffer, producerId, producerEpoch, 0);
+    }
+
+    private BiConsumer<Long, Integer> appendTransactionalToBuffer(ByteBuffer buffer, long producerId, short producerEpoch, int leaderEpoch) {
+        int[] sequence = {0};
+        return (offset, numRecords) -> {
+            int baseSequence = sequence[0];
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
+                offset, mockTime.milliseconds(), producerId, producerEpoch, baseSequence, true, leaderEpoch);
+            for (int seq = baseSequence; seq < baseSequence + numRecords; seq++) {
+                builder.append(new SimpleRecord(String.valueOf(seq).getBytes()));
+            }
+            sequence[0] += numRecords;
+            builder.close();
+        };
+    }
+
+    private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long producerId, short producerEpoch,
+                                            long offset, ControlRecordType controlType) {
+        appendEndTxnMarkerToBuffer(buffer, producerId, producerEpoch, offset, controlType, 0, 0);
+    }
+
+    private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long producerId, short producerEpoch,
+                                            long offset, ControlRecordType controlType,
+                                            int coordinatorEpoch) {
+        appendEndTxnMarkerToBuffer(buffer, producerId, producerEpoch, offset, controlType, coordinatorEpoch, 0);
+    }
+
+    private void appendEndTxnMarkerToBuffer(ByteBuffer buffer, long producerId, short producerEpoch,
+                                            long offset, ControlRecordType controlType,
+                                            int coordinatorEpoch, int leaderEpoch) {
+        EndTransactionMarker marker = new EndTransactionMarker(controlType, coordinatorEpoch);
+        MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker);
+    }
+
+    private void appendNonTransactionalToBuffer(ByteBuffer buffer, long offset, int numRecords) {
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, offset);
+        for (int seq = 0; seq < numRecords; seq++) {
+            builder.append(new SimpleRecord(String.valueOf(seq).getBytes()));
+        }
+        builder.close();
+    }
+
+    private void assertCachedFirstUnstableOffset(UnifiedLog log, long expectedOffset) throws IOException {
+        assertTrue(log.producerStateManager().firstUnstableOffset().isPresent());
+        LogOffsetMetadata firstUnstableOffset = log.producerStateManager().firstUnstableOffset().get();
+        assertEquals(expectedOffset, firstUnstableOffset.messageOffset);
+        assertFalse(firstUnstableOffset.messageOffsetOnly());
+        assertValidLogOffsetMetadata(log, firstUnstableOffset);
+    }
 }

From 30b116e1890f05617b1b6ecd20499eba45eed42a Mon Sep 17 00:00:00 2001
From: TaiJu Wu
Date: Sat, 21 Mar 2026 13:56:08 +0800
Subject: [PATCH 2/3] remove

---
 .../scala/unit/kafka/log/UnifiedLogTest.scala | 639 ------------------
 1 file changed, 639 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index b01375e59ae92..8d3d2b6c1e8ae 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -754,7 +754,6 @@ class UnifiedLogTest {
     }
   }
 
-
   @Test
   def testTransactionIndexUpdated(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
@@ -814,66 +813,6 @@ class UnifiedLogTest {
assertEquals(Optional.empty, log.firstUnstableOffset) } - @Test - def testTransactionIndexUpdatedThroughReplication(): Unit = { - val epoch = 0.toShort - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - val buffer = ByteBuffer.allocate(2048) - - val pid1 = 1L - val pid2 = 2L - val pid3 = 3L - val pid4 = 4L - - val appendPid1 = appendTransactionalToBuffer(buffer, pid1, epoch) - val appendPid2 = appendTransactionalToBuffer(buffer, pid2, epoch) - val appendPid3 = appendTransactionalToBuffer(buffer, pid3, epoch) - val appendPid4 = appendTransactionalToBuffer(buffer, pid4, epoch) - - appendPid1(0L, 5) - appendNonTransactionalToBuffer(buffer, 5L, 3) - appendPid2(8L, 2) - appendPid1(10L, 4) - appendPid3(14L, 3) - appendNonTransactionalToBuffer(buffer, 17L, 2) - appendPid1(19L, 10) - appendEndTxnMarkerToBuffer(buffer, pid1, epoch, 29L, ControlRecordType.ABORT) - appendPid2(30L, 6) - appendPid4(36L, 3) - appendNonTransactionalToBuffer(buffer, 39L, 10) - appendPid3(49L, 9) - appendEndTxnMarkerToBuffer(buffer, pid3, epoch, 58L, ControlRecordType.COMMIT) - appendPid4(59L, 8) - appendPid2(67L, 7) - appendEndTxnMarkerToBuffer(buffer, pid2, epoch, 74L, ControlRecordType.ABORT) - appendNonTransactionalToBuffer(buffer, 75L, 10) - appendPid4(85L, 4) - appendEndTxnMarkerToBuffer(buffer, pid4, epoch, 89L, ControlRecordType.COMMIT) - - buffer.flip() - - appendAsFollower(log, MemoryRecords.readableRecords(buffer), epoch) - - val abortedTransactions = LogTestUtils.allAbortedTransactions(log) - val expectedTransactions = List( - new AbortedTxn(pid1, 0L, 29L, 8L), - new AbortedTxn(pid2, 8L, 74L, 36L) - ) - - assertEquals(expectedTransactions, abortedTransactions) - - // Verify caching of the segment position of the first unstable offset - log.updateHighWatermark(30L) - assertCachedFirstUnstableOffset(log, expectedOffset = 8L) - - log.updateHighWatermark(75L) - assertCachedFirstUnstableOffset(log, expectedOffset = 36L) - - log.updateHighWatermark(log.logEndOffset) - assertEquals(Optional.empty, log.firstUnstableOffset) - } - private def assertCachedFirstUnstableOffset(log: UnifiedLog, expectedOffset: Long): Unit = { assertTrue(log.producerStateManager.firstUnstableOffset.isPresent) val firstUnstableOffset = log.producerStateManager.firstUnstableOffset.get @@ -904,587 +843,9 @@ class UnifiedLogTest { assertNull(readInfo) } - @Test - def testZombieCoordinatorFenced(): Unit = { - val pid = 1L - val epoch = 0.toShort - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime) - - append(10) - LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), - coordinatorEpoch = 1, transactionVersion = TransactionVersion.TV_0.featureLevel()) - - append(5) - LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds(), - coordinatorEpoch = 2, transactionVersion = TransactionVersion.TV_0.featureLevel()) - - assertThrows( - classOf[TransactionCoordinatorFencedException], - () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), - coordinatorEpoch = 1, transactionVersion = TransactionVersion.TV_0.featureLevel())) - } - - @Test - def testZombieCoordinatorFencedEmptyTransaction(): Unit = { - val pid = 1L - val epoch = 0.toShort - val logConfig = 
LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - val buffer = ByteBuffer.allocate(256) - val append = appendTransactionalToBuffer(buffer, pid, epoch, 1) - append(0, 10) - appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, ControlRecordType.COMMIT, 1) - - buffer.flip() - log.appendAsFollower(MemoryRecords.readableRecords(buffer), epoch) - - LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), - coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion = TransactionVersion.TV_0.featureLevel()) - LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), - coordinatorEpoch = 2, leaderEpoch = 1, transactionVersion = TransactionVersion.TV_0.featureLevel()) - assertThrows(classOf[TransactionCoordinatorFencedException], - () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), - coordinatorEpoch = 1, leaderEpoch = 1, transactionVersion = TransactionVersion.TV_0.featureLevel())) - } - - @ParameterizedTest(name = "testEndTxnWithFencedProducerEpoch with transactionVersion={0}") - @ValueSource(shorts = Array(1, 2)) - def testEndTxnWithFencedProducerEpoch(transactionVersion: Short): Unit = { - val producerId = 1L - val epoch = 5.toShort - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - // First, write some transactional records to establish the current epoch - val records = MemoryRecords.withTransactionalRecords( - Compression.NONE, producerId, epoch, 0, - new SimpleRecord("key".getBytes, "value".getBytes) - ) - log.appendAsLeader(records, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion) - - // Test 1: Old epoch (epoch - 1) should be rejected for both TV0/TV1 and TV2 - // TV0/TV1: markerEpoch < currentEpoch is rejected - // TV2: markerEpoch <= currentEpoch is rejected (requires strict >) - assertThrows(classOf[InvalidProducerEpochException], - () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, (epoch - 1).toShort, - ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 1, - leaderEpoch = 0, transactionVersion = transactionVersion)) - - // Test 2: Same epoch behavior differs between TV0/TV1 and TV2 - // TV0/TV1: same epoch is allowed (markerEpoch >= currentEpoch) - // TV2: same epoch is rejected (requires strict >, markerEpoch > currentEpoch) - if (transactionVersion >= 2) { - // TV2: same epoch should be rejected - assertThrows(classOf[InvalidProducerEpochException], - () => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch, - ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 1, - leaderEpoch = 0, transactionVersion = transactionVersion)) - } else { - // TV0/TV1: same epoch should be allowed - assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, epoch, - ControlRecordType.ABORT, mockTime.milliseconds(), coordinatorEpoch = 1, - leaderEpoch = 0, transactionVersion = transactionVersion)) - } - } - - @Test - def testTV2MarkerWithBumpedEpochSucceeds(): Unit = { - // Test that TV2 markers with bumped epochs (epoch + 1) are accepted (positive case) - // TV2 (KIP-890): Coordinator bumps epoch before writing marker, so markerEpoch = currentEpoch + 1 - val transactionVersion: Short = 2 - val producerId = 1L - val epoch = 5.toShort - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 
1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - // First, write some transactional records to establish the current epoch - val records = MemoryRecords.withTransactionalRecords( - Compression.NONE, producerId, epoch, 0, - new SimpleRecord("key".getBytes, "value".getBytes) - ) - log.appendAsLeader(records, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, transactionVersion) - - // TV2: Verify that bumped epoch (epoch + 1) is accepted - val bumpedEpoch = (epoch + 1).toShort - assertDoesNotThrow(() => LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId, bumpedEpoch, - ControlRecordType.COMMIT, mockTime.milliseconds(), coordinatorEpoch = 1, - leaderEpoch = 0, transactionVersion = TransactionVersion.TV_2.featureLevel())) - - // Verify the marker was successfully appended by checking producer state - val producerState = log.producerStateManager.activeProducers.get(producerId) - assertNotNull(producerState) - // After a commit marker, the producer epoch should be updated to the bumped epoch for TV2 - assertEquals(bumpedEpoch, producerState.producerEpoch) - } - - @Test - def testReplicationWithTVUnknownAllowed(): Unit = { - // Test that TV_UNKNOWN is allowed for replication (REPLICATION origin) and uses TV_0 validation - // This simulates the scenario where: - // 1. Leader receives WriteTxnMarkersRequest with transactionVersion=2 and validates with strict TV2 rules - // 2. Leader writes MemoryRecords to log (transactionVersion is not stored in MemoryRecords) - // 3. Follower receives MemoryRecords via replication (without transactionVersion metadata) - // 4. Follower uses TV_UNKNOWN which defaults to TV_0 validation (more permissive, safe because leader already validated) - - val producerId = 1L - val epoch = 5.toShort - val coordinatorEpoch = 1 - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - // Step 1: Write transactional records as leader to establish current epoch - val transactionalRecords = MemoryRecords.withTransactionalRecords( - Compression.NONE, producerId, epoch, 0, - new SimpleRecord("key".getBytes, "value".getBytes) - ) - log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_2.featureLevel()) - - // Step 2: Simulate leader writing TV2 marker with bumped epoch (epoch + 1) - // This is what happens at the leader when WriteTxnMarkersRequest is received - val bumpedEpoch = (epoch + 1).toShort - val leaderMarker = MemoryRecords.withEndTransactionMarker( - mockTime.milliseconds(), - producerId, - bumpedEpoch, - new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) - ) - // Leader validates with TV2 (strict: markerEpoch > currentEpoch) - log.appendAsLeader(leaderMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_2.featureLevel()) - - // Verify leader state - val leaderProducerState = log.producerStateManager.activeProducers.get(producerId) - assertNotNull(leaderProducerState) - assertEquals(bumpedEpoch, leaderProducerState.producerEpoch) - - // Step 3: Create a new log to simulate a follower - val followerLogDir = TestUtils.randomPartitionLogDir(tmpDir) - val followerLog = createLog(followerLogDir, logConfig) - - // Step 4: Follower replicates transactional records first - val followerTransactionalRecords = MemoryRecords.withTransactionalRecords( - 0L, - Compression.NONE, producerId, epoch, 0, - 0, - new 
SimpleRecord("key".getBytes, "value".getBytes) - ) - followerLog.appendAsFollower(followerTransactionalRecords, 0) - - // Step 5: Follower replicates the marker (appendAsFollower uses TV_UNKNOWN internally) - // This should succeed because TV_UNKNOWN is allowed for REPLICATION origin - // and defaults to TV_0 validation (markerEpoch >= currentEpoch), which is more permissive - // The marker should be at offset 1 (after the transactional record at offset 0) - val followerMarker = MemoryRecords.withEndTransactionMarker( - 1L, // offset after the transactional record - mockTime.milliseconds(), - 0, // partition leader epoch - producerId, - bumpedEpoch, - new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) - ) - - // This should not throw an exception - TV_UNKNOWN is allowed for replication - assertDoesNotThrow(() => followerLog.appendAsFollower(followerMarker, 0)) - - // Verify follower state matches leader state - val followerProducerState = followerLog.producerStateManager.activeProducers.get(producerId) - assertNotNull(followerProducerState) - assertEquals(bumpedEpoch, followerProducerState.producerEpoch) - assertEquals(coordinatorEpoch, followerProducerState.coordinatorEpoch) - - // Verify the marker was written to the follower log - assertEquals(2L, followerLog.logEndOffset) // 1 transactional record + 1 marker - } - - @Test - def testLeaderRejectsTVUnknownForTransactionMarker(): Unit = { - // Test that TV_UNKNOWN is rejected for COORDINATOR origin (leader writing transaction markers) - // TV_UNKNOWN is only allowed for REPLICATION origin (followers) - val producerId = 1L - val epoch = 5.toShort - val coordinatorEpoch = 1 - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - // Write transactional records as leader to establish current epoch - val transactionalRecords = MemoryRecords.withTransactionalRecords( - Compression.NONE, producerId, epoch, 0, - new SimpleRecord("key".getBytes, "value".getBytes) - ) - log.appendAsLeader(transactionalRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_2.featureLevel()) - - // Attempt to write a transaction marker with TV_UNKNOWN as COORDINATOR (leader) - // This should throw IllegalArgumentException because TV_UNKNOWN is not allowed for COORDINATOR origin - val marker = MemoryRecords.withEndTransactionMarker( - mockTime.milliseconds(), - producerId, - (epoch + 1).toShort, // bumped epoch for TV2 - new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch) - ) - - val exception = assertThrows(classOf[IllegalArgumentException], () => { - log.appendAsLeader(marker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_UNKNOWN) - }) - - assertTrue(exception.getMessage.contains("transactionVersion must be explicitly specified")) - assertTrue(exception.getMessage.contains("TV_UNKNOWN")) - assertTrue(exception.getMessage.contains("COORDINATOR")) - } - - @Test - def testLastStableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - val epoch = 0.toShort - val pid = 1L - val appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime) - - appendPid(5) - LogTestUtils.appendNonTransactionalAsLeader(log, 3) - assertEquals(8L, log.logEndOffset) - - log.roll() - assertEquals(2, log.logSegments.size) - appendPid(5) - - 
assertEquals(Optional.of(0L), log.firstUnstableOffset) - - log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(5L, LogStartOffsetIncrementReason.ClientRecordDeletion) - - // the first unstable offset should be lower bounded by the log start offset - assertEquals(Optional.of(5L), log.firstUnstableOffset) - } - - @Test - def testLastStableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - val epoch = 0.toShort - val pid = 1L - val appendPid = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime) - - appendPid(5) - LogTestUtils.appendNonTransactionalAsLeader(log, 3) - assertEquals(8L, log.logEndOffset) - - log.roll() - assertEquals(2, log.logSegments.size) - appendPid(5) - - assertEquals(Optional.of(0L), log.firstUnstableOffset) - - log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(8L, LogStartOffsetIncrementReason.ClientRecordDeletion) - log.updateHighWatermark(log.logEndOffset) - assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted") - assertEquals(1, log.logSegments.size) - - // the first unstable offset should be lower bounded by the log start offset - assertEquals(Optional.of(8L), log.firstUnstableOffset) - } - - @Test - def testAppendToTransactionIndexFailure(): Unit = { - val pid = 1L - val epoch = 0.toShort - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - val append = LogTestUtils.appendTransactionalAsLeader(log, pid, epoch, mockTime) - append(10) - - // Kind of a hack, but renaming the index to a directory ensures that the append - // to the index will fail. - log.activeSegment.txnIndex.renameTo(log.dir) - - // The append will be written to the log successfully, but the write to the index will fail - assertThrows( - classOf[KafkaStorageException], - () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), - coordinatorEpoch = 1, transactionVersion = TransactionVersion.TV_0.featureLevel())) - assertEquals(11L, log.logEndOffset) - assertEquals(0L, log.lastStableOffset) - - // Try the append a second time. The appended offset in the log should not increase - // because the log dir is marked as failed. Nor will there be a write to the transaction - // index. 
- assertThrows( - classOf[KafkaStorageException], - () => LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, mockTime.milliseconds(), - coordinatorEpoch = 1, transactionVersion = TransactionVersion.TV_0.featureLevel())) - assertEquals(11L, log.logEndOffset) - assertEquals(0L, log.lastStableOffset) - - // Even if the high watermark is updated, the first unstable offset does not move - log.updateHighWatermark(12L) - assertEquals(0L, log.lastStableOffset) - - assertThrows(classOf[KafkaStorageException], () => log.close()) - val reopenedLog = createLog(logDir, logConfig, lastShutdownClean = false) - assertEquals(11L, reopenedLog.logEndOffset) - assertEquals(1, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size) - reopenedLog.updateHighWatermark(12L) - assertEquals(Optional.empty, reopenedLog.firstUnstableOffset) - } - - @Test - def testOffsetSnapshot(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - // append a few records - appendAsFollower( - log, - MemoryRecords.withRecords( - Compression.NONE, - new SimpleRecord("a".getBytes), - new SimpleRecord("b".getBytes), - new SimpleRecord("c".getBytes) - ), - 5 - ) - - - log.updateHighWatermark(3L) - var offsets: LogOffsetSnapshot = log.fetchOffsetSnapshot - assertEquals(offsets.highWatermark.messageOffset, 3L) - assertFalse(offsets.highWatermark.messageOffsetOnly) - - offsets = log.fetchOffsetSnapshot - assertEquals(offsets.highWatermark.messageOffset, 3L) - assertFalse(offsets.highWatermark.messageOffsetOnly) - } - - @Test - def testLastStableOffsetWithMixedProducerData(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - // for convenience, both producers share the same epoch - val epoch = 5.toShort - - val pid1 = 137L - val seq1 = 0 - val pid2 = 983L - val seq2 = 0 - - // add some transactional records - val firstAppendInfo = log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid1, epoch, seq1, - new SimpleRecord("a".getBytes), - new SimpleRecord("b".getBytes), - new SimpleRecord("c".getBytes)), 0) - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - - // mix in some non-transactional data - log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, - new SimpleRecord("g".getBytes), - new SimpleRecord("h".getBytes), - new SimpleRecord("i".getBytes)), 0) - - // append data from a second transactional producer - val secondAppendInfo = log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid2, epoch, seq2, - new SimpleRecord("d".getBytes), - new SimpleRecord("e".getBytes), - new SimpleRecord("f".getBytes)), 0) - - // LSO should not have changed - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - - // now first producer's transaction is aborted - val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT, - mockTime.milliseconds(), transactionVersion = TransactionVersion.TV_0.featureLevel()) - log.updateHighWatermark(abortAppendInfo.lastOffset + 1) - - // LSO should now point to one less than the first offset of the second transaction - assertEquals(Optional.of(secondAppendInfo.firstOffset), log.firstUnstableOffset) - - // commit the second transaction - val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.COMMIT, - mockTime.milliseconds(), 
transactionVersion = TransactionVersion.TV_0.featureLevel()) - log.updateHighWatermark(commitAppendInfo.lastOffset + 1) - - // now there should be no first unstable offset - assertEquals(Optional.empty, log.firstUnstableOffset) - } - - @Test - def testAbortedTransactionSpanningMultipleSegments(): Unit = { - val pid = 137L - val epoch = 5.toShort - var seq = 0 - - val records = MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, - new SimpleRecord("a".getBytes), - new SimpleRecord("b".getBytes), - new SimpleRecord("c".getBytes)) - - val logConfig = LogTestUtils.createLogConfig(segmentBytes = records.sizeInBytes) - val log = createLog(logDir, logConfig) - - val firstAppendInfo = log.appendAsLeader(records, 0) - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - - // this write should spill to the second segment - seq = 3 - log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, - new SimpleRecord("d".getBytes), - new SimpleRecord("e".getBytes), - new SimpleRecord("f".getBytes)), 0) - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset) - - // now abort the transaction - val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, - mockTime.milliseconds(), transactionVersion = TransactionVersion.TV_0.featureLevel()) - log.updateHighWatermark(abortAppendInfo.lastOffset + 1) - assertEquals(Optional.empty, log.firstUnstableOffset) - - // now check that a fetch includes the aborted transaction - val fetchDataInfo = log.read(0L, 2048, FetchIsolation.TXN_COMMITTED, true) - - assertTrue(fetchDataInfo.abortedTransactions.isPresent) - assertEquals(1, fetchDataInfo.abortedTransactions.get.size) - assertEquals(new FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0), fetchDataInfo.abortedTransactions.get.get(0)) - } - - @Test - def testLoadPartitionDirWithNoSegmentsShouldNotThrow(): Unit = { - val dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 3)) - val logDir = new File(tmpDir, dirName) - logDir.mkdirs() - val logConfig = LogTestUtils.createLogConfig() - val log = createLog(logDir, logConfig) - assertEquals(1, log.numberOfSegments) - } - - @Test - def testSegmentDeletionWithHighWatermarkInitialization(): Unit = { - val logConfig = LogTestUtils.createLogConfig( - segmentBytes = 512, - segmentIndexBytes = 1000, - retentionMs = 999 - ) - val log = createLog(logDir, logConfig) - - val expiredTimestamp = mockTime.milliseconds() - 1000 - for (i <- 0 until 100) { - val records = TestUtils.singletonRecords(value = s"test$i".getBytes, timestamp = expiredTimestamp) - log.appendAsLeader(records, 0) - } - - val initialHighWatermark = log.updateHighWatermark(25L) - assertEquals(25L, initialHighWatermark) - - val initialNumSegments = log.numberOfSegments - assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted") - assertTrue(log.numberOfSegments < initialNumSegments) - assertTrue(log.logStartOffset <= initialHighWatermark) - } - - @Test - def testCannotDeleteSegmentsAtOrAboveHighWatermark(): Unit = { - val logConfig = LogTestUtils.createLogConfig( - segmentBytes = 512, - segmentIndexBytes = 1000, - retentionMs = 999 - ) - val log = createLog(logDir, logConfig) - - val expiredTimestamp = mockTime.milliseconds() - 1000 - for (i <- 0 until 100) { - val records = TestUtils.singletonRecords(value = s"test$i".getBytes, timestamp = 
-      log.appendAsLeader(records, 0)
-    }
-
-    // ensure we have at least a few segments so the test case is not trivial
-    assertTrue(log.numberOfSegments > 5)
-    assertEquals(0L, log.highWatermark)
-    assertEquals(0L, log.logStartOffset)
-    assertEquals(100L, log.logEndOffset)
-
-    for (hw <- 0 to 100) {
-      log.updateHighWatermark(hw)
-      assertEquals(hw, log.highWatermark)
-      log.deleteOldSegments()
-      assertTrue(log.logStartOffset <= hw)
-
-      // verify that all segments up to the high watermark have been deleted
-      log.logSegments.asScala.headOption.foreach { segment =>
-        assertTrue(segment.baseOffset <= hw)
-        assertTrue(segment.baseOffset >= log.logStartOffset)
-      }
-      log.logSegments.asScala.tail.foreach { segment =>
-        assertTrue(segment.baseOffset > hw)
-        assertTrue(segment.baseOffset >= log.logStartOffset)
-      }
-    }
-
-    assertEquals(100L, log.logStartOffset)
-    assertEquals(1, log.numberOfSegments)
-    assertEquals(0, log.activeSegment.size)
-  }
-
-  @Test
-  def testCannotIncrementLogStartOffsetPastHighWatermark(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(
-      segmentBytes = 512,
-      segmentIndexBytes = 1000,
-      retentionMs = 999
-    )
-    val log = createLog(logDir, logConfig)
-
-    for (i <- 0 until 100) {
-      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
-      log.appendAsLeader(records, 0)
-    }
-
-    log.updateHighWatermark(25L)
-    assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, LogStartOffsetIncrementReason.ClientRecordDeletion))
-  }
-
-  @Test
-  def testBackgroundDeletionWithIOException(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
-
-    // Delete the underlying directory to trigger a KafkaStorageException
-    val dir = log.dir
-    Utils.delete(dir)
-    Files.createFile(dir.toPath)
-
-    assertThrows(classOf[KafkaStorageException], () => {
-      log.delete()
-    })
-    assertTrue(log.logDirFailureChannel.hasOfflineLogDir(tmpDir.toString))
-  }
-
   /**
    * test renaming a log's dir without reinitialization, which is the case during topic deletion
    */
-  @Test
-  def testRenamingDirWithoutReinitialization(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
-
-    val newDir = TestUtils.randomPartitionLogDir(tmpDir)
-    assertTrue(newDir.exists())
-
-    log.renameDir(newDir.getName, false)
-    assertFalse(log.leaderEpochCache.nonEmpty)
-    assertTrue(log.partitionMetadataFile.isEmpty)
-    assertEquals(0, log.logEndOffset)
-
-    // verify that the background deletion can succeed
-    log.delete()
-    assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
-    assertFalse(newDir.exists())
-  }
 
   @Test
   def testMaybeUpdateHighWatermarkAsFollower(): Unit = {

From abc5b038ea911a1739990e345ef7906b1b9f0336 Mon Sep 17 00:00:00 2001
From: TaiJu Wu
Date: Sat, 21 Mar 2026 14:43:04 +0800
Subject: [PATCH 3/3] fix build

---
 .../scala/unit/kafka/log/UnifiedLogTest.scala | 80 +------------------
 .../storage/internals/log/UnifiedLogTest.java |  8 --
 2 files changed, 2 insertions(+), 86 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 2f4603ac9f9cf..03779e1015168 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -24,21 +24,18 @@ import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.internal._
-import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetResultHolder, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
@@ -46,7 +43,6 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doThrow, spy}
 
 import java.io._
-import java.nio.ByteBuffer
 import java.nio.file.Files
 import java.util
 import java.util.concurrent.ConcurrentHashMap
@@ -84,37 +80,6 @@ class UnifiedLogTest {
     }
   }
 
-
-  private def assertCachedFirstUnstableOffset(log: UnifiedLog, expectedOffset: Long): Unit = {
-    assertTrue(log.producerStateManager.firstUnstableOffset.isPresent)
-    val firstUnstableOffset = log.producerStateManager.firstUnstableOffset.get
-    assertEquals(expectedOffset, firstUnstableOffset.messageOffset)
-    assertFalse(firstUnstableOffset.messageOffsetOnly)
-    assertValidLogOffsetMetadata(log, firstUnstableOffset)
-  }
-
-  private def assertValidLogOffsetMetadata(log: UnifiedLog, offsetMetadata: LogOffsetMetadata): Unit = {
-    assertFalse(offsetMetadata.messageOffsetOnly)
-
-    val segmentBaseOffset = offsetMetadata.segmentBaseOffset
-    val segments = log.logSegments(segmentBaseOffset, segmentBaseOffset + 1)
-    assertFalse(segments.isEmpty)
-
-    val segment = segments.iterator().next()
-    assertEquals(segmentBaseOffset, segment.baseOffset)
-    assertTrue(offsetMetadata.relativePositionInSegment <= segment.size)
-
-    val readInfo = segment.read(offsetMetadata.messageOffset,
-      2048,
-      Optional.of(segment.size),
-      false)
-
-    if (offsetMetadata.relativePositionInSegment < segment.size)
-      assertEquals(offsetMetadata, readInfo.fetchOffsetMetadata)
-    else
-      assertNull(readInfo)
-  }
-
   /**
    * test renaming a log's dir without reinitialization, which is the case during topic deletion
    */
@@ -1011,47 +976,6 @@ class UnifiedLogTest {
     assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()), result)
   }
 
-  private def appendTransactionalToBuffer(buffer: ByteBuffer,
-                                          producerId: Long,
-                                          producerEpoch: Short,
-                                          leaderEpoch: Int = 0): (Long, Int) => Unit = {
-    var sequence = 0
-    (offset: Long, numRecords: Int) => {
-      val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME,
-        offset, mockTime.milliseconds(), producerId, producerEpoch, sequence, true, leaderEpoch)
-      for (seq <- sequence until sequence + numRecords) {
-        val record = new SimpleRecord(s"$seq".getBytes)
-        builder.append(record)
-      }
-
-      sequence += numRecords
-      builder.close()
-    }
-  }
-
-  private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer,
-                                         producerId: Long,
-                                         producerEpoch: Short,
-                                         offset: Long,
-                                         controlType: ControlRecordType,
-                                         coordinatorEpoch: Int = 0,
-                                         leaderEpoch: Int = 0): Unit = {
-    val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker)
-  }
-
-  private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = {
-    val builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, offset)
-    (0 until numRecords).foreach { seq =>
-      builder.append(new SimpleRecord(s"$seq".getBytes))
-    }
-    builder.close()
-  }
-
-  private def appendAsFollower(log: UnifiedLog, records: MemoryRecords, leaderEpoch: Int): Unit = {
-    records.batches.forEach(_.setPartitionLeaderEpoch(leaderEpoch))
-    log.appendAsFollower(records, leaderEpoch)
-  }
 
   private def createLog(dir: File,
                         config: LogConfig,

diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index 57feee817fd47..21ce813ddbbf8 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -5007,12 +5007,4 @@ private void appendNonTransactionalToBuffer(ByteBuffer buffer, long offset, int numRecords) {
         }
         builder.close();
     }
-
-    private void assertCachedFirstUnstableOffset(UnifiedLog log, long expectedOffset) throws IOException {
-        assertTrue(log.producerStateManager().firstUnstableOffset().isPresent());
-        LogOffsetMetadata firstUnstableOffset = log.producerStateManager().firstUnstableOffset().get();
-        assertEquals(expectedOffset, firstUnstableOffset.messageOffset);
-        assertFalse(firstUnstableOffset.messageOffsetOnly());
-        assertValidLogOffsetMetadata(log, firstUnstableOffset);
-    }
 }