diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 6d976b9b66..cb11cf3039 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -71,6 +71,7 @@ private[log] object ProducerStateEntry {
 }
 
 private[log] case class BatchMetadata(lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) {
+  var recovered: Boolean = false
   def firstSeq: Int = DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta)
   def firstOffset: Long = lastOffset - offsetDelta
 
@@ -140,8 +141,22 @@ private[log] class ProducerStateEntry(val producerId: Long,
   def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
     if (batch.producerEpoch != producerEpoch)
       None
-    else
-      batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
+    else {
+      // AutoMQ for Kafka inject start
+      val metadata = batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
+      if (metadata.isEmpty && this.batchMetadata.nonEmpty && this.batchMetadata.front.recovered) {
+        // the batchMetadata was recovered from a snapshot
+        val front = batchMetadata.front
+        if (front.firstSeq <= batch.baseSequence() && front.lastSeq >= batch.lastSequence()) {
+          throw new DuplicateSequenceException(
+            String.format("The batch is a duplicate: broker cached metadata is %s, the record batch is [%s, %s]",
+              this, batch.baseSequence(), batch.lastSequence())
+          )
+        }
+      }
+      // AutoMQ for Kafka inject end
+      metadata
+    }
   }
 
   // Return the batch metadata of the cached batch having the exact sequence range, if any.
@@ -405,8 +420,11 @@ object ProducerStateManager {
       val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
       val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
       val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata]
-      if (offset >= 0)
-        lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp)
+      if (offset >= 0) {
+        val metadata = BatchMetadata(seq, offset, offsetDelta, timestamp)
+        metadata.recovered = true
+        lastAppendedDataBatches += metadata
+      }
 
       val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch, coordinatorEpoch,
         timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
@@ -435,12 +453,17 @@ object ProducerStateManager {
     struct.set(CrcField, 0L) // we'll fill this after writing the entries
     val entriesArray = entries.map {
       case (producerId, entry) =>
+        // AutoMQ for Kafka inject start
+        // encode the cached entry's sequence range into the snapshot, so we can detect duplicate messages after partition reassignment
+        // TODO: final solution: encode the full set of cached entries into the snapshot
+        val offsetDelta = entry.lastSeq - entry.firstSeq
+        // AutoMQ for Kafka inject end
         val producerEntryStruct = struct.instance(ProducerEntriesField)
         producerEntryStruct.set(ProducerIdField, producerId)
           .set(ProducerEpochField, entry.producerEpoch)
           .set(LastSequenceField, entry.lastSeq)
           .set(LastOffsetField, entry.lastDataOffset)
-          .set(OffsetDeltaField, entry.lastOffsetDelta)
+          .set(OffsetDeltaField, offsetDelta)
           .set(TimestampField, entry.lastTimestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
           .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
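
Note on the change: the snapshot's OffsetDeltaField previously stored only the last appended batch's offset delta, so after a reload the rebuilt entry covered just that one batch. The patch instead writes the sequence span across all cached batches (entry.lastSeq - entry.firstSeq) and marks the rebuilt BatchMetadata as recovered, letting findDuplicateBatch reject a retried batch whose sequence range falls inside the recovered range. Below is a minimal standalone sketch of that recovered-range check; the names (CachedBatch, checkDuplicate, DuplicateSequenceError) are hypothetical and not the actual Kafka types, and the exact-match path for hot (non-recovered) caches is deliberately omitted.

// Sketch only: illustrates the recovered-range duplicate check, not Kafka's real classes.
final case class CachedBatch(firstSeq: Int, lastSeq: Int, recovered: Boolean)

final class DuplicateSequenceError(msg: String) extends RuntimeException(msg)

def checkDuplicate(cached: Option[CachedBatch], baseSeq: Int, lastSeq: Int): Unit =
  cached match {
    // After snapshot recovery only the range survives, so full coverage implies a retry.
    case Some(b) if b.recovered && b.firstSeq <= baseSeq && b.lastSeq >= lastSeq =>
      throw new DuplicateSequenceError(
        s"cached range [${b.firstSeq}, ${b.lastSeq}] covers incoming batch [$baseSeq, $lastSeq]")
    // Not provably a duplicate; the normal append/exact-match path would handle it.
    case _ => ()
  }

// e.g. checkDuplicate(Some(CachedBatch(10, 14, recovered = true)), 12, 14) throws, while
// the same call with recovered = false does not (the exact-match logic applies instead).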