Merged
33 changes (28 additions, 5 deletions) in core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -71,6 +71,7 @@ private[log] object ProducerStateEntry {
}

private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
+  var recovered: Boolean = false
  def firstSeq: Int = DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
  def firstOffset: Long = lastOffset - offsetDelta

@@ -140,8 +141,22 @@ private[log] class ProducerStateEntry(val producerId: Long,
  def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
    if (batch.producerEpoch != producerEpoch)
      None
-    else
-      batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
+    else {
+      // AutoMQ for Kafka inject start
+      val metadata = batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
+      if (metadata.isEmpty && this.batchMetadata.nonEmpty && this.batchMetadata.front.recovered) {
+        // the batchMetadata was recovered from a snapshot
+        val front = batchMetadata.front
+        if (front.firstSeq <= batch.baseSequence() && front.lastSeq >= batch.lastSequence()) {
+          throw new DuplicateSequenceException(
+            String.format("The batch is a duplicate: broker cached metadata is %s, the record batch sequence range is [%s, %s]",
+              this, batch.baseSequence(), batch.lastSequence())
+          )
+        }
+      }
+      // AutoMQ for Kafka inject end
+      metadata
+    }
  }

  // Return the batch metadata of the cached batch having the exact sequence range, if any.
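For illustration, here is a minimal standalone sketch of the recovered-range check injected above. SeqRange and checkDuplicate are simplified stand-ins for Kafka's BatchMetadata and findDuplicateBatch, and the generic exception is a placeholder for DuplicateSequenceException:

final case class SeqRange(firstSeq: Int, lastSeq: Int, recovered: Boolean)

// Mirrors the inject: when the exact-range lookup misses but the front
// entry was recovered from a snapshot and its range fully covers the
// incoming batch, the batch is treated as a producer resend.
def checkDuplicate(front: Option[SeqRange], baseSeq: Int, lastSeq: Int): Unit =
  front match {
    case Some(r) if r.recovered && r.firstSeq <= baseSeq && r.lastSeq >= lastSeq =>
      throw new IllegalStateException(
        s"duplicate batch: recovered range [${r.firstSeq}, ${r.lastSeq}] covers [$baseSeq, $lastSeq]")
    case _ => () // not detectable as a duplicate; the caller appends normally
  }

For example, checkDuplicate(Some(SeqRange(0, 9, recovered = true)), 3, 5) throws, while the same call with recovered = false returns normally, matching the fact that the coarse check only applies to metadata rebuilt from a snapshot.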
@@ -405,8 +420,11 @@ object ProducerStateManager {
    val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
    val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
    val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
-    if (offset >= 0)
-      lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp)
+    if (offset >= 0) {
+      val metadata = BatchMetadata(seq, offset, offsetDelta, timestamp)
+      metadata.recovered = true
+      lastAppendedDataBatches += metadata
+    }

    val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch,
      coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
@@ -435,12 +453,17 @@ object ProducerStateManager {
    struct.set(CrcField, 0L) // we'll fill this after writing the entries
    val entriesArray = entries.map {
      case (producerId, entry) =>
+        // AutoMQ for Kafka inject start
+        // encode the cached entry's sequence range into the snapshot so duplicated messages can still be detected after partition reassignment
+        // TODO: final solution: encode the full set of cached entries into the snapshot
+        val offsetDelta = entry.lastSeq - entry.firstSeq
+        // AutoMQ for Kafka inject end
        val producerEntryStruct = struct.instance(ProducerEntriesField)
        producerEntryStruct.set(ProducerIdField, producerId)
          .set(ProducerEpochField, entry.producerEpoch)
          .set(LastSequenceField, entry.lastSeq)
          .set(LastOffsetField, entry.lastDataOffset)
-          .set(OffsetDeltaField, entry.lastOffsetDelta)
+          .set(OffsetDeltaField, offsetDelta)
          .set(TimestampField, entry.lastTimestamp)
          .set(CoordinatorEpochField, entry.coordinatorEpoch)
          .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
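A note on the OffsetDeltaField change above: the snapshot used to store entry.lastOffsetDelta (the delta of the last batch only), but it now stores entry.lastSeq - entry.firstSeq, the width of the producer's whole cached sequence range, so that the snapshot reader can rebuild firstSeq for the single recovered BatchMetadata. A minimal sketch of that reconstruction, assuming decrementSequence behaves like DefaultRecordBatch.decrementSequence (producer sequence numbers wrap at Int.MaxValue):

// Assumed to mirror DefaultRecordBatch.decrementSequence in Kafka's
// record internals: subtract with wrap-around at Int.MaxValue.
def decrementSequence(sequence: Int, decrement: Int): Int =
  if (sequence < decrement) Int.MaxValue - (decrement - sequence) + 1
  else sequence - decrement

// BatchMetadata.firstSeq is decrementSequence(lastSeq, offsetDelta), so a
// snapshotted lastSeq = 5 with offsetDelta = 10 recovers firstSeq =
// Int.MaxValue - 4, i.e. a cached range that wrapped around the Int range.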