From eaddf121a31e7eb02fa68e818b707b2f889c7dba Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Thu, 1 Feb 2024 15:45:15 +0800
Subject: [PATCH] fix(log): keep log the same pattern as kafka

Signed-off-by: Robin Han
---
 .../streamaspect/ElasticLogFileRecords.java  | 33 ++++----
 .../log/streamaspect/ElasticLogSegment.scala |  3 +-
 .../log/streamaspect/ElasticUnifiedLog.scala | 80 +------------------
 .../streamaspect/ElasticLogSegmentTest.scala |  9 ++-
 .../streamaspect/ElasticUnifiedLogTest.scala |  9 ++-
 5 files changed, 32 insertions(+), 102 deletions(-)

diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
index fe2bb2c7da..1d8de941bd 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
@@ -24,7 +24,6 @@
 import com.automq.stream.s3.trace.TraceUtils;
 import com.automq.stream.utils.FutureUtil;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import com.automq.stream.api.FetchResult;
 import com.automq.stream.api.RecordBatchWithContext;
@@ -34,7 +33,6 @@
 import org.apache.kafka.common.network.TransferableChannel;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.ConvertedRecords;
-import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.LogInputStream;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -109,6 +107,9 @@ public long appendedOffset() {
     }
 
     public CompletableFuture read(long startOffset, long maxOffset, int maxSize) {
+        if (startOffset >= maxOffset) {
+            return CompletableFuture.completedFuture(MemoryRecords.EMPTY);
+        }
         if (ReadHint.isReadAll()) {
             ReadOptions readOptions = ReadOptions.builder().fastRead(ReadHint.isFastRead()).pooledBuf(true).build();
             FetchContext fetchContext = ContextUtils.creaetFetchContext();
@@ -134,7 +135,7 @@ private CompletableFuture readAll0(FetchContext context, long startOffs
         long nextFetchOffset = startOffset - baseOffset;
         long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset;
         if (nextFetchOffset >= endOffset) {
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.completedFuture(MemoryRecords.EMPTY);
         }
         return fetch0(context, nextFetchOffset, endOffset, maxSize)
                 .thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf()));
@@ -512,27 +513,25 @@ private void ensureAllLoaded() throws IOException {
         if (sizeInBytes != -1) {
             return;
         }
-        Records records = null;
+        Records records;
         try {
             records = elasticLogFileRecords.readAll0(FetchContext.DEFAULT, startOffset, maxOffset, fetchSize).get();
         } catch (Throwable t) {
             throw new IOException(FutureUtil.cause(t));
         }
-        sizeInBytes = 0;
-        CompositeByteBuf allRecordsBuf = Unpooled.compositeBuffer(Integer.MAX_VALUE);
-        RecordBatch lastBatch = null;
-        for (RecordBatch batch : records.batches()) {
-            sizeInBytes += batch.sizeInBytes();
-            ByteBuffer buffer = ((DefaultRecordBatch) batch).buffer().duplicate();
-            allRecordsBuf.addComponent(true, Unpooled.wrappedBuffer(buffer));
-            lastBatch = batch;
-        }
-        if (lastBatch != null) {
-            lastOffset = lastBatch.lastOffset() + 1;
-        } else {
+        sizeInBytes = records.sizeInBytes();
+        if (records instanceof PooledMemoryRecords) {
+            memoryRecords = MemoryRecords.readableRecords(((PooledMemoryRecords) records).pack.nioBuffer());
+            lastOffset = ((PooledMemoryRecords) records).lastOffset();
+        } else if (records instanceof MemoryRecords) {
+            memoryRecords = (MemoryRecords) records;
             lastOffset = startOffset;
+            for (RecordBatch batch: records.batches()) {
+                lastOffset = batch.lastOffset() + 1;
+            }
+        } else {
+            throw new IllegalArgumentException("unknown records type " + records.getClass());
         }
-        memoryRecords = MemoryRecords.readableRecords(allRecordsBuf.nioBuffer());
     }
 
 }
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala
index 910a7dfe9f..c44ae5660f 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala
@@ -173,6 +173,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta,
            maxPosition: Long = size,
            maxOffset: Long = Long.MaxValue,
            minOneMessage: Boolean = false): CompletableFuture[FetchDataInfo] = {
+    // TODO: isolate the log clean read to another method
     if (maxSize < 0)
       return CompletableFuture.failedFuture(new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log"))
     // Note that relativePositionInSegment here is a fake value. There are no 'position' in elastic streams.
@@ -184,7 +185,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta,
     // 'minOneMessage' is also not used because we always read at least one message ('maxSize' is just a hint in ES SDK).
     _log.read(startOffset, maxOffset, maxSize)
       .thenApply(records => {
-        if (records == null) {
+        if (ReadHint.isReadAll() && records.sizeInBytes() == 0) {
           // After topic compact, the read request might be out of range. Segment should return null and log will retry read next segment.
           null
         } else {
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
index 480f50220c..d8df9e6d41 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
@@ -23,16 +23,14 @@ import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
-import org.apache.kafka.common.utils.{ThreadUtils, Time, Utils}
+import org.apache.kafka.common.record.{MemoryRecords, RecordVersion}
+import org.apache.kafka.common.utils.ThreadUtils
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{CompletableFuture, Executors}
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 class ElasticUnifiedLog(_logStartOffset: Long,
@@ -184,52 +182,6 @@ object ElasticUnifiedLog extends Logging {
   private val MaxCheckpointIntervalBytes = 50 * 1024 * 1024
   private val MinCheckpointIntervalMs = 10 * 1000
 
-  def rebuildProducerState(producerStateManager: ProducerStateManager,
-                           segments: LogSegments,
-                           logStartOffset: Long,
-                           lastOffset: Long,
-                           time: Time,
-                           reloadFromCleanShutdown: Boolean,
-                           logPrefix: String): Unit = {
-    val offsetsToSnapshot = {
-      if (segments.nonEmpty) {
-        val lastSegmentBaseOffset = segments.lastSegment.get.baseOffset
-        val nextLatestSegmentBaseOffset = segments.lowerSegment(lastSegmentBaseOffset).map(_.baseOffset)
-        Seq(nextLatestSegmentBaseOffset, Some(lastSegmentBaseOffset), Some(lastOffset))
-      } else {
-        Seq(Some(lastOffset))
-      }
-    }
-
-    info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
-    val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
-    val producerStateLoadStart = time.milliseconds()
-    producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
-    val segmentRecoveryStart = time.milliseconds()
-
-    if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
-      segments.values(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
-        val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
-        producerStateManager.updateMapEndOffset(startOffset)
-
-        if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
-          producerStateManager.takeSnapshot()
-
-        val maxPosition = segment.size
-
-        val fetchDataInfo = segment.read(startOffset,
-          maxSize = Int.MaxValue,
-          maxPosition = maxPosition)
-        if (fetchDataInfo != null)
-          loadProducersFromRecords(producerStateManager, fetchDataInfo.records)
-      }
-    }
-    producerStateManager.updateMapEndOffset(lastOffset)
-    producerStateManager.takeSnapshot()
-    info(s"${logPrefix}Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
-      s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
-  }
-
   /**
    * If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
    * Otherwise, the message format is considered incompatible and return None.
@@ -251,32 +203,4 @@ object ElasticUnifiedLog extends Logging {
       Some(newLeaderEpochFileCache())
     }
   }
-
-  private def loadProducersFromRecords(producerStateManager: ProducerStateManager, records: Records): Unit = {
-    val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
-    val completedTxns = ListBuffer.empty[CompletedTxn]
-    records.batches.forEach { batch =>
-      if (batch.hasProducerId) {
-        val maybeCompletedTxn = updateProducers(
-          producerStateManager,
-          batch,
-          loadedProducers,
-          firstOffsetMetadata = None,
-          origin = AppendOrigin.Replication)
-        maybeCompletedTxn.foreach(completedTxns += _)
-      }
-    }
-    loadedProducers.values.foreach(producerStateManager.update)
-    completedTxns.foreach(producerStateManager.completeTxn)
-  }
-
-  private def updateProducers(producerStateManager: ProducerStateManager,
-                              batch: RecordBatch,
-                              producers: mutable.Map[Long, ProducerAppendInfo],
-                              firstOffsetMetadata: Option[LogOffsetMetadata],
-                              origin: AppendOrigin): Option[CompletedTxn] = {
-    val producerId = batch.producerId
-    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
-    appendInfo.append(batch, firstOffsetMetadata)
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala
index 842975859d..a4d6af6fd9 100644
--- a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
 
 import kafka.log.streamaspect.client.Context
@@ -86,6 +86,7 @@ class ElasticLogSegmentTest {
 
   @BeforeEach
   def setup(): Unit = {
+    ReadHint.markReadAll()
     segments.clear()
     logDir = TestUtils.tempDir()
     Context.enableTestMode()
@@ -107,7 +108,7 @@ class ElasticLogSegmentTest {
   def testReadOnEmptySegment(): Unit = {
     val seg = getOrCreateSegment(40)
     val read = seg.read(startOffset = 40, maxSize = 300)
-    assertFalse(read.records.records().iterator().hasNext)
+    assertNull(read, "Read beyond the last offset in the segment should be null")
   }
 
   /**
@@ -131,8 +132,8 @@ class ElasticLogSegmentTest {
     val seg = getOrCreateSegment(40)
     val ms = records(40, "hello", "there")
     seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, ms)
-    val read = seg.read(startOffset = 52, maxSize = 200).records
-    assertFalse(read.records().iterator().hasNext, "Read beyond the last offset in the segment should give no data")
+    val read = seg.read(startOffset = 52, maxSize = 200)
+    assertNull(read, "Read beyond the last offset in the segment should give null")
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala
index 418063955e..0036470837 100755
--- a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala
@@ -61,6 +61,7 @@ class ElasticUnifiedLogTest {
 
   @BeforeEach
   def setUp(): Unit = {
+    ReadHint.markReadAll()
     val props = TestUtils.createSimpleEsBrokerConfig()
     config = KafkaConfig.fromProps(props)
     Context.enableTestMode()
@@ -2624,8 +2625,12 @@
       minOneMessage = false)
 
     // We do not check relativePositionInSegment because it is a fake value.
-    assertEquals(offsetMetadata.segmentBaseOffset, readInfo.fetchOffsetMetadata.segmentBaseOffset)
-    assertEquals(offsetMetadata.messageOffset, readInfo.fetchOffsetMetadata.messageOffset)
+    if (offsetMetadata.relativePositionInSegment < segment.size) {
+      assertEquals(offsetMetadata.segmentBaseOffset, readInfo.fetchOffsetMetadata.segmentBaseOffset)
+      assertEquals(offsetMetadata.messageOffset, readInfo.fetchOffsetMetadata.messageOffset)
+    } else {
+      assertNull(readInfo)
+    }
   }
 
   @Test