@@ -24,7 +24,6 @@
import com.automq.stream.s3.trace.TraceUtils;
import com.automq.stream.utils.FutureUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import com.automq.stream.api.FetchResult;
import com.automq.stream.api.RecordBatchWithContext;
@@ -34,7 +33,6 @@
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.LogInputStream;
import org.apache.kafka.common.record.MemoryRecords;
@@ -109,6 +107,9 @@ public long appendedOffset() {
}

public CompletableFuture<Records> read(long startOffset, long maxOffset, int maxSize) {
if (startOffset >= maxOffset) {
return CompletableFuture.completedFuture(MemoryRecords.EMPTY);
}
if (ReadHint.isReadAll()) {
ReadOptions readOptions = ReadOptions.builder().fastRead(ReadHint.isFastRead()).pooledBuf(true).build();
FetchContext fetchContext = ContextUtils.creaetFetchContext();
@@ -134,7 +135,7 @@ private CompletableFuture<Records> readAll0(FetchContext context, long startOffs
long nextFetchOffset = startOffset - baseOffset;
long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset;
if (nextFetchOffset >= endOffset) {
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(MemoryRecords.EMPTY);
}
return fetch0(context, nextFetchOffset, endOffset, maxSize)
.thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf()));
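
For context, a minimal caller-side sketch of the empty-read contract introduced above: read() (and readAll0() after clamping to the committed offset) now resolves to MemoryRecords.EMPTY instead of null when the requested range is empty, so callers can branch on sizeInBytes() rather than null-checking. The handler methods below are hypothetical, not part of this change.

    // Sketch only: consuming the empty-read contract of read()/readAll0().
    elasticLogFileRecords.read(startOffset, maxOffset, maxSize)
        .thenAccept(records -> {
            if (records.sizeInBytes() == 0) {
                handleEmptyRange();      // hypothetical: nothing stored in [startOffset, maxOffset)
            } else {
                handleRecords(records);  // hypothetical: hand the records to the fetch path
            }
        });
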
@@ -512,27 +513,25 @@ private void ensureAllLoaded() throws IOException {
if (sizeInBytes != -1) {
return;
}
Records records = null;
Records records;
try {
records = elasticLogFileRecords.readAll0(FetchContext.DEFAULT, startOffset, maxOffset, fetchSize).get();
} catch (Throwable t) {
throw new IOException(FutureUtil.cause(t));
}
sizeInBytes = 0;
CompositeByteBuf allRecordsBuf = Unpooled.compositeBuffer(Integer.MAX_VALUE);
RecordBatch lastBatch = null;
for (RecordBatch batch : records.batches()) {
sizeInBytes += batch.sizeInBytes();
ByteBuffer buffer = ((DefaultRecordBatch) batch).buffer().duplicate();
allRecordsBuf.addComponent(true, Unpooled.wrappedBuffer(buffer));
lastBatch = batch;
}
if (lastBatch != null) {
lastOffset = lastBatch.lastOffset() + 1;
} else {
sizeInBytes = records.sizeInBytes();
if (records instanceof PooledMemoryRecords) {
memoryRecords = MemoryRecords.readableRecords(((PooledMemoryRecords) records).pack.nioBuffer());
lastOffset = ((PooledMemoryRecords) records).lastOffset();
} else if (records instanceof MemoryRecords) {
memoryRecords = (MemoryRecords) records;
lastOffset = startOffset;
for (RecordBatch batch: records.batches()) {
lastOffset = batch.lastOffset() + 1;
}
} else {
throw new IllegalArgumentException("unknown records type " + records.getClass());
}
memoryRecords = MemoryRecords.readableRecords(allRecordsBuf.nioBuffer());
}
}
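
A small illustrative fragment (standalone values, not the method itself) of how the MemoryRecords branch above resolves lastOffset: when the result is MemoryRecords.EMPTY, the batch loop never runs and lastOffset stays at startOffset; otherwise it ends one past the last batch read.

    // Illustrative only: lastOffset resolution mirroring the else-if branch above.
    long startOffset = 40L;                             // hypothetical slice start
    MemoryRecords memoryRecords = MemoryRecords.EMPTY;  // e.g. result of the empty-range early return
    long lastOffset = startOffset;                      // default when there are no batches
    for (RecordBatch batch : memoryRecords.batches()) {
        lastOffset = batch.lastOffset() + 1;            // one past the last record read
    }
    // Here lastOffset stays 40L because no batches were iterated.
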

@@ -173,6 +173,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta,
maxPosition: Long = size,
maxOffset: Long = Long.MaxValue,
minOneMessage: Boolean = false): CompletableFuture[FetchDataInfo] = {
// TODO: isolate the log clean read to another method
if (maxSize < 0)
return CompletableFuture.failedFuture(new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log"))
// Note that relativePositionInSegment here is a fake value. There are no 'position' in elastic streams.
@@ -184,7 +185,7 @@
// 'minOneMessage' is also not used because we always read at least one message ('maxSize' is just a hint in ES SDK).
_log.read(startOffset, maxOffset, maxSize)
.thenApply(records => {
if (records == null) {
if (ReadHint.isReadAll() && records.sizeInBytes() == 0) {
// After topic compact, the read request might be out of range. Segment should return null and log will retry read next segment.
null
} else {
80 changes: 2 additions & 78 deletions core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
@@ -23,16 +23,14 @@ import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
import org.apache.kafka.common.utils.{ThreadUtils, Time, Utils}
import org.apache.kafka.common.record.{MemoryRecords, RecordVersion}
import org.apache.kafka.common.utils.ThreadUtils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion

import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{CompletableFuture, Executors}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

class ElasticUnifiedLog(_logStartOffset: Long,
@@ -184,52 +182,6 @@ object ElasticUnifiedLog extends Logging {
private val MaxCheckpointIntervalBytes = 50 * 1024 * 1024
private val MinCheckpointIntervalMs = 10 * 1000

def rebuildProducerState(producerStateManager: ProducerStateManager,
segments: LogSegments,
logStartOffset: Long,
lastOffset: Long,
time: Time,
reloadFromCleanShutdown: Boolean,
logPrefix: String): Unit = {
val offsetsToSnapshot = {
if (segments.nonEmpty) {
val lastSegmentBaseOffset = segments.lastSegment.get.baseOffset
val nextLatestSegmentBaseOffset = segments.lowerSegment(lastSegmentBaseOffset).map(_.baseOffset)
Seq(nextLatestSegmentBaseOffset, Some(lastSegmentBaseOffset), Some(lastOffset))
} else {
Seq(Some(lastOffset))
}
}

info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
val producerStateLoadStart = time.milliseconds()
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
val segmentRecoveryStart = time.milliseconds()

if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
segments.values(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
producerStateManager.updateMapEndOffset(startOffset)

if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
producerStateManager.takeSnapshot()

val maxPosition = segment.size

val fetchDataInfo = segment.read(startOffset,
maxSize = Int.MaxValue,
maxPosition = maxPosition)
if (fetchDataInfo != null)
loadProducersFromRecords(producerStateManager, fetchDataInfo.records)
}
}
producerStateManager.updateMapEndOffset(lastOffset)
producerStateManager.takeSnapshot()
info(s"${logPrefix}Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
}

/**
* If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
* Otherwise, the message format is considered incompatible and return None.
@@ -251,32 +203,4 @@
Some(newLeaderEpochFileCache())
}
}

private def loadProducersFromRecords(producerStateManager: ProducerStateManager, records: Records): Unit = {
val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
records.batches.forEach { batch =>
if (batch.hasProducerId) {
val maybeCompletedTxn = updateProducers(
producerStateManager,
batch,
loadedProducers,
firstOffsetMetadata = None,
origin = AppendOrigin.Replication)
maybeCompletedTxn.foreach(completedTxns += _)
}
}
loadedProducers.values.foreach(producerStateManager.update)
completedTxns.foreach(producerStateManager.completeTxn)
}

private def updateProducers(producerStateManager: ProducerStateManager,
batch: RecordBatch,
producers: mutable.Map[Long, ProducerAppendInfo],
firstOffsetMetadata: Option[LogOffsetMetadata],
origin: AppendOrigin): Option[CompletedTxn] = {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
appendInfo.append(batch, firstOffsetMetadata)
}
}
@@ -26,7 +26,7 @@ import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
import kafka.log.streamaspect.client.Context

@@ -86,6 +86,7 @@ class ElasticLogSegmentTest {

@BeforeEach
def setup(): Unit = {
ReadHint.markReadAll()
segments.clear()
logDir = TestUtils.tempDir()
Context.enableTestMode()
@@ -107,7 +108,7 @@
def testReadOnEmptySegment(): Unit = {
val seg = getOrCreateSegment(40)
val read = seg.read(startOffset = 40, maxSize = 300)
assertFalse(read.records.records().iterator().hasNext)
assertNull(read, "Read beyond the last offset in the segment should be null")
}

/**
@@ -131,8 +132,8 @@
val seg = getOrCreateSegment(40)
val ms = records(40, "hello", "there")
seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, ms)
val read = seg.read(startOffset = 52, maxSize = 200).records
assertFalse(read.records().iterator().hasNext, "Read beyond the last offset in the segment should give no data")
val read = seg.read(startOffset = 52, maxSize = 200)
assertNull(read, "Read beyond the last offset in the segment should give null")
}

/**
@@ -61,6 +61,7 @@ class ElasticUnifiedLogTest {

@BeforeEach
def setUp(): Unit = {
ReadHint.markReadAll()
val props = TestUtils.createSimpleEsBrokerConfig()
config = KafkaConfig.fromProps(props)
Context.enableTestMode()
@@ -2624,8 +2625,12 @@ class ElasticUnifiedLogTest {
minOneMessage = false)

// We do not check relativePositionInSegment because it is a fake value.
assertEquals(offsetMetadata.segmentBaseOffset, readInfo.fetchOffsetMetadata.segmentBaseOffset)
assertEquals(offsetMetadata.messageOffset, readInfo.fetchOffsetMetadata.messageOffset)
if (offsetMetadata.relativePositionInSegment < segment.size) {
assertEquals(offsetMetadata.segmentBaseOffset, readInfo.fetchOffsetMetadata.segmentBaseOffset)
assertEquals(offsetMetadata.messageOffset, readInfo.fetchOffsetMetadata.messageOffset)
} else {
assertNull(readInfo)
}
}

@Test