diff --git a/build.gradle b/build.gradle index ca885fdfc9..99b4a70623 100644 --- a/build.gradle +++ b/build.gradle @@ -225,7 +225,9 @@ if (file('.git').exists()) { 'licenses/*', '**/generated/**', 'clients/src/test/resources/serializedData/*', - '.github/**' + '.github/**', + 'CODE_OF_CONDUCT.md', + 'CONTRIBUTING_GUIDE.md', ]) } } else { diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 24b0e158e9..7f59be47e3 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -566,10 +566,10 @@ class ProducerStateManager( @volatile private var oldestTxnLastTimestamp: Long = -1L // ongoing transactions sorted by the first offset of the transaction - private val ongoingTxns = new util.TreeMap[Long, TxnMetadata] + protected val ongoingTxns = new util.TreeMap[Long, TxnMetadata] // completed transactions whose markers are at offsets above the high watermark - private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata] + protected val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata] @threadsafe def hasLateTransaction(currentTimeMs: Long): Boolean = { @@ -608,7 +608,7 @@ class ProducerStateManager( case Some(prev) => if (!baseOffsets.contains(key)) { // this snapshot is now the largest stray snapshot. - prev.deleteIfExists() + deleteSnapshotFile(prev) ss.remove(prev.offset) latestStraySnapshot = Some(snapshot) } @@ -623,7 +623,7 @@ class ProducerStateManager( // delete the largestStraySnapshot. for (strayOffset <- latestStraySnapshot.map(_.offset); maxOffset <- maxSegmentBaseOffset) { if (strayOffset < maxOffset) { - Option(ss.remove(strayOffset)).foreach(_.deleteIfExists()) + Option(ss.remove(strayOffset)).foreach(s => deleteSnapshotFile(s)) } } @@ -663,6 +663,10 @@ class ProducerStateManager( */ private[log] def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset) + def deleteSnapshotFile(snapshotFile: SnapshotFile): Unit = { + snapshotFile.deleteIfExists() + } + /** * Returns the last offset of this map */ @@ -809,7 +813,9 @@ class ProducerStateManager( val snapshotFile = SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset)) val start = time.hiResClockMs() writeSnapshot(snapshotFile.file, producers) - info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.") + if (isDebugEnabled) { + debug(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.") + } snapshots.put(snapshotFile.offset, snapshotFile) @@ -926,7 +932,7 @@ class ProducerStateManager( * ProducerStateManager, and deletes the backing snapshot file. 
*/ protected def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = { - Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists()) + Option(snapshots.remove(snapshotOffset)).foreach(s => deleteSnapshotFile(s)) } /** diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 17d5f66031..a9b30e6d1c 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -2196,7 +2196,7 @@ object UnifiedLog extends Logging { handlingDir, s"Error while deleting producer state snapshots for $topicPartition in dir $handlingDir") { snapshotsToDelete.foreach { snapshot => - snapshot.deleteIfExists() + producerStateManager.deleteSnapshotFile(snapshot) } } // AutoMQ for Kafka inject end diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala index db1b03c72d..57b111d4e5 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala @@ -36,7 +36,6 @@ import java.io.File import java.nio.ByteBuffer import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} -import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.util.control.Breaks.{break, breakable} @@ -140,7 +139,9 @@ class ElasticLog(val metaStream: MetaStream, } partitionMeta.setCleanerOffset(offsetCheckpoint) persistPartitionMeta() - info(s"saved cleanerOffsetCheckpoint: $offsetCheckpoint") + if (isDebugEnabled) { + debug(s"saved cleanerOffsetCheckpoint: $offsetCheckpoint") + } } def persistRecoverOffsetCheckpoint(): Unit = { @@ -149,7 +150,9 @@ class ElasticLog(val metaStream: MetaStream, } partitionMeta.setRecoverOffset(recoveryPoint) persistPartitionMeta() - info(s"saved recoverOffsetCheckpoint: $recoveryPoint") + if (isDebugEnabled) { + debug(s"saved recoverOffsetCheckpoint: $recoveryPoint") + } } def saveLeaderEpochCheckpoint(meta: ElasticLeaderEpochCheckpointMeta): Unit = { @@ -170,7 +173,9 @@ class ElasticLog(val metaStream: MetaStream, private def persistPartitionMeta(): Unit = { persistMeta(metaStream, MetaKeyValue.of(MetaStream.PARTITION_META_KEY, ElasticPartitionMeta.encode(partitionMeta))) - info(s"${logIdent}save partition meta $partitionMeta") + if (isDebugEnabled) { + debug(s"${logIdent}save partition meta $partitionMeta") + } } override private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { @@ -187,6 +192,7 @@ class ElasticLog(val metaStream: MetaStream, activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp, shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records) + APPEND_TIME_HIST.update(System.nanoTime() - startTimestamp) val endOffset = lastOffset + 1 updateLogEndOffset(endOffset) @@ -490,39 +496,17 @@ object ElasticLog extends Logging { } info(s"${logIdent}loaded partition meta: $partitionMeta") - def loadAllValidSnapshots(): mutable.Map[Long, ElasticPartitionProducerSnapshotMeta] = { - metaMap.filter(kv => kv._1.startsWith(MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX)) - .map(kv => (kv._1.stripPrefix(MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX).toLong, kv._2.asInstanceOf[ElasticPartitionProducerSnapshotMeta])) - } - //load producer snapshots for this partition - val producerSnapshotsMetaOpt = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => 
m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta]) - val (producerSnapshotsMeta, snapshotsMap) = if (producerSnapshotsMetaOpt.isEmpty) { - // No need to persist if not exists - (new ElasticPartitionProducerSnapshotsMeta(), new mutable.HashMap[Long, ElasticPartitionProducerSnapshotMeta]()) - } else { - (producerSnapshotsMetaOpt.get, loadAllValidSnapshots()) - } - if (snapshotsMap.nonEmpty) { - info(s"${logIdent}loaded ${snapshotsMap.size} producer snapshots, offsets(filenames) are ${snapshotsMap.keys} ") + val producerSnapshotsMeta = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta]).getOrElse(new ElasticPartitionProducerSnapshotsMeta()) + val snapshotsMap = producerSnapshotsMeta.getSnapshots + if (!snapshotsMap.isEmpty) { + info(s"${logIdent}loaded ${snapshotsMap.size} producer snapshots, offsets(filenames) are ${snapshotsMap.keySet()} ") } else { info(s"${logIdent}loaded no producer snapshots") } - def persistProducerSnapshotMeta(meta: ElasticPartitionProducerSnapshotMeta): Unit = { - val key = MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX + meta.getOffset - if (meta.isEmpty) { - // The real deletion of this snapshot is done in metaStream's compaction. - producerSnapshotsMeta.remove(meta.getOffset) - } else { - producerSnapshotsMeta.add(meta.getOffset) - persistMeta(metaStream, MetaKeyValue.of(key, meta.encode())) - } - persistMeta(metaStream, MetaKeyValue.of(MetaStream.PRODUCER_SNAPSHOTS_META_KEY, producerSnapshotsMeta.encode())) - } - val producerStateManager = ElasticProducerStateManager(topicPartition, dir, - maxTransactionTimeoutMs, producerStateManagerConfig, time, snapshotsMap, persistProducerSnapshotMeta) + maxTransactionTimeoutMs, producerStateManagerConfig, time, snapshotsMap, kv => metaStream.append(kv).thenApply(_ => null)) val logMeta: ElasticLogMeta = metaMap.get(MetaStream.LOG_META_KEY).map(m => m.asInstanceOf[ElasticLogMeta]).getOrElse(new ElasticLogMeta()) logStreamManager = new ElasticLogStreamManager(logMeta.getStreamMap, client.streamClient(), config.replicationFactor, leaderEpoch) @@ -686,7 +670,7 @@ object ElasticLog extends Logging { meta.baseOffset(baseOffset) meta.streamSuffix(suffix) meta.createTimestamp(time.milliseconds()) - val segment: ElasticLogSegment = ElasticLogSegment(dir, meta, streamSliceManager, config, time, logSegmentManager.logSegmentEventListener()) + val segment: ElasticLogSegment = ElasticLogSegment(dir, meta, streamSliceManager, config, time, logSegmentManager.logSegmentEventListener(), logIdent = logIdent) var metaSaveCf: CompletableFuture[Void] = CompletableFuture.completedFuture(null) if (suffix.equals("")) { metaSaveCf = logSegmentManager.create(baseOffset, segment) diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogLoader.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogLoader.scala index c37c2506ae..46ffa0a922 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogLoader.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogLoader.scala @@ -47,7 +47,7 @@ class ElasticLogLoader(logMeta: ElasticLogMeta, numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int], createAndSaveSegmentFunc: (Long, File, LogConfig, ElasticStreamSliceManager, Time) => (ElasticLogSegment, CompletableFuture[Void])) extends Logging { - logIdent = s"[ElasticLogLoader partition=$topicPartition, dir=${dir.getParent}] " + logIdent = s"[ElasticLogLoader partition=$topicPartition] " /** * Load the log segments from the log files on 
disk, and returns the components of the loaded log. @@ -68,20 +68,14 @@ class ElasticLogLoader(logMeta: ElasticLogMeta, // load all segments loadSegments() + // make sure the producer state manager endOffset is less than or equal to the recoveryPointCheckpoint + producerStateManager.truncateAndReload(logStartOffsetCheckpoint, recoveryPointCheckpoint, time.milliseconds()) val (newRecoveryPoint: Long, nextOffset: Long) = { recoverLog() } val newLogStartOffset = math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset) - // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here - // from scratch. - if (!producerStateManager.isEmpty) { - throw new IllegalStateException("Producer state must be empty during log initialization") - } - - producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq) - ElasticUnifiedLog.rebuildProducerState(producerStateManager, segments, newLogStartOffset, nextOffset, time, reloadFromCleanShutdown = false, logIdent) val activeSegment = segments.lastSegment.get LoadedLogOffsets( newLogStartOffset, @@ -96,7 +90,7 @@ class ElasticLogLoader(logMeta: ElasticLogMeta, private def loadSegments(): Unit = { logMeta.getSegmentMetas.forEach(segmentMeta => { - val segment = ElasticLogSegment(dir, segmentMeta, streamSliceManager, config, time, logSegmentsManager.logSegmentEventListener()) + val segment = ElasticLogSegment(dir, segmentMeta, streamSliceManager, config, time, logSegmentsManager.logSegmentEventListener(), logIdent = logIdent) segments.add(segment) logSegmentsManager.put(segment.baseOffset, segment) }) @@ -110,22 +104,6 @@ class ElasticLogLoader(logMeta: ElasticLogMeta, * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow */ private def recoverSegment(segment: LogSegment): Int = { - val producerStateManager = ElasticProducerStateManager( - topicPartition, - dir, - this.producerStateManager.maxTransactionTimeoutMs, - this.producerStateManager.producerStateManagerConfig, - time, - this.producerStateManager.snapshotsMap, - this.producerStateManager.persistFun) - ElasticUnifiedLog.rebuildProducerState( - producerStateManager, - segments, - logStartOffsetCheckpoint, - segment.baseOffset, - time, - reloadFromCleanShutdown = false, - logIdent) val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache) // once we have recovered the segment's data, take a snapshot to ensure that we won't // need to reload the same segment again while recovering another segment. @@ -168,7 +146,6 @@ class ElasticLogLoader(logMeta: ElasticLogMeta, var numFlushed = 0 val threadName = Thread.currentThread().getName numRemainingSegments.put(threadName, numUnflushed) - while (unflushedIter.hasNext && !truncated) { val segment = unflushedIter.next() info(s"Recovering unflushed segment ${segment.baseOffset}. 
$numFlushed/$numUnflushed recovered for $topicPartition.") diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala index 78f66770ac..466a8b8072 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala @@ -41,7 +41,10 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, val indexIntervalBytes: Int, val rollJitterMs: Long, val time: Time, - val logListener: ElasticLogSegmentEventListener) extends LogSegment with Comparable[ElasticLogSegment]{ + val logListener: ElasticLogSegmentEventListener, + _logIdent: String = "") extends LogSegment with Comparable[ElasticLogSegment]{ + + logIdent = _logIdent def log: FileRecords = throw new UnsupportedOperationException() @@ -135,7 +138,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, } } - protected def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = { + protected def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch, txnIndexCheckpoint: Option[Long]): Unit = { if (batch.hasProducerId) { val producerId = batch.producerId val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.Replication) @@ -143,7 +146,9 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, producerStateManager.update(appendInfo) maybeCompletedTxn.foreach { completedTxn => val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) - updateTxnIndex(completedTxn, lastStableOffset) + if (txnIndexCheckpoint.isEmpty || txnIndexCheckpoint.get < completedTxn.lastOffset) { + updateTxnIndex(completedTxn, lastStableOffset) + } producerStateManager.completeTxn(completedTxn) } } @@ -195,10 +200,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, @nonthreadsafe override def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = { - timeIndex.reset() - txnIndex.reset() logListener.onEvent(baseOffset, ElasticLogSegmentEvent.SEGMENT_UPDATE) - recover0(producerStateManager, leaderEpochCache) } @@ -207,8 +209,14 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, var validBytes = 0 var lastIndexEntry = 0 maxTimestampAndOffsetSoFar = TimestampOffset.Unknown + // recovery from the checkpoint is exclusive, since the checkpoint offset is the offset of a record within a batch + val timeIndexCheckpoint = timeIndex.asInstanceOf[ElasticTimeIndex].loadLastEntry().offset + // recovery from this checkpoint is exclusive as well + val txnIndexCheckpoint = txnIndex.loadLastOffset() try { - for (batch <- _log.batches.asScala) { + val recoverPoint = math.max(producerStateManager.mapEndOffset, baseOffset) + info(s"[UNCLEAN_SHUTDOWN] recover range [$recoverPoint, ${_log.nextOffset()})") + for (batch <- _log.batchesFrom(recoverPoint).asScala) { batch.ensureValid() // The max timestamp is exposed at the batch level, so no need to iterate the records if (batch.maxTimestamp > maxTimestampSoFar) { @@ -216,7 +224,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, } // Build offset index - if (validBytes - lastIndexEntry > indexIntervalBytes) { + if (validBytes - lastIndexEntry > indexIntervalBytes && batch.baseOffset() > timeIndexCheckpoint) { timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) lastIndexEntry = validBytes } @@ -227,7 +235,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, if
(batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _)) cache.assign(batch.partitionLeaderEpoch, batch.baseOffset) } - updateProducerState(producerStateManager, batch) + updateProducerState(producerStateManager, batch, txnIndexCheckpoint) } } } catch { @@ -238,7 +246,6 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, // won't have record corrupted cause truncate // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well. timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true) - timeIndex.trimToValidSize() 0 } @@ -413,7 +420,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, object ElasticLogSegment { def apply(dir: File, meta: ElasticStreamSegmentMeta, sm: ElasticStreamSliceManager, logConfig: LogConfig, - time: Time, segmentEventListener: ElasticLogSegmentEventListener): ElasticLogSegment = { + time: Time, segmentEventListener: ElasticLogSegmentEventListener, logIdent: String = ""): ElasticLogSegment = { val baseOffset = meta.baseOffset val suffix = meta.streamSuffix val log = new ElasticLogFileRecords(sm.loadOrCreateSlice("log" + suffix, meta.log), baseOffset, meta.logSize()) @@ -421,6 +428,6 @@ object ElasticLogSegment { val timeIndex = new ElasticTimeIndex(UnifiedLog.timeIndexFile(dir, baseOffset, suffix), new StreamSliceSupplier(sm, "tim" + suffix, meta.time), baseOffset, logConfig.maxIndexSize, lastTimeIndexEntry) val txnIndex = new ElasticTransactionIndex(UnifiedLog.transactionIndexFile(dir, baseOffset, suffix), new StreamSliceSupplier(sm, "txn" + suffix, meta.txn), baseOffset) - new ElasticLogSegment(meta, log, timeIndex, txnIndex, baseOffset, logConfig.indexInterval, logConfig.segmentJitterMs, time, segmentEventListener) + new ElasticLogSegment(meta, log, timeIndex, txnIndex, baseOffset, logConfig.indexInterval, logConfig.segmentJitterMs, time, segmentEventListener, logIdent) } } diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticPartitionProducerSnapshotMeta.java b/core/src/main/scala/kafka/log/streamaspect/ElasticPartitionProducerSnapshotMeta.java deleted file mode 100644 index 9a47471d97..0000000000 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticPartitionProducerSnapshotMeta.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.streamaspect; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -/** - * refers to one snapshot file - */ -public class ElasticPartitionProducerSnapshotMeta { - /** - * raw data of the snapshot - */ - private byte[] rawSnapshotData; - /** - * the offset of the snapshot. 
Snapshot file name = offset + ".snapshot" - */ - private long offset; - - public ElasticPartitionProducerSnapshotMeta(long offset, byte[] snapshot) { - this.offset = offset; - this.rawSnapshotData = snapshot; - } - - public byte[] getRawSnapshotData() { - return rawSnapshotData; - } - - public void setRawSnapshotData(byte[] rawSnapshotData) { - this.rawSnapshotData = rawSnapshotData; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public boolean isEmpty() { - return rawSnapshotData == null || rawSnapshotData.length == 0; - } - - public ByteBuffer encode() { - if (rawSnapshotData == null || rawSnapshotData.length == 0) { - ByteBuffer buffer = ByteBuffer.allocate(8); - buffer.putLong(offset); - buffer.flip(); - return buffer; - } - - ByteBuffer buffer = ByteBuffer.allocate(rawSnapshotData.length + 8); - buffer.putLong(offset); - buffer.put(rawSnapshotData); - buffer.flip(); - return buffer; - } - - public String fileName() { - return offset + ".snapshot"; - } - - public static ElasticPartitionProducerSnapshotMeta decode(ByteBuffer buffer) { - if (buffer == null || buffer.remaining() == 0) { - return new ElasticPartitionProducerSnapshotMeta(-1, null); - } - - long offset = buffer.getLong(); - byte[] snapshot = new byte[buffer.remaining()]; - buffer.get(snapshot); - return new ElasticPartitionProducerSnapshotMeta(offset, snapshot); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ElasticPartitionProducerSnapshotMeta)) { - return false; - } - ElasticPartitionProducerSnapshotMeta other = (ElasticPartitionProducerSnapshotMeta) o; - return offset == other.offset && Arrays.equals(rawSnapshotData, other.rawSnapshotData); - } - - @Override - public int hashCode() { - if (isEmpty()) { - return 0; - } - return (int) (offset ^ (offset >>> 32)) + Arrays.hashCode(rawSnapshotData); - } -} diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticPartitionProducerSnapshotsMeta.java b/core/src/main/scala/kafka/log/streamaspect/ElasticPartitionProducerSnapshotsMeta.java index 2142c0a59e..efcedf0f81 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticPartitionProducerSnapshotsMeta.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticPartitionProducerSnapshotsMeta.java @@ -17,56 +17,68 @@ package kafka.log.streamaspect; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; /** * refers to all valid snapshot files */ public class ElasticPartitionProducerSnapshotsMeta { - private final Set snapshots; + public static final byte MAGIC_CODE = 0x18; + private final Map snapshots; public ElasticPartitionProducerSnapshotsMeta() { - this.snapshots = new HashSet<>(); + this(new HashMap<>()); } - public ElasticPartitionProducerSnapshotsMeta(Set snapshots) { + public ElasticPartitionProducerSnapshotsMeta(Map snapshots) { this.snapshots = snapshots; } - public Set getSnapshots() { + public Map getSnapshots() { return snapshots; } - public void remove(Long offset) { - snapshots.remove(offset); - } - - public void add(Long offset) { - snapshots.add(offset); + public boolean isEmpty() { + return snapshots.isEmpty(); } public ByteBuffer encode() { - ByteBuffer buffer = ByteBuffer.allocate(snapshots.size() * 8); - snapshots.forEach(item -> { - if (item != null) { - buffer.putLong(item); + int size = 1 /* magic code */ + snapshots.size() * (8 /* offset */ + 4 /* 
length */); + for (ByteBuffer snapshot : snapshots.values()) { + if (snapshot != null) { + size += snapshot.remaining(); } + } + ByteBuf buf = Unpooled.buffer(size); + buf.writeByte(MAGIC_CODE); + snapshots.forEach((offset, snapshot) -> { + buf.writeLong(offset); + buf.writeInt(snapshot.remaining()); + buf.writeBytes(snapshot.duplicate()); }); - buffer.flip(); - return buffer; + return buf.nioBuffer(); } public static ElasticPartitionProducerSnapshotsMeta decode(ByteBuffer buffer) { - Set snapshots = new HashSet<>(); - while (buffer.hasRemaining()) { - snapshots.add(buffer.getLong()); + ByteBuf buf = Unpooled.wrappedBuffer(buffer); + byte magicCode = buf.readByte(); + if (magicCode != MAGIC_CODE) { + throw new IllegalArgumentException("invalid magic code " + magicCode); + } + Map snapshots = new HashMap<>(); + while (buf.readableBytes() != 0) { + long offset = buf.readLong(); + int length = buf.readInt(); + byte[] snapshot = new byte[length]; + buf.readBytes(snapshot); + ByteBuffer snapshotBuf = ByteBuffer.wrap(snapshot); + snapshots.put(offset, snapshotBuf); } return new ElasticPartitionProducerSnapshotsMeta(snapshots); } - - public boolean isEmpty() { - return snapshots.isEmpty(); - } } diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticProducerStateManager.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticProducerStateManager.scala index a8730efb17..0ba872350b 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticProducerStateManager.scala @@ -23,8 +23,11 @@ import org.apache.kafka.common.protocol.types.SchemaException import org.apache.kafka.common.utils.Time import java.io.File -import java.util.concurrent.ConcurrentSkipListMap +import java.nio.ByteBuffer +import java.util +import java.util.concurrent.{CompletableFuture, ConcurrentSkipListMap} import scala.collection.mutable +import scala.jdk.CollectionConverters.CollectionHasAsScala /** * ElasticProducerStateManager. Temporarily, we only persist the last snapshot. 
@@ -37,16 +40,16 @@ class ElasticProducerStateManager( override val maxTransactionTimeoutMs: Int, override val producerStateManagerConfig: ProducerStateManagerConfig, override val time: Time, - val snapshotsMap: mutable.Map[Long, ElasticPartitionProducerSnapshotMeta], - val persistFun: ElasticPartitionProducerSnapshotMeta => Unit + val snapshotsMap: util.NavigableMap[java.lang.Long, ByteBuffer], + val persistFun: MetaKeyValue => CompletableFuture[Void] ) extends ProducerStateManager(topicPartition, __logDir, maxTransactionTimeoutMs, producerStateManagerConfig, time) { this.logIdent = s"[ElasticProducerStateManager partition=$topicPartition] " override protected def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = { val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]() - snapshotsMap.foreach { case (offset, meta) => - tm.put(offset, SnapshotFile(new File(meta.fileName()))) + snapshotsMap.forEach { case (offset, meta) => + tm.put(offset, SnapshotFile(new File(s"$offset.snapshot"))) } tm } @@ -76,39 +79,82 @@ class ElasticProducerStateManager( } } + + override def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = { + // remove all out of range snapshots + snapshots.values().asScala.foreach { snapshot => + if (snapshot.offset > logEndOffset || snapshot.offset <= logStartOffset) { + removeAndDeleteSnapshot(snapshot.offset) + } + } + + if (logEndOffset != mapEndOffset) { + producers.clear() + ongoingTxns.clear() + updateOldestTxnTimestamp() + + // since we assume that the offset is less than or equal to the high watermark, it is + // safe to clear the unreplicated transactions + unreplicatedTxns.clear() + } + loadFromSnapshot(logStartOffset, currentTimeMs) + } + override def takeSnapshot(): Unit = { + takeSnapshot0() + } + + def takeSnapshot0(): CompletableFuture[Void] = { // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { val snapshotFile = SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset)) val start = time.hiResClockMs() - writeSnapshot(snapshotFile.offset, producers) + val rst = writeSnapshot(snapshotFile.offset, producers) + snapshots.put(snapshotFile.offset, snapshotFile) info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.") - snapshots.put(snapshotFile.offset, snapshotFile) // Update the last snap offset according to the serialized map lastSnapOffset = lastMapOffset + rst + } else { + CompletableFuture.completedFuture(null) } } - private def writeSnapshot(offset: Long, entries: mutable.Map[Long, ProducerStateEntry]): Unit = { + /** + * Only keep the last snapshot whose offset is less than or equal to the recoveryPointCheckpoint + */ + def takeSnapshotAndRemoveExpired(recoveryPointCheckpoint: Long): CompletableFuture[Void] = { + val lastSnapshotOffset = snapshotsMap.floorKey(recoveryPointCheckpoint) + if (lastSnapshotOffset != null) { + val expiredSnapshotOffsets = new util.ArrayList[java.lang.Long](snapshotsMap.headMap(lastSnapshotOffset, false).keySet()) + expiredSnapshotOffsets.forEach(offset => { + snapshotsMap.remove(offset) + snapshots.remove(offset) + }) + } + takeSnapshot0() + } + + private def writeSnapshot(offset: Long, entries: mutable.Map[Long, ProducerStateEntry]): CompletableFuture[Void] = { val buffer = ProducerStateManager.writeSnapshotToBuffer(entries) val rawSnapshot: Array[Byte] = new Array[Byte](buffer.remaining()) buffer.get(rawSnapshot) - val meta
= new ElasticPartitionProducerSnapshotMeta(offset, rawSnapshot) - snapshotsMap.put(offset, meta) - persistFun(meta) + snapshotsMap.put(offset, ByteBuffer.wrap(rawSnapshot)) + val meta = new ElasticPartitionProducerSnapshotsMeta(snapshotsMap) + persistFun(MetaKeyValue.of(MetaStream.PRODUCER_SNAPSHOTS_META_KEY, meta.encode())) } private def readSnapshot(file: File): Iterable[ProducerStateEntry] = { val offset = LocalLog.offsetFromFile(file) - if (!snapshotsMap.contains(offset)) { + if (!snapshotsMap.containsKey(offset)) { throw new CorruptSnapshotException(s"Snapshot not found") } try { - ProducerStateManager.readSnapshotFromBuffer(snapshotsMap(offset).getRawSnapshotData) + ProducerStateManager.readSnapshotFromBuffer(snapshotsMap.get(offset).array()) } catch { case e: SchemaException => throw new CorruptSnapshotException(s"Snapshot failed schema validation: ${e.getMessage}") @@ -127,13 +173,17 @@ class ElasticProducerStateManager( None } + override def deleteSnapshotFile(snapshotFile: SnapshotFile): Unit = { + deleteSnapshot(snapshotFile.offset) + } + private def deleteSnapshot(snapshotOffset: Long): Unit = { + // the real deletion happens after meta compaction. snapshots.remove(snapshotOffset) - snapshotsMap.remove(snapshotOffset).foreach(snapshot => { - snapshot.setRawSnapshotData(null) - persistFun(snapshot) - info(s"Deleted producer snapshot file '$snapshotOffset' for partition $topicPartition") - }) + val deleted = snapshotsMap.remove(snapshotOffset) + if (deleted != null && isDebugEnabled) { + debug(s"Deleted producer snapshot file '$snapshotOffset' for partition $topicPartition") + } } } @@ -144,8 +194,8 @@ object ElasticProducerStateManager { maxTransactionTimeoutMs: Int, producerStateManagerConfig: ProducerStateManagerConfig, time: Time, - snapshotMap: mutable.Map[Long, ElasticPartitionProducerSnapshotMeta], - persistFun: ElasticPartitionProducerSnapshotMeta => Unit + snapshotMap: util.Map[java.lang.Long, ByteBuffer], + persistFun: MetaKeyValue => CompletableFuture[Void] ): ElasticProducerStateManager = { val stateManager = new ElasticProducerStateManager( topicPartition, @@ -153,7 +203,7 @@ object ElasticProducerStateManager { maxTransactionTimeoutMs, producerStateManagerConfig, time, - snapshotMap, + new util.TreeMap[java.lang.Long, ByteBuffer](snapshotMap), persistFun ) stateManager diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala index cc4f79b12e..f2c2615c29 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticTimeIndex.scala @@ -50,6 +50,18 @@ class ElasticTimeIndex(__file: File, streamSegmentSupplier: StreamSliceSupplier, override def lastEntry: TimestampOffset = _lastEntry + /** + * In the case of unclean shutdown, the last entry needs to be recovered from the time index.
+ */ + def loadLastEntry(): TimestampOffset = { + if (entries == 0) { + TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) + } else { + _lastEntry = entry(entries - 1) + _lastEntry + } + } + private def lastEntryFromIndexFile: TimestampOffset = { inLock(lock) { _entries match { diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.scala index 1ce5dac336..992345b94a 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticTransactionIndex.scala @@ -34,6 +34,20 @@ class ElasticTransactionIndex(__file: File, streamSliceSupplier: StreamSliceSupp @volatile private var lastAppend: CompletableFuture[_] = CompletableFuture.completedFuture(null) private var closed = false + /** + * In the case of unclean shutdown, the last entry needs to be recovered from the txn index. + */ + def loadLastOffset(): Option[Long] = { + if (stream.nextOffset() == 0) { + None + } else { + val record = stream.fetch(stream.nextOffset() - 1, 1, Int.MaxValue).get().recordBatchList().get(0) + val readBuf = record.rawPayload() + val abortedTxn = new AbortedTxn(readBuf) + Some(abortedTxn.lastOffset) + } + } + override def append(abortedTxn: AbortedTxn): Unit = { if (closed) throw new IOException(s"Attempt to append to closed transaction index $file") diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala index a4c5f435c0..0067692ba7 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala @@ -18,15 +18,19 @@ package kafka.log.streamaspect import kafka.log._ +import kafka.log.streamaspect.ElasticUnifiedLog.{CheckpointExecutor, MaxCheckpointIntervalBytes, MinCheckpointIntervalMs} import kafka.server.epoch.LeaderEpochFileCache -import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata} +import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogOffsetMetadata, RequestLocal} import kafka.utils.Logging import org.apache.kafka.common.errors.OffsetOutOfRangeException -import org.apache.kafka.common.record.{RecordBatch, RecordVersion, Records} -import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records} +import org.apache.kafka.common.utils.{ThreadUtils, Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.server.common.MetadataVersion -import java.util.concurrent.CompletableFuture +import java.nio.ByteBuffer +import java.util +import java.util.concurrent.{CompletableFuture, Executors} import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -44,10 +48,31 @@ class ElasticUnifiedLog(_logStartOffset: Long, elasticLog.confirmOffsetChangeListener = Some(() => confirmOffsetChangeListener.map(_.apply())) + var checkpointIntervalBytes = 0 + var lastCheckpointTimestamp = time.milliseconds() + def confirmOffset(): LogOffsetMetadata = { elasticLog.confirmOffset } + + override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin, interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal): LogAppendInfo = { + checkpointIntervalBytes += 
records.sizeInBytes() + val rst = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion, requestLocal) + if (checkpointIntervalBytes > MaxCheckpointIntervalBytes && time.milliseconds() - lastCheckpointTimestamp > MinCheckpointIntervalMs) { + checkpointIntervalBytes = 0 + lastCheckpointTimestamp = time.milliseconds() + CheckpointExecutor.execute(() => checkpoint()) + } + rst + } + + private def checkpoint(): Unit = { + producerStateManager.asInstanceOf[ElasticProducerStateManager].takeSnapshotAndRemoveExpired(elasticLog.recoveryPoint) + flush(true) + elasticLog.persistRecoverOffsetCheckpoint() + } + override private[log] def replaceSegments(newSegments: collection.Seq[LogSegment], oldSegments: collection.Seq[LogSegment]): Unit = { val deletedSegments = elasticLog.replaceSegments(newSegments, oldSegments) deleteProducerSnapshots(deletedSegments, asyncDelete = true) @@ -69,15 +94,6 @@ class ElasticUnifiedLog(_logStartOffset: Long, // topic id is passed by constructor arguments every time, there is no need load from partition meta file. } - // only for testing - private[log] def listProducerSnapshots(): mutable.Map[Long, ElasticPartitionProducerSnapshotMeta] = { - val producerSnapshots: mutable.Map[Long, ElasticPartitionProducerSnapshotMeta] = mutable.Map() - elasticLog.metaStream.getAllProducerSnapshots.forEach((offset, snapshot) => { - producerSnapshots.put(offset.longValue(), snapshot) - }) - producerSnapshots - } - // only for testing private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment], asyncDelete: Boolean): Unit = { @@ -144,9 +160,18 @@ class ElasticUnifiedLog(_logStartOffset: Long, override private[log] def delete(): Unit = { throw new UnsupportedOperationException("delete() is not supported for ElasticUnifiedLog") } + + // only used for test + def listProducerSnapshots(): util.NavigableMap[java.lang.Long, ByteBuffer] = { + producerStateManager.asInstanceOf[ElasticProducerStateManager].snapshotsMap + } } object ElasticUnifiedLog extends Logging { + private val CheckpointExecutor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("checkpoint-executor", true)) + private val MaxCheckpointIntervalBytes = 50 * 1024 * 1024 + private val MinCheckpointIntervalMs = 10 * 1000 + def rebuildProducerState(producerStateManager: ProducerStateManager, segments: LogSegments, logStartOffset: Long, diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java index b2631a288d..f39921b27d 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java @@ -17,26 +17,25 @@ package kafka.log.streamaspect; -import com.automq.stream.api.ReadOptions; -import io.netty.buffer.Unpooled; +import com.automq.stream.DefaultRecordBatch; import com.automq.stream.api.AppendResult; import com.automq.stream.api.FetchResult; +import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; -import org.apache.commons.lang3.tuple.Pair; +import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; +import 
java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -51,25 +50,27 @@ public class MetaStream implements Stream { public static final String LOG_META_KEY = "LOG"; public static final String PRODUCER_SNAPSHOTS_META_KEY = "PRODUCER_SNAPSHOTS"; - public static final String PRODUCER_SNAPSHOT_KEY_PREFIX = "PRODUCER_SNAPSHOT_"; public static final String PARTITION_META_KEY = "PARTITION"; public static final String LEADER_EPOCH_CHECKPOINT_KEY = "LEADER_EPOCH_CHECKPOINT"; public static final Logger LOGGER = LoggerFactory.getLogger(MetaStream.class); + private static final double COMPACTION_HOLLOW_RATE = 0.95; + private static final long COMPACTION_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(1); + private final Stream innerStream; - private final ScheduledExecutorService trimScheduler; + private final ScheduledExecutorService scheduler; private final String logIdent; /** * metaCache is used to cache meta key values. * key: meta key * value: pair of base offset and meta value */ - private final Map> metaCache; + private final Map metaCache; /** * trimFuture is used to record a trim task. It may be cancelled and rescheduled. */ - private ScheduledFuture trimFuture; + private ScheduledFuture compactionFuture; /** * closed is used to record if the stream is fenced. @@ -81,9 +82,9 @@ public class MetaStream implements Stream { */ private volatile boolean replayDone; - public MetaStream(Stream innerStream, ScheduledExecutorService trimScheduler, String logIdent) { + public MetaStream(Stream innerStream, ScheduledExecutorService scheduler, String logIdent) { this.innerStream = innerStream; - this.trimScheduler = trimScheduler; + this.scheduler = scheduler; this.metaCache = new ConcurrentHashMap<>(); this.logIdent = logIdent; this.replayDone = false; @@ -114,10 +115,10 @@ public CompletableFuture append(RecordBatch batch) { throw new UnsupportedOperationException("append record batch is not supported in meta stream"); } - public CompletableFuture append(MetaKeyValue kv) { + public synchronized CompletableFuture append(MetaKeyValue kv) { + metaCache.put(kv.getKey(), new MetadataValue(nextOffset(), kv.getValue())); return append0(kv).thenApply(result -> { - metaCache.put(kv.getKey(), Pair.of(result.baseOffset(), kv.getValue())); - trimAsync(); + tryCompaction(); return result; }); } @@ -142,8 +143,8 @@ public AppendResult appendSync(MetaKeyValue kv) throws IOException { * * @return a future of append result */ - private CompletableFuture append0(MetaKeyValue kv) { - return innerStream.append(RawPayloadRecordBatch.of(MetaKeyValue.encode(kv))); + private synchronized CompletableFuture append0(MetaKeyValue kv) { + return innerStream.append(new DefaultRecordBatch(1, System.currentTimeMillis(), Collections.emptyMap(), MetaKeyValue.encode(kv))); } @Override @@ -158,8 +159,8 @@ public CompletableFuture trim(long newStartOffset) { @Override public CompletableFuture close() { - if (trimFuture != null) { - trimFuture.cancel(true); + if (compactionFuture != null) { + compactionFuture.cancel(true); } return doCompaction() .thenRun(innerStream::close) @@ -172,8 +173,8 @@ public boolean isFenced() { @Override public CompletableFuture destroy() { - if (trimFuture != null) { - trimFuture.cancel(true); + if (compactionFuture != null) { + compactionFuture.cancel(true); } return innerStream.destroy(); } @@ -186,11 +187,15 @@ public CompletableFuture destroy() { public Map replay() throws IOException { replayDone = 
false; metaCache.clear(); - StringBuilder sb = new StringBuilder(logIdent) - .append("metaStream replay summary:") - .append(" id: ") - .append(streamId()) - .append(", "); + boolean summaryEnabled = LOGGER.isDebugEnabled(); + StringBuilder sb = new StringBuilder(); + if (summaryEnabled) { + sb.append(logIdent) + .append("metaStream replay summary:") + .append(" id: ") + .append(streamId()) + .append(", "); + } long totalValueSize = 0L; long startOffset = startOffset(); @@ -203,9 +208,11 @@ public Map replay() throws IOException { for (RecordBatchWithContext context : fetchRst.recordBatchList()) { try { MetaKeyValue kv = MetaKeyValue.decode(Unpooled.copiedBuffer(context.rawPayload()).nioBuffer()); - metaCache.put(kv.getKey(), Pair.of(context.baseOffset(), kv.getValue())); + metaCache.put(kv.getKey(), new MetadataValue(context.baseOffset(), kv.getValue())); totalValueSize += kv.getValue().remaining(); - sb.append("(key: ").append(kv.getKey()).append(", offset: ").append(context.baseOffset()).append(", value size: ").append(kv.getValue().remaining()).append("); "); + if (summaryEnabled) { + sb.append("(key: ").append(kv.getKey()).append(", offset: ").append(context.baseOffset()).append(", value size: ").append(kv.getValue().remaining()).append("); "); + } } catch (Exception e) { LOGGER.error("{} streamId {}: decode meta failed, offset: {}, error: {}", logIdent, streamId(), context.baseOffset(), e.getMessage()); } @@ -225,116 +232,94 @@ public Map replay() throws IOException { throw new RuntimeException(e); } - if (totalValueSize > 0 && LOGGER.isDebugEnabled()) { + if (totalValueSize > 0 && summaryEnabled) { LOGGER.debug(sb.append("total value size: ").append(totalValueSize).toString()); } return getValidMetaMap(); } - public Map getAllProducerSnapshots() { - if (!metaCache.containsKey(PRODUCER_SNAPSHOTS_META_KEY)) { - return Collections.emptyMap(); - } - Map snapshots = new HashMap<>(); - ElasticPartitionProducerSnapshotsMeta snapshotsMeta = ElasticPartitionProducerSnapshotsMeta.decode(metaCache.get(PRODUCER_SNAPSHOTS_META_KEY).getRight().duplicate()); - snapshotsMeta.getSnapshots().forEach(offset -> { - String key = PRODUCER_SNAPSHOT_KEY_PREFIX + offset; - if (!metaCache.containsKey(key)) { - throw new RuntimeException("Missing producer snapshot meta for offset " + offset); - } - snapshots.put(offset, ElasticPartitionProducerSnapshotMeta.decode(metaCache.get(key).getRight().duplicate())); - }); - return snapshots; - } - private Map getValidMetaMap() { Map metaMap = new HashMap<>(); - metaCache.forEach((key, pair) -> { + metaCache.forEach((key, value) -> { switch (key) { case LOG_META_KEY: - metaMap.put(key, ElasticLogMeta.decode(pair.getRight().duplicate())); + metaMap.put(key, ElasticLogMeta.decode(value.value())); break; case PARTITION_META_KEY: - metaMap.put(key, ElasticPartitionMeta.decode(pair.getRight().duplicate())); + metaMap.put(key, ElasticPartitionMeta.decode(value.value())); break; case PRODUCER_SNAPSHOTS_META_KEY: - metaMap.put(key, ElasticPartitionProducerSnapshotsMeta.decode(pair.getRight().duplicate())); + metaMap.put(key, ElasticPartitionProducerSnapshotsMeta.decode(value.value())); break; case LEADER_EPOCH_CHECKPOINT_KEY: - metaMap.put(key, ElasticLeaderEpochCheckpointMeta.decode(pair.getRight().duplicate())); + metaMap.put(key, ElasticLeaderEpochCheckpointMeta.decode(value.value())); break; default: - if (key.startsWith(PRODUCER_SNAPSHOT_KEY_PREFIX)) { - metaMap.put(key, ElasticPartitionProducerSnapshotMeta.decode(pair.getRight().duplicate())); - } else { - 
LOGGER.error("{} streamId {}: unknown meta key: {}", logIdent, streamId(), key); - } + LOGGER.error("{} streamId {}: unknown meta key: {}", logIdent, streamId(), key); } }); return metaMap; } - private void trimAsync() { - if (trimFuture != null) { - trimFuture.cancel(true); + private void tryCompaction() { + if (compactionFuture != null) { + compactionFuture.cancel(true); } - // trigger after 10 SECONDS to avoid successive trims - trimFuture = trimScheduler.schedule(this::doCompaction, 10, TimeUnit.SECONDS); + // trigger after 10s to avoid compacting too quick + compactionFuture = scheduler.schedule(this::doCompaction, 10, TimeUnit.SECONDS); } - private CompletableFuture doCompaction() { - if (!replayDone || metaCache.size() <= 1) { + private synchronized CompletableFuture doCompaction() { + if (!replayDone) { return CompletableFuture.completedFuture(null); } - - List> futures = new ArrayList<>(); - Set validSnapshots = metaCache.get(PRODUCER_SNAPSHOTS_META_KEY) == null ? Collections.emptySet() : - ElasticPartitionProducerSnapshotsMeta.decode(metaCache.get(PRODUCER_SNAPSHOTS_META_KEY).getRight().duplicate()).getSnapshots(); - - long lastOffset = 0L; - StringBuilder sb = new StringBuilder(logIdent) - .append("metaStream compaction summary:") - .append(" id: ") - .append(streamId()) - .append(", "); - long totalValueSize = 0L; - - Iterator>> iterator = metaCache.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - // remove invalid producer snapshots - if (entry.getKey().startsWith(PRODUCER_SNAPSHOT_KEY_PREFIX)) { - long offset = parseProducerSnapshotOffset(entry.getKey()); - if (!validSnapshots.contains(offset)) { - iterator.remove(); - continue; - } + long startOffset = startOffset(); + long endOffset = nextOffset(); + int size = (int) (endOffset - startOffset); + if (size == 0) { + return CompletableFuture.completedFuture(null); + } + double hollowRate = (double) metaCache.size() / size; + if (hollowRate < COMPACTION_HOLLOW_RATE) { + return CompletableFuture.completedFuture(null); + } + MetadataValue last = null; + for (MetadataValue value : metaCache.values()) { + if (last == null || value.offset > last.offset) { + last = value; } - if (lastOffset < entry.getValue().getLeft()) { - lastOffset = entry.getValue().getLeft(); + } + List overwrite = new LinkedList<>(); + for (Map.Entry entry : metaCache.entrySet()) { + String key = entry.getKey(); + MetadataValue value = entry.getValue(); + if (value == last || last.timestamp - value.timestamp < COMPACTION_THRESHOLD_MS) { + continue; } - totalValueSize += entry.getValue().getRight().remaining(); - sb.append("(key: ").append(entry.getKey()).append(", offset: ").append(entry.getValue().getLeft()).append(", value size: ").append(entry.getValue().getRight().remaining()).append("); "); - futures.add(append0(MetaKeyValue.of(entry.getKey(), entry.getValue().getRight().duplicate()))); + overwrite.add(MetaKeyValue.of(key, value.value())); } - - final long finalLastOffset = lastOffset; - sb.append("compact before: ").append(finalLastOffset + 1).append(", total value size: ").append(totalValueSize); - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenCompose(result -> trim(finalLastOffset + 1)) - .thenRun(() -> { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(sb.toString()); - } - } - ); + CompletableFuture overwriteCf = CompletableFuture.allOf(overwrite.stream().map(this::append).toArray(CompletableFuture[]::new)); + return overwriteCf.thenAccept(nil -> { + OptionalLong minOffset 
= metaCache.values().stream().mapToLong(v -> v.offset).min(); + minOffset.ifPresent(offset -> { + trim(offset); + LOGGER.info("compact streamId={} done, compact from [{}, {}) to [{}, {})", streamId(), startOffset, endOffset, offset, nextOffset()); + }); + }); } - private long parseProducerSnapshotOffset(String key) { - if (!key.startsWith(PRODUCER_SNAPSHOT_KEY_PREFIX)) { - throw new IllegalArgumentException("Invalid producer snapshot key: " + key); + static class MetadataValue { + private final ByteBuffer value; + final long offset; + final long timestamp = System.currentTimeMillis(); + + public MetadataValue(long offset, ByteBuffer value) { + this.offset = offset; + this.value = value; + } + + public ByteBuffer value() { + return value.duplicate(); } - String[] split = key.split("_"); - return Long.parseLong(split[split.length - 1]); } } diff --git a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java index c297413dc1..1a3e3044d6 100644 --- a/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java +++ b/core/src/main/scala/kafka/log/streamaspect/cache/FileCache.java @@ -116,6 +116,9 @@ public Optional get(String filePath, long position, int length) { return Optional.empty(); } Map.Entry entry = cache.floorEntry(position); + if (entry == null) { + return Optional.empty(); + } long filePosition = entry.getKey(); Value value = entry.getValue(); if (entry.getKey() + entry.getValue().dataLength < position + length) { diff --git a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala index f8de51e00e..842975859d 100644 --- a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticLogSegmentTest.scala @@ -280,6 +280,9 @@ class ElasticLogSegmentTest { assertEquals(100L, abortedTxn.lastStableOffset) // recover again, but this time assuming the transaction from pid2 began on a previous segment + // an elastic log segment instance cannot be recovered repeatedly, so reset the indexes here to allow recovering it again.
+ segment.timeIndex.reset() + segment.txnIndex.reset() stateManager = newProducerStateManager() stateManager.loadProducerEntry(new ProducerStateEntry(pid2, mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, diff --git a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala index 2250e375a3..418063955e 100755 --- a/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/streamaspect/ElasticUnifiedLogTest.scala @@ -855,7 +855,7 @@ class ElasticUnifiedLogTest { log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "c".getBytes())), producerId = pid1, producerEpoch = epoch, sequence = 2), leaderEpoch = 0) log.updateHighWatermark(log.logEndOffset) - assertEquals(log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), log.listProducerSnapshots().keys.toSeq.sorted, + assertEquals(log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), log.listProducerSnapshots().keySet().asScala.toSeq.sorted, "expected a snapshot file per segment base offset, except the first segment") assertEquals(2, log.listProducerSnapshots().size) @@ -865,7 +865,7 @@ class ElasticUnifiedLogTest { log.deleteOldSegments() // Sleep to breach the file delete delay and run scheduled file deletion tasks mockTime.sleep(1) - assertEquals(log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), log.listProducerSnapshots().keys.toSeq.sorted, + assertEquals(log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), log.listProducerSnapshots().keySet().asScala.toSeq.sorted, "expected a snapshot file per segment base offset, excluding the first") }
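For reference, the new PRODUCER_SNAPSHOTS value introduced above is a single buffer: MAGIC_CODE (0x18) followed by one [offset:int64][length:int32][snapshot bytes] entry per snapshot. Below is a minimal, illustrative round-trip sketch of that layout, not part of the patch; it assumes the constructor and getter use Map<Long, ByteBuffer> (generics are stripped in the diff text above) and that the class is on the classpath together with its Netty dependency.

import kafka.log.streamaspect.ElasticPartitionProducerSnapshotsMeta;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SnapshotsMetaRoundTrip {
    public static void main(String[] args) {
        // Two hypothetical producer snapshots keyed by their snapshot offsets.
        Map<Long, ByteBuffer> snapshots = new HashMap<>();
        snapshots.put(100L, ByteBuffer.wrap("snapshot-at-100".getBytes(StandardCharsets.UTF_8)));
        snapshots.put(200L, ByteBuffer.wrap("snapshot-at-200".getBytes(StandardCharsets.UTF_8)));

        // encode() emits MAGIC_CODE, then [offset][length][payload] per entry.
        ByteBuffer encoded = new ElasticPartitionProducerSnapshotsMeta(snapshots).encode();

        // decode() validates the magic byte and rebuilds the offset -> payload map.
        ElasticPartitionProducerSnapshotsMeta decoded = ElasticPartitionProducerSnapshotsMeta.decode(encoded);
        System.out.println(decoded.getSnapshots().keySet()); // [100, 200] (iteration order may vary)
    }
}

Keeping every snapshot under one meta key lets MetaStream compaction retire stale snapshots by rewriting this single value, which is why the per-offset PRODUCER_SNAPSHOT_ keys and ElasticPartitionProducerSnapshotMeta are removed in this patch.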