Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package kafka.log

import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}

import java.io.{File, IOException}
import java.nio.file.Files
import java.text.NumberFormat
Expand All @@ -31,7 +33,6 @@ import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.log.streamaspect.ElasticLogManager
import kafka.log.streamaspect.ElasticLogFileRecords.BatchIteratorRecordsAdaptor

import java.util.concurrent.{CompletableFuture, ExecutionException}
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -435,9 +436,11 @@ class LocalLog(@volatile protected var _dir: File,
if (fetchDataInfo != null) {
if (includeAbortedTxns) {
// AutoMQ for Kafka inject start
val upperBoundOpt = fetchDataInfo.records match {
case adaptor: BatchIteratorRecordsAdaptor =>
Some(adaptor.lastOffset())
val upperBoundOpt = fetchDataInfo.records match {
case records: PooledMemoryRecords =>
Some(records.lastOffset())
case adapter: BatchIteratorRecordsAdaptor =>
Some(adapter.lastOffset())
case _ =>
None
}
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.log.streamaspect
import com.automq.stream.api.{Client, CreateStreamOptions, KeyValue, OpenStreamOptions}
import io.netty.buffer.Unpooled
import kafka.log._
import kafka.log.streamaspect.ElasticLogFileRecords.BatchIteratorRecordsAdaptor
import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil}
import kafka.server.checkpoints.LeaderEpochCheckpointFile
import kafka.server.epoch.EpochEntry
Expand Down Expand Up @@ -345,8 +345,10 @@ class ElasticLog(val metaStream: MetaStream,
} else {
if (includeAbortedTxns) {
val upperBoundOpt = fetchDataInfo.records match {
case adaptor: BatchIteratorRecordsAdaptor =>
Some(adaptor.lastOffset())
case records: PooledMemoryRecords =>
Some(records.lastOffset())
case adapter: BatchIteratorRecordsAdaptor =>
Some(adapter.lastOffset())
case _ =>
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public long appendedOffset() {
}

public CompletableFuture<Records> read(long startOffset, long maxOffset, int maxSize) {
if (ReadManualReleaseHint.isMarked()) {
if (ReadAllHint.isMarked()) {
return readAll0(startOffset, maxOffset, maxSize);
} else {
return CompletableFuture.completedFuture(new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize));
Expand Down Expand Up @@ -276,16 +276,20 @@ protected RecordBatchIterator<RecordBatch> batchIterator(long startOffset, long
public static class PooledMemoryRecords extends AbstractRecords implements PooledResource {
private final List<FetchResult> fetchResults;
private final MemoryRecords memoryRecords;
private final long lastOffset;

private PooledMemoryRecords(List<FetchResult> fetchResults) {
this.fetchResults = fetchResults;
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
long lastOffset = 0;
for (FetchResult fetchResult : fetchResults) {
for (RecordBatchWithContext recordBatchWithContext : fetchResult.recordBatchList()) {
compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(recordBatchWithContext.rawPayload()));
lastOffset = recordBatchWithContext.lastOffset();
}
}
this.memoryRecords = MemoryRecords.readableRecords(compositeByteBuf.nioBuffer());
this.lastOffset = lastOffset;
}

public static PooledMemoryRecords of(List<FetchResult> fetchResults) {
Expand Down Expand Up @@ -322,6 +326,10 @@ public void release() {
fetchResults.forEach(FetchResult::free);
fetchResults.clear();
}

public long lastOffset() {
return lastOffset;
}
}

static class StreamSegmentInputStream implements LogInputStream<RecordBatch> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@

import java.util.concurrent.atomic.AtomicBoolean;

public class ReadManualReleaseHint {
public class ReadAllHint {

public static final FastThreadLocal<AtomicBoolean> MANUAL_RELEASE = new FastThreadLocal<AtomicBoolean>() {
public static final FastThreadLocal<AtomicBoolean> HINT = new FastThreadLocal<AtomicBoolean>() {
@Override
protected AtomicBoolean initialValue() {
return new AtomicBoolean(false);
}
};

public static boolean isMarked() {
return MANUAL_RELEASE.get().get();
return HINT.get().get();
}

public static void mark() {
MANUAL_RELEASE.get().set(true);
HINT.get().set(true);
}

public static void reset() {
MANUAL_RELEASE.get().set(false);
HINT.get().set(false);
}

}
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka.server

import kafka.log.streamaspect.ReadManualReleaseHint
import kafka.log.streamaspect.ReadAllHint
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicIdPartition
Expand Down Expand Up @@ -163,14 +163,14 @@ class DelayedFetch(
}

// AutoMQ for Kafka inject start
ReadManualReleaseHint.mark()
val logReadResults = replicaManager.readFromLocalLog(
ReadAllHint.mark()
val logReadResults = replicaManager.readAsyncFromLocalLog(
params,
fetchInfos,
quota,
readFromPurgatory = true
)
ReadManualReleaseHint.reset()
ReadAllHint.reset()
// AutoMQ for Kafka inject end

val fetchPartitionData = logReadResults.map { case (tp, result) =>
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.controller.ReplicaAssignment
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.AppendOrigin
import kafka.log.streamaspect.{ElasticLogManager, ReadManualReleaseHint}
import kafka.log.streamaspect.{ElasticLogManager, ReadAllHint}
import kafka.message.ZStdCompressionCodec
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsUtil}
import kafka.network.RequestChannel
Expand Down Expand Up @@ -1093,9 +1093,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// The fetching is done is a separate thread pool to avoid blocking io thread.
fetchingExecutors.submit(new Runnable {
override def run(): Unit = {
ReadManualReleaseHint.mark()
ReadAllHint.mark()
doFetchingRecords()
ReadManualReleaseHint.reset()
ReadAllHint.reset()
}
})
} else {
Expand Down