From 5537c8146f46a583c957098d3436c51e922fc91e Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 1 Feb 2024 12:42:16 +0800 Subject: [PATCH] fix(issues754): fix consume aborted txn Signed-off-by: Robin Han --- .../streamaspect/ElasticLogFileRecords.java | 12 ++++++---- .../log/streamaspect/ElasticLogTest.scala | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java index da2acfe537..fe2bb2c7da 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java @@ -137,7 +137,7 @@ private CompletableFuture readAll0(FetchContext context, long startOffs return CompletableFuture.completedFuture(null); } return fetch0(context, nextFetchOffset, endOffset, maxSize) - .thenApply(rst -> PooledMemoryRecords.of(rst, context.readOptions().pooledBuf())); + .thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf())); } private CompletableFuture> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize) { @@ -305,13 +305,15 @@ protected RecordBatchIterator batchIterator(long startOffset, long } public static class PooledMemoryRecords extends AbstractRecords implements PooledResource { + private final long logBaseOffset; private final ByteBuf pack; private final MemoryRecords memoryRecords; private final long lastOffset; private final boolean pooled; private boolean freed; - private PooledMemoryRecords(List fetchResults, boolean pooled) { + private PooledMemoryRecords(long logBaseOffset, List fetchResults, boolean pooled) { + this.logBaseOffset = logBaseOffset; this.pooled = pooled; long lastOffset = 0; int size = 0; @@ -335,11 +337,11 @@ private PooledMemoryRecords(List fetchResults, boolean pooled) { fetchResults.forEach(FetchResult::free); fetchResults.clear(); this.memoryRecords = MemoryRecords.readableRecords(pack.nioBuffer()); - this.lastOffset = lastOffset; + this.lastOffset = logBaseOffset + lastOffset; } - public static PooledMemoryRecords of(List fetchResults, boolean pooled) { - return new PooledMemoryRecords(fetchResults, pooled); + public static PooledMemoryRecords of(long logBaseOffset, List fetchResults, boolean pooled) { + return new PooledMemoryRecords(logBaseOffset, fetchResults, pooled); } @Override diff --git a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogTest.scala index b7c1329209..7dbb180b1d 100644 --- a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogTest.scala @@ -660,6 +660,30 @@ class ElasticLogTest { assertThrows(classOf[KafkaException], () => log.roll()) } + @Test + def testAbortTxn_withRoll(): Unit = { + var keyValues = Seq(KeyValue("a=", "1")) + appendRecords(kvsToRecords(keyValues), initialOffset = 0) + keyValues = Seq(KeyValue("a=", "2")) + appendRecords(kvsToRecords(keyValues), initialOffset = 1) + log.roll() + keyValues = Seq(KeyValue("a=", "3")) + appendRecords(kvsToRecords(keyValues), initialOffset = 2) + log.segments.activeSegment.updateTxnIndex(new CompletedTxn(1, 2, 2, true), 1) + keyValues = Seq(KeyValue("a=", "4")) + appendRecords(kvsToRecords(keyValues), initialOffset = 3) + keyValues = Seq(KeyValue("a=", "5")) + appendRecords(kvsToRecords(keyValues), initialOffset = 4) + log.segments.activeSegment.updateTxnIndex(new CompletedTxn(1, 4, 4, true), 4) + ReadHint.markReadAll() + val fetchDataInfo = log.read(2, 1024, true, log.logEndOffsetMetadata, true) + assertEquals(3L, fetchDataInfo.records.records.asScala.size) + val abortedTxns = fetchDataInfo.abortedTransactions.get.toArray + assertEquals(2, abortedTxns.size) + assertEquals(2L, abortedTxns(0).firstOffset) + assertEquals(4L, abortedTxns(1).firstOffset()) + } + private def createElasticLogWithActiveSegment(dir: File = logDir, config: LogConfig, // segments: LogSegments = new LogSegments(topicPartition),