@@ -137,7 +137,7 @@ private CompletableFuture<Records> readAll0(FetchContext context, long startOffs
             return CompletableFuture.completedFuture(null);
         }
         return fetch0(context, nextFetchOffset, endOffset, maxSize)
-            .thenApply(rst -> PooledMemoryRecords.of(rst, context.readOptions().pooledBuf()));
+            .thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf()));
     }

     private CompletableFuture<LinkedList<FetchResult>> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize) {
@@ -305,13 +305,15 @@ protected RecordBatchIterator<RecordBatch> batchIterator(long startOffset, long
     }

     public static class PooledMemoryRecords extends AbstractRecords implements PooledResource {
+        private final long logBaseOffset;
         private final ByteBuf pack;
         private final MemoryRecords memoryRecords;
         private final long lastOffset;
         private final boolean pooled;
         private boolean freed;

-        private PooledMemoryRecords(List<FetchResult> fetchResults, boolean pooled) {
+        private PooledMemoryRecords(long logBaseOffset, List<FetchResult> fetchResults, boolean pooled) {
+            this.logBaseOffset = logBaseOffset;
             this.pooled = pooled;
             long lastOffset = 0;
             int size = 0;
@@ -335,11 +337,11 @@ private PooledMemoryRecords(List<FetchResult> fetchResults, boolean pooled) {
             fetchResults.forEach(FetchResult::free);
             fetchResults.clear();
             this.memoryRecords = MemoryRecords.readableRecords(pack.nioBuffer());
-            this.lastOffset = lastOffset;
+            this.lastOffset = logBaseOffset + lastOffset;
         }

-        public static PooledMemoryRecords of(List<FetchResult> fetchResults, boolean pooled) {
-            return new PooledMemoryRecords(fetchResults, pooled);
+        public static PooledMemoryRecords of(long logBaseOffset, List<FetchResult> fetchResults, boolean pooled) {
+            return new PooledMemoryRecords(logBaseOffset, fetchResults, pooled);
         }

         @Override
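
Note on the change above: fetch results from the backing stream carry offsets relative to the fetched range, while Kafka's fetch path and transaction index work in absolute log offsets. Below is a minimal sketch of the translation this diff introduces, assuming the buffer packing, pooling, and MemoryRecords construction that the real PooledMemoryRecords performs are omitted; the class and variable names are illustrative, not the PR's.

    import java.util.List;

    // Illustrative stand-in for PooledMemoryRecords (assumption: buffer packing,
    // pooling, and MemoryRecords construction are left out).
    final class OffsetTranslationSketch {
        private final long logBaseOffset; // absolute log offset where the fetched range begins
        private final long lastOffset;    // exposed as an absolute log offset, as in this PR

        OffsetTranslationSketch(long logBaseOffset, List<Long> relativeLastOffsets) {
            this.logBaseOffset = logBaseOffset;
            long last = 0;
            for (long rel : relativeLastOffsets) {
                last = Math.max(last, rel); // accumulate the stream-relative last offset
            }
            this.lastOffset = this.logBaseOffset + last; // shift into log-offset space
        }

        long lastOffset() {
            return lastOffset;
        }

        public static void main(String[] args) {
            // A fetch whose range begins at log offset 2, with relative batch offsets 0..3:
            OffsetTranslationSketch r = new OffsetTranslationSketch(2L, List.of(0L, 1L, 3L));
            System.out.println(r.lastOffset()); // prints 5, not the stream-relative 3
        }
    }

Without the base-offset shift, a caller that uses lastOffset as an upper bound (for example, when collecting aborted transactions for a fetch) would stop short on any fetched range whose base offset is non-zero.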
@@ -660,6 +660,30 @@ class ElasticLogTest {
     assertThrows(classOf[KafkaException], () => log.roll())
   }

+  @Test
+  def testAbortTxn_withRoll(): Unit = {
+    var keyValues = Seq(KeyValue("a=", "1"))
+    appendRecords(kvsToRecords(keyValues), initialOffset = 0)
+    keyValues = Seq(KeyValue("a=", "2"))
+    appendRecords(kvsToRecords(keyValues), initialOffset = 1)
+    log.roll()
+    keyValues = Seq(KeyValue("a=", "3"))
+    appendRecords(kvsToRecords(keyValues), initialOffset = 2)
+    log.segments.activeSegment.updateTxnIndex(new CompletedTxn(1, 2, 2, true), 1)
+    keyValues = Seq(KeyValue("a=", "4"))
+    appendRecords(kvsToRecords(keyValues), initialOffset = 3)
+    keyValues = Seq(KeyValue("a=", "5"))
+    appendRecords(kvsToRecords(keyValues), initialOffset = 4)
+    log.segments.activeSegment.updateTxnIndex(new CompletedTxn(1, 4, 4, true), 4)
+    ReadHint.markReadAll()
+    val fetchDataInfo = log.read(2, 1024, true, log.logEndOffsetMetadata, true)
+    assertEquals(3L, fetchDataInfo.records.records.asScala.size)
+    val abortedTxns = fetchDataInfo.abortedTransactions.get.toArray
+    assertEquals(2, abortedTxns.size)
+    assertEquals(2L, abortedTxns(0).firstOffset)
+    assertEquals(4L, abortedTxns(1).firstOffset())
+  }
+
   private def createElasticLogWithActiveSegment(dir: File = logDir,
                                                 config: LogConfig,
                                                 // segments: LogSegments = new LogSegments(topicPartition),
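
A worked sketch of the offsets this test exercises, assuming the rolled segment stores records relative to its base offset (the numbers come from the test above; the relative-offset layout is an assumption about the stream backend):

    // Hypothetical arithmetic mirroring testAbortTxn_withRoll: after log.roll(),
    // the active segment's base offset is 2, and the read at log offset 2 returns
    // records 2..4, which are stream-relative 0..2.
    public final class AbortedTxnRangeSketch {
        public static void main(String[] args) {
            long segmentBaseOffset = 2L;   // log.roll() happened after offsets 0 and 1
            long relativeLastFetched = 2L; // records 2, 3, 4 are relative 0, 1, 2
            long preFixUpperBound = relativeLastFetched;                      // 2
            long postFixUpperBound = segmentBaseOffset + relativeLastFetched; // 4
            // Only the absolute bound reaches the aborted transaction whose
            // firstOffset is 4, which is why the test expects two entries.
            System.out.println(preFixUpperBound + " -> " + postFixUpperBound);
        }
    }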