4 changes: 3 additions & 1 deletion build.gradle
@@ -225,7 +225,9 @@ if (file('.git').exists()) {
'licenses/*',
'**/generated/**',
'clients/src/test/resources/serializedData/*',
'.github/**'
'.github/**',
'CODE_OF_CONDUCT.md',
'CONTRIBUTING_GUIDE.md',
])
}
} else {
18 changes: 12 additions & 6 deletions core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -566,10 +566,10 @@ class ProducerStateManager(
@volatile private var oldestTxnLastTimestamp: Long = -1L

// ongoing transactions sorted by the first offset of the transaction
private val ongoingTxns = new util.TreeMap[Long, TxnMetadata]
protected val ongoingTxns = new util.TreeMap[Long, TxnMetadata]

// completed transactions whose markers are at offsets above the high watermark
private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
protected val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]

@threadsafe
def hasLateTransaction(currentTimeMs: Long): Boolean = {
@@ -608,7 +608,7 @@ class ProducerStateManager(
case Some(prev) =>
if (!baseOffsets.contains(key)) {
// this snapshot is now the largest stray snapshot.
prev.deleteIfExists()
deleteSnapshotFile(prev)
ss.remove(prev.offset)
latestStraySnapshot = Some(snapshot)
}
@@ -623,7 +623,7 @@
// delete the largestStraySnapshot.
for (strayOffset <- latestStraySnapshot.map(_.offset); maxOffset <- maxSegmentBaseOffset) {
if (strayOffset < maxOffset) {
Option(ss.remove(strayOffset)).foreach(_.deleteIfExists())
Option(ss.remove(strayOffset)).foreach(s => deleteSnapshotFile(s))
}
}

@@ -663,6 +663,10 @@
*/
private[log] def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset)

def deleteSnapshotFile(snapshotFile: SnapshotFile): Unit = {
snapshotFile.deleteIfExists()
}

/**
* Returns the last offset of this map
*/
@@ -809,7 +813,9 @@
val snapshotFile = SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset))
val start = time.hiResClockMs()
writeSnapshot(snapshotFile.file, producers)
info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.")
if (isDebugEnabled) {
debug(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.")
}

snapshots.put(snapshotFile.offset, snapshotFile)

@@ -926,7 +932,7 @@
* ProducerStateManager, and deletes the backing snapshot file.
*/
protected def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
Option(snapshots.remove(snapshotOffset)).foreach(s => deleteSnapshotFile(s))
}

/**
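Taken together, the changes in this file turn snapshot deletion into an overridable hook: every path that previously called `SnapshotFile.deleteIfExists()` directly now goes through `deleteSnapshotFile`, and `ongoingTxns`/`unreplicatedTxns` become `protected`, so a subclass can keep the base class's bookkeeping while changing where snapshots actually live. A minimal, illustrative sketch of that seam (`Snapshot`, `StateManager`, and `MetaBackedStateManager` are stand-in names, not code from this PR):

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable

// Stand-in for kafka.log.SnapshotFile: a producer-state snapshot identified by its offset.
final case class Snapshot(offset: Long, path: Path) {
  def deleteIfExists(): Boolean = Files.deleteIfExists(path)
}

class StateManager {
  // Default behaviour: snapshots live on the local filesystem.
  def deleteSnapshotFile(snapshot: Snapshot): Unit = snapshot.deleteIfExists()

  // Bookkeeping always routes deletion through the hook above.
  def removeAndDeleteSnapshot(snapshots: mutable.TreeMap[Long, Snapshot], offset: Long): Unit =
    snapshots.remove(offset).foreach(deleteSnapshotFile)
}

// A subclass redirects deletion to another store (for example, a tombstone in a
// metadata stream) without re-implementing the base class's bookkeeping.
class MetaBackedStateManager(recordTombstone: Long => Unit) extends StateManager {
  override def deleteSnapshotFile(snapshot: Snapshot): Unit = recordTombstone(snapshot.offset)
}
```

The UnifiedLog cleanup path in the next file goes through the same hook, so an override applies there as well.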
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -2196,7 +2196,7 @@ object UnifiedLog extends Logging {
handlingDir,
s"Error while deleting producer state snapshots for $topicPartition in dir $handlingDir") {
snapshotsToDelete.foreach { snapshot =>
snapshot.deleteIfExists()
producerStateManager.deleteSnapshotFile(snapshot)
}
}
// AutoMQ for Kafka inject end
48 changes: 16 additions & 32 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -36,7 +36,6 @@ import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.control.Breaks.{break, breakable}

@@ -140,7 +139,9 @@ class ElasticLog(val metaStream: MetaStream,
}
partitionMeta.setCleanerOffset(offsetCheckpoint)
persistPartitionMeta()
info(s"saved cleanerOffsetCheckpoint: $offsetCheckpoint")
if (isDebugEnabled) {
debug(s"saved cleanerOffsetCheckpoint: $offsetCheckpoint")
}
}
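These persistence paths (and the snapshot-write path in ProducerStateManager above) now log at debug level behind an isDebugEnabled check, so the interpolated message is only built when debug logging is actually enabled. A toy illustration of the pattern, assuming a logger with a strict String parameter (the Logging trait below is a stand-in, not Kafka's):

```scala
trait Logging {
  // Stand-in level check; a real logger would consult its configuration.
  def isDebugEnabled: Boolean = sys.props.get("log.debug").contains("true")
  // Strict String parameter: the caller pays for message construction unless it checks the level first.
  def debug(msg: String): Unit = if (isDebugEnabled) println(s"DEBUG $msg")
}

class CheckpointWriter extends Logging {
  def persistCleanerOffsetCheckpoint(offset: Long): Unit = {
    // ... persist the checkpoint ...
    if (isDebugEnabled) {
      // The interpolated message is only built on this branch.
      debug(s"saved cleanerOffsetCheckpoint: $offset")
    }
  }
}
```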

def persistRecoverOffsetCheckpoint(): Unit = {
Expand All @@ -149,7 +150,9 @@ class ElasticLog(val metaStream: MetaStream,
}
partitionMeta.setRecoverOffset(recoveryPoint)
persistPartitionMeta()
info(s"saved recoverOffsetCheckpoint: $recoveryPoint")
if (isDebugEnabled) {
debug(s"saved recoverOffsetCheckpoint: $recoveryPoint")
}
}

def saveLeaderEpochCheckpoint(meta: ElasticLeaderEpochCheckpointMeta): Unit = {
Expand All @@ -170,7 +173,9 @@ class ElasticLog(val metaStream: MetaStream,

private def persistPartitionMeta(): Unit = {
persistMeta(metaStream, MetaKeyValue.of(MetaStream.PARTITION_META_KEY, ElasticPartitionMeta.encode(partitionMeta)))
info(s"${logIdent}save partition meta $partitionMeta")
if (isDebugEnabled) {
debug(s"${logIdent}save partition meta $partitionMeta")
}
}

override private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
Expand All @@ -187,6 +192,7 @@ class ElasticLog(val metaStream: MetaStream,

activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp,
shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records)

APPEND_TIME_HIST.update(System.nanoTime() - startTimestamp)
val endOffset = lastOffset + 1
updateLogEndOffset(endOffset)
@@ -490,39 +496,17 @@
}
info(s"${logIdent}loaded partition meta: $partitionMeta")

def loadAllValidSnapshots(): mutable.Map[Long, ElasticPartitionProducerSnapshotMeta] = {
metaMap.filter(kv => kv._1.startsWith(MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX))
.map(kv => (kv._1.stripPrefix(MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX).toLong, kv._2.asInstanceOf[ElasticPartitionProducerSnapshotMeta]))
}

//load producer snapshots for this partition
val producerSnapshotsMetaOpt = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta])
val (producerSnapshotsMeta, snapshotsMap) = if (producerSnapshotsMetaOpt.isEmpty) {
// No need to persist if not exists
(new ElasticPartitionProducerSnapshotsMeta(), new mutable.HashMap[Long, ElasticPartitionProducerSnapshotMeta]())
} else {
(producerSnapshotsMetaOpt.get, loadAllValidSnapshots())
}
if (snapshotsMap.nonEmpty) {
info(s"${logIdent}loaded ${snapshotsMap.size} producer snapshots, offsets(filenames) are ${snapshotsMap.keys} ")
val producerSnapshotsMeta = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta]).getOrElse(new ElasticPartitionProducerSnapshotsMeta())
val snapshotsMap = producerSnapshotsMeta.getSnapshots
if (!snapshotsMap.isEmpty) {
info(s"${logIdent}loaded ${snapshotsMap.size} producer snapshots, offsets(filenames) are ${snapshotsMap.keySet()} ")
} else {
info(s"${logIdent}loaded no producer snapshots")
}

def persistProducerSnapshotMeta(meta: ElasticPartitionProducerSnapshotMeta): Unit = {
val key = MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX + meta.getOffset
if (meta.isEmpty) {
// The real deletion of this snapshot is done in metaStream's compaction.
producerSnapshotsMeta.remove(meta.getOffset)
} else {
producerSnapshotsMeta.add(meta.getOffset)
persistMeta(metaStream, MetaKeyValue.of(key, meta.encode()))
}
persistMeta(metaStream, MetaKeyValue.of(MetaStream.PRODUCER_SNAPSHOTS_META_KEY, producerSnapshotsMeta.encode()))
}

val producerStateManager = ElasticProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time, snapshotsMap, persistProducerSnapshotMeta)
maxTransactionTimeoutMs, producerStateManagerConfig, time, snapshotsMap, kv => metaStream.append(kv).thenApply(_ => null))

val logMeta: ElasticLogMeta = metaMap.get(MetaStream.LOG_META_KEY).map(m => m.asInstanceOf[ElasticLogMeta]).getOrElse(new ElasticLogMeta())
logStreamManager = new ElasticLogStreamManager(logMeta.getStreamMap, client.streamClient(), config.replicationFactor, leaderEpoch)
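With this hunk, the producer-snapshot bookkeeping moves out of ElasticLog: the snapshots map is read directly from ElasticPartitionProducerSnapshotsMeta, and ElasticProducerStateManager is handed only a thin persist callback that appends a key/value to the meta stream. A self-contained sketch of that shape (KeyValue, MetaStreamLike, and SnapshotPersister are simplified stand-ins, not the real MetaKeyValue/MetaStream types):

```scala
import java.util.concurrent.CompletableFuture

// Simplified stand-ins for MetaKeyValue and the partition's meta stream.
final case class KeyValue(key: String, value: Array[Byte])

trait MetaStreamLike {
  // Completes once the key/value has been appended to the meta stream.
  def append(kv: KeyValue): CompletableFuture[java.lang.Long]
}

// The state manager only needs "persist this key/value"; it does not care that
// the destination happens to be a meta stream.
final class SnapshotPersister(persistFun: KeyValue => CompletableFuture[Void]) {
  def persistSnapshot(offset: Long, payload: Array[Byte]): CompletableFuture[Void] =
    persistFun(KeyValue(s"PRODUCER_SNAPSHOT_$offset", payload))
}

object SnapshotPersisterWiring {
  // Mirrors `kv => metaStream.append(kv).thenApply(_ => null)` from the diff above.
  def wire(metaStream: MetaStreamLike): SnapshotPersister =
    new SnapshotPersister(kv => metaStream.append(kv).thenApply[Void](_ => null))
}
```

The design choice is the same either way: the state manager depends on a persist function, not on the meta stream itself.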
@@ -686,7 +670,7 @@
meta.baseOffset(baseOffset)
meta.streamSuffix(suffix)
meta.createTimestamp(time.milliseconds())
val segment: ElasticLogSegment = ElasticLogSegment(dir, meta, streamSliceManager, config, time, logSegmentManager.logSegmentEventListener())
val segment: ElasticLogSegment = ElasticLogSegment(dir, meta, streamSliceManager, config, time, logSegmentManager.logSegmentEventListener(), logIdent = logIdent)
var metaSaveCf: CompletableFuture[Void] = CompletableFuture.completedFuture(null)
if (suffix.equals("")) {
metaSaveCf = logSegmentManager.create(baseOffset, segment)
31 changes: 4 additions & 27 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLogLoader.scala
@@ -47,7 +47,7 @@ class ElasticLogLoader(logMeta: ElasticLogMeta,
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
createAndSaveSegmentFunc: (Long, File, LogConfig, ElasticStreamSliceManager, Time) => (ElasticLogSegment, CompletableFuture[Void]))
extends Logging {
logIdent = s"[ElasticLogLoader partition=$topicPartition, dir=${dir.getParent}] "
logIdent = s"[ElasticLogLoader partition=$topicPartition] "

/**
* Load the log segments from the log files on disk, and returns the components of the loaded log.
@@ -68,20 +68,14 @@
// load all segments
loadSegments()

// make sure the producer state manager endOffset is less than or equal to the recoveryPointCheckpoint
producerStateManager.truncateAndReload(logStartOffsetCheckpoint, recoveryPointCheckpoint, time.milliseconds())
val (newRecoveryPoint: Long, nextOffset: Long) = {
recoverLog()
}

val newLogStartOffset = math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)

// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty) {
throw new IllegalStateException("Producer state must be empty during log initialization")
}

producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq)
ElasticUnifiedLog.rebuildProducerState(producerStateManager, segments, newLogStartOffset, nextOffset, time, reloadFromCleanShutdown = false, logIdent)
val activeSegment = segments.lastSegment.get
LoadedLogOffsets(
newLogStartOffset,
@@ -96,7 +90,7 @@

private def loadSegments(): Unit = {
logMeta.getSegmentMetas.forEach(segmentMeta => {
val segment = ElasticLogSegment(dir, segmentMeta, streamSliceManager, config, time, logSegmentsManager.logSegmentEventListener())
val segment = ElasticLogSegment(dir, segmentMeta, streamSliceManager, config, time, logSegmentsManager.logSegmentEventListener(), logIdent = logIdent)
segments.add(segment)
logSegmentsManager.put(segment.baseOffset, segment)
})
@@ -110,22 +104,6 @@
* @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
*/
private def recoverSegment(segment: LogSegment): Int = {
val producerStateManager = ElasticProducerStateManager(
topicPartition,
dir,
this.producerStateManager.maxTransactionTimeoutMs,
this.producerStateManager.producerStateManagerConfig,
time,
this.producerStateManager.snapshotsMap,
this.producerStateManager.persistFun)
ElasticUnifiedLog.rebuildProducerState(
producerStateManager,
segments,
logStartOffsetCheckpoint,
segment.baseOffset,
time,
reloadFromCleanShutdown = false,
logIdent)
val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.
@@ -168,7 +146,6 @@
var numFlushed = 0
val threadName = Thread.currentThread().getName
numRemainingSegments.put(threadName, numUnflushed)

while (unflushedIter.hasNext && !truncated) {
val segment = unflushedIter.next()
info(s"Recovering unflushed segment ${segment.baseOffset}. $numFlushed/$numUnflushed recovered for $topicPartition.")
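The loader no longer rebuilds producer state from scratch: the empty-state assertion, removeStraySnapshots, and the full rebuildProducerState pass are gone (as is the throwaway per-segment state manager in recoverSegment), and instead the persisted state is truncated to the recovery point and replayed forward during recoverLog. A deliberately simplified model of that flow (all names here are hypothetical, not the PR's API):

```scala
// Illustrative-only model of "truncate, then replay forward" recovery.
final case class Batch(baseOffset: Long, lastOffset: Long)

final class ProducerStateModel(var endOffset: Long) {
  // Mirrors truncateAndReload(...): never let the state run ahead of the recovery point.
  def truncateTo(recoveryPoint: Long): Unit =
    if (endOffset > recoveryPoint) endOffset = recoveryPoint

  def applyBatch(batch: Batch): Unit =
    endOffset = batch.lastOffset + 1
}

object RecoveryFlowSketch {
  def recover(state: ProducerStateModel, recoveryPoint: Long, batches: Seq[Batch]): Long = {
    state.truncateTo(recoveryPoint)
    val replayFrom = state.endOffset
    // Replay only the batches the persisted state has not covered yet.
    batches.filter(_.baseOffset >= replayFrom).foreach(state.applyBatch)
    state.endOffset // the log's next offset after recovery
  }
}
```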
31 changes: 19 additions & 12 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala
@@ -41,7 +41,10 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time,
val logListener: ElasticLogSegmentEventListener) extends LogSegment with Comparable[ElasticLogSegment]{
val logListener: ElasticLogSegmentEventListener,
_logIdent: String = "") extends LogSegment with Comparable[ElasticLogSegment]{

logIdent = _logIdent

def log: FileRecords = throw new UnsupportedOperationException()

@@ -135,15 +138,17 @@
}
}

protected def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
protected def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch, txnIndexCheckpoint: Option[Long]): Unit = {
if (batch.hasProducerId) {
val producerId = batch.producerId
val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.Replication)
val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
producerStateManager.update(appendInfo)
maybeCompletedTxn.foreach { completedTxn =>
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
updateTxnIndex(completedTxn, lastStableOffset)
if (txnIndexCheckpoint.isEmpty || txnIndexCheckpoint.get < completedTxn.lastOffset) {
updateTxnIndex(completedTxn, lastStableOffset)
}
producerStateManager.completeTxn(completedTxn)
}
}
@@ -195,10 +200,7 @@

@nonthreadsafe
override def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
timeIndex.reset()
txnIndex.reset()
logListener.onEvent(baseOffset, ElasticLogSegmentEvent.SEGMENT_UPDATE)

recover0(producerStateManager, leaderEpochCache)
}

@@ -207,16 +209,22 @@
var validBytes = 0
var lastIndexEntry = 0
maxTimestampAndOffsetSoFar = TimestampOffset.Unknown
// recovery from the checkpoint is exclusive, because the checkpointed value is the offset of a record within a batch
val timeIndexCheckpoint = timeIndex.asInstanceOf[ElasticTimeIndex].loadLastEntry().offset
// recovery from the checkpoint is exclusive
val txnIndexCheckpoint = txnIndex.loadLastOffset()
try {
for (batch <- _log.batches.asScala) {
val recoverPoint = math.max(producerStateManager.mapEndOffset, baseOffset)
info(s"[UNCLEAN_SHUTDOWN] recover range [$recoverPoint, ${_log.nextOffset()})")
for (batch <- _log.batchesFrom(recoverPoint).asScala) {
batch.ensureValid()
// The max timestamp is exposed at the batch level, so no need to iterate the records
if (batch.maxTimestamp > maxTimestampSoFar) {
maxTimestampAndOffsetSoFar = TimestampOffset(batch.maxTimestamp, batch.lastOffset)
}

// Build offset index
if (validBytes - lastIndexEntry > indexIntervalBytes) {
if (validBytes - lastIndexEntry > indexIntervalBytes && batch.baseOffset() > timeIndexCheckpoint) {
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
lastIndexEntry = validBytes
}
@@ -227,7 +235,7 @@
if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
updateProducerState(producerStateManager, batch, txnIndexCheckpoint)
}
}
} catch {
@@ -238,7 +246,6 @@
// records won't be corrupted here, so recovery never needs to truncate
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
timeIndex.trimToValidSize()
0
}

@@ -413,14 +420,14 @@

object ElasticLogSegment {
def apply(dir: File, meta: ElasticStreamSegmentMeta, sm: ElasticStreamSliceManager, logConfig: LogConfig,
time: Time, segmentEventListener: ElasticLogSegmentEventListener): ElasticLogSegment = {
time: Time, segmentEventListener: ElasticLogSegmentEventListener, logIdent: String = ""): ElasticLogSegment = {
val baseOffset = meta.baseOffset
val suffix = meta.streamSuffix
val log = new ElasticLogFileRecords(sm.loadOrCreateSlice("log" + suffix, meta.log), baseOffset, meta.logSize())
val lastTimeIndexEntry = meta.timeIndexLastEntry().toTimestampOffset
val timeIndex = new ElasticTimeIndex(UnifiedLog.timeIndexFile(dir, baseOffset, suffix), new StreamSliceSupplier(sm, "tim" + suffix, meta.time), baseOffset, logConfig.maxIndexSize, lastTimeIndexEntry)
val txnIndex = new ElasticTransactionIndex(UnifiedLog.transactionIndexFile(dir, baseOffset, suffix), new StreamSliceSupplier(sm, "txn" + suffix, meta.txn), baseOffset)

new ElasticLogSegment(meta, log, timeIndex, txnIndex, baseOffset, logConfig.indexInterval, logConfig.segmentJitterMs, time, segmentEventListener)
new ElasticLogSegment(meta, log, timeIndex, txnIndex, baseOffset, logConfig.indexInterval, logConfig.segmentJitterMs, time, segmentEventListener, logIdent)
}
}
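Segment recovery in this file becomes incremental: instead of resetting the time and transaction indexes, recover0 loads their last persisted entries as checkpoints, starts replay at max(producerStateManager.mapEndOffset, baseOffset) via batchesFrom, and re-appends an index entry only when it lies beyond the corresponding checkpoint (which is also why the trailing trimToValidSize call disappears). A self-contained sketch of just those two guards:

```scala
// Illustrative sketch of the checkpoint guards used above: during recovery an index
// entry is rewritten only if it lies strictly beyond what the index already holds,
// so entries persisted before the unclean shutdown are not duplicated.
object IndexRecoveryGuards {
  // Mirrors `batch.baseOffset() > timeIndexCheckpoint`.
  def shouldAppendTimeIndex(batchBaseOffset: Long, timeIndexCheckpoint: Long): Boolean =
    batchBaseOffset > timeIndexCheckpoint

  // Mirrors `txnIndexCheckpoint.isEmpty || txnIndexCheckpoint.get < completedTxn.lastOffset`.
  def shouldAppendTxnIndex(completedTxnLastOffset: Long, txnIndexCheckpoint: Option[Long]): Boolean =
    txnIndexCheckpoint.forall(_ < completedTxnLastOffset)

  def main(args: Array[String]): Unit = {
    assert(!shouldAppendTimeIndex(batchBaseOffset = 100L, timeIndexCheckpoint = 150L)) // already indexed
    assert(shouldAppendTimeIndex(batchBaseOffset = 200L, timeIndexCheckpoint = 150L))
    assert(shouldAppendTxnIndex(completedTxnLastOffset = 300L, txnIndexCheckpoint = Some(250L)))
    assert(!shouldAppendTxnIndex(completedTxnLastOffset = 200L, txnIndexCheckpoint = Some(250L)))
  }
}
```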