KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration (#7687)

Existing producer state expiration uses timestamps from data records only and not from transaction markers. This can cause premature producer expiration when the coordinator times out a transaction because we drop the state from existing batches. This in turn can allow the coordinator epoch to revert to a previous value, which can lead to validation failures during log recovery. This patch fixes the problem by also leveraging the timestamp from transaction markers.
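
The core of the fix is that a producer's last-activity timestamp must now be advanced by transaction markers (control batches) as well as by data batches, so the expiration check cannot fire while the coordinator is still completing a transaction. A minimal sketch of the idea in Scala, using illustrative names (ProducerStateTracker, lastTimestamp, producerIdExpirationMs) rather than the exact ProducerStateManager API:

// Sketch only: the real logic lives in ProducerStateManager / ProducerAppendInfo.
final case class ProducerState(producerId: Long, var lastTimestamp: Long) {
  def isExpired(now: Long, producerIdExpirationMs: Long): Boolean =
    now - lastTimestamp >= producerIdExpirationMs
}

class ProducerStateTracker(producerIdExpirationMs: Long) {
  private val producers = scala.collection.mutable.Map.empty[Long, ProducerState]

  // Called for every batch from a producer. Before this patch only data batches
  // advanced the timestamp; counting control batches (transaction markers) as well
  // keeps the producer from being expired while its transaction is being completed.
  def onBatch(producerId: Long, batchMaxTimestamp: Long): Unit = {
    val state = producers.getOrElseUpdate(producerId, ProducerState(producerId, batchMaxTimestamp))
    state.lastTimestamp = math.max(state.lastTimestamp, batchMaxTimestamp)
  }

  def expiredProducerIds(now: Long): Iterable[Long] =
    producers.collect { case (id, state) if state.isExpired(now, producerIdExpirationMs) => id }
}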

We also change the validation logic so that the coordinator epoch is verified only for new marker appends. When replicating from the leader and when recovering the log, we only log a warning if we notice that the coordinator epoch has gone backwards. This allows recovery from previous occurrences of this bug.
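
In code, the relaxed check keys off the new AppendOrigin that this patch threads through the append path. The sketch below uses the three origin values that appear in the diff; the validation method itself is a simplified stand-in for the ProducerAppendInfo logic, not the exact implementation:

import org.apache.kafka.common.errors.TransactionCoordinatorFencedException

sealed trait AppendOrigin
object AppendOrigin {
  case object Replication extends AppendOrigin // follower fetch or log recovery
  case object Coordinator extends AppendOrigin // transaction/group coordinator writes
  case object Client extends AppendOrigin      // regular produce requests
}

def validateCoordinatorEpoch(origin: AppendOrigin, currentEpoch: Int, markerEpoch: Int): Unit = {
  if (markerEpoch < currentEpoch) origin match {
    case AppendOrigin.Coordinator =>
      // Only brand-new marker appends from the transaction coordinator are rejected.
      throw new TransactionCoordinatorFencedException(
        s"Invalid coordinator epoch $markerEpoch, current epoch is $currentEpoch")
    case _ =>
      // Replication and log recovery merely warn, so logs already affected by this
      // bug can still be recovered. println stands in for the broker's logger here.
      println(s"WARN: coordinator epoch went backwards ($markerEpoch < $currentEpoch)")
  }
}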

Finally, this patch fixes one minor issue when loading producer state from the snapshot file. When the only record for a given producer is a control record, the "last offset" field will be set to -1 in the snapshot. We should check for this case when loading to be sure we recover the state consistently.
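
The snapshot fix amounts to treating a last offset of -1 (what the snapshot records when a producer's only entry is a control record) as "no data batch", rather than reconstructing a bogus batch from it on load. A hedged sketch of that check, with field names following the description above rather than the exact snapshot schema:

// Sketch only: not the actual ProducerStateManager snapshot-loading code.
final case class SnapshotEntry(producerId: Long,
                               producerEpoch: Short,
                               lastSequence: Int,
                               lastOffset: Long,
                               offsetDelta: Int,
                               timestamp: Long)

def batchMetadataFromSnapshot(entry: SnapshotEntry): Option[(Long, Long)] = {
  // A producer whose only record is a transaction marker has no data batches,
  // which the snapshot encodes as lastOffset = -1. Skipping the entry here keeps
  // the reloaded state consistent instead of deriving a negative first offset.
  if (entry.lastOffset < 0) None
  else Some((entry.lastOffset - entry.offsetDelta, entry.lastOffset)) // (firstOffset, lastOffset)
}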

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
hachikuji committed Jan 9, 2020
1 parent 30a7db8 commit b13a7ff
Showing 24 changed files with 509 additions and 351 deletions.
@@ -534,16 +534,16 @@ static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}

public static int incrementSequence(int baseSequence, int increment) {
if (baseSequence > Integer.MAX_VALUE - increment)
return increment - (Integer.MAX_VALUE - baseSequence) - 1;
return baseSequence + increment;
public static int incrementSequence(int sequence, int increment) {
if (sequence > Integer.MAX_VALUE - increment)
return increment - (Integer.MAX_VALUE - sequence) - 1;
return sequence + increment;
}

public static int decrementSequence(int baseSequence, int decrement) {
if (baseSequence < decrement)
return Integer.MAX_VALUE - (decrement - baseSequence) + 1;
return baseSequence - decrement;
public static int decrementSequence(int sequence, int decrement) {
if (sequence < decrement)
return Integer.MAX_VALUE - (decrement - sequence) + 1;
return sequence - decrement;
}

private abstract class RecordIterator implements CloseableIterator<Record> {
@@ -410,6 +410,12 @@ public void testIncrementSequence() {
assertEquals(4, DefaultRecordBatch.incrementSequence(Integer.MAX_VALUE - 5, 10));
}

@Test
public void testDecrementSequence() {
assertEquals(0, DefaultRecordBatch.decrementSequence(5, 5));
assertEquals(Integer.MAX_VALUE, DefaultRecordBatch.decrementSequence(0, 1));
}

private static DefaultRecordBatch recordsWithInvalidRecordCount(Byte magicValue, long timestamp,
CompressionType codec, int invalidCount) {
ByteBuffer buf = ByteBuffer.allocate(512);
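
The renamed helpers above implement modular sequence arithmetic: producer sequence numbers wrap around Integer.MAX_VALUE instead of overflowing to negative values. A quick Scala port of the two methods (the real implementations are the Java ones in DefaultRecordBatch, shown in the hunk above), checked against the same cases the tests exercise:

def incrementSequence(sequence: Int, increment: Int): Int =
  if (sequence > Int.MaxValue - increment) increment - (Int.MaxValue - sequence) - 1
  else sequence + increment

def decrementSequence(sequence: Int, decrement: Int): Int =
  if (sequence < decrement) Int.MaxValue - (decrement - sequence) + 1
  else sequence - decrement

assert(incrementSequence(Int.MaxValue - 5, 10) == 4) // wraps past Int.MaxValue, mirrors testIncrementSequence
assert(decrementSequence(0, 1) == Int.MaxValue)      // wraps below 0, mirrors testDecrementSequence
assert(decrementSequence(5, 5) == 0)
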
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -938,7 +938,7 @@ class Partition(val topicPartition: TopicPartition,
}
}

def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
@@ -951,7 +951,7 @@
s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
}

val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion)

// we may need to increment high watermark since ISR could be down to 1
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
import kafka.common.{MessageFormatter, OffsetAndMetadata}
import kafka.log.AppendOrigin
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{FetchLogEnd, ReplicaManager}
import kafka.utils.CoreUtils.inLock
@@ -315,7 +316,7 @@ class GroupMetadataManager(brokerId: Int,
timeout = config.offsetCommitTimeoutMs.toLong,
requiredAcks = config.offsetCommitRequiredAcks,
internalTopicsAllowed = true,
isFromClient = false,
origin = AppendOrigin.Coordinator,
entriesPerPartition = records,
delayedProduceLock = Some(group.lock),
responseCallback = callback)
@@ -830,7 +831,7 @@ class GroupMetadataManager(brokerId: Int,
// do not need to require acks since even if the tombstone is lost,
// it will be appended again in the next purge cycle
val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones.toArray: _*)
partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0)
partition.appendRecordsToLeader(records, origin = AppendOrigin.Coordinator, requiredAcks = 0)

offsetsRemoved += removedOffsets.size
trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
@@ -22,24 +22,24 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock

import kafka.log.LogConfig
import kafka.log.{AppendOrigin, LogConfig}
import kafka.message.UncompressedCodec
import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{Logging, Pool, Scheduler}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.stats.{Avg, Max}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, Max}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}

import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.collection.mutable


object TransactionStateManager {
@@ -211,7 +211,7 @@ class TransactionStateManager(brokerId: Int,
config.requestTimeoutMs,
TransactionLog.EnforcedRequiredAcks,
internalTopicsAllowed = true,
isFromClient = false,
origin = AppendOrigin.Coordinator,
recordsPerPartition,
removeFromCacheCallback,
Some(stateLock.readLock)
@@ -638,7 +638,7 @@ class TransactionStateManager(brokerId: Int,
newMetadata.txnTimeoutMs.toLong,
TransactionLog.EnforcedRequiredAcks,
internalTopicsAllowed = true,
isFromClient = false,
origin = AppendOrigin.Coordinator,
recordsPerPartition,
updateCacheCallback,
delayedProduceLock = Some(stateLock.readLock))
48 changes: 27 additions & 21 deletions core/src/main/scala/kafka/log/Log.scala
@@ -902,7 +902,7 @@ class Log(@volatile var dir: File,
val maybeCompletedTxn = updateProducers(batch,
loadedProducers,
firstOffsetMetadata = None,
isFromClient = false)
origin = AppendOrigin.Replication)
maybeCompletedTxn.foreach(completedTxns += _)
}
}
@@ -991,14 +991,16 @@ class Log(@volatile var dir: File,
* Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
*
* @param records The records to append
* @param isFromClient Whether or not this append is from a producer
* @param origin Declares the origin of the append which affects required validations
* @param interBrokerProtocolVersion Inter-broker message protocol version
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
def appendAsLeader(records: MemoryRecords,
leaderEpoch: Int,
origin: AppendOrigin = AppendOrigin.Client,
interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
}

/**
@@ -1009,7 +1011,11 @@ class Log(@volatile var dir: File,
* @return Information about the appended messages including the first and last offset.
*/
def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
append(records,
origin = AppendOrigin.Replication,
interBrokerProtocolVersion = ApiVersion.latestVersion,
assignOffsets = false,
leaderEpoch = -1)
}

/**
@@ -1019,7 +1025,7 @@
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
*
* @param records The log records to append
* @param isFromClient Whether or not this append is from a producer
* @param origin Declares the origin of the append which affects required validations
* @param interBrokerProtocolVersion Inter-broker message protocol version
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
@@ -1028,9 +1034,13 @@
* @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
* @return Information about the appended messages including the first and last offset.
*/
private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo = {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
val appendInfo = analyzeAndValidateRecords(records, origin)

// return if we have no valid messages or if this is a duplicate of the last appended entry
if (appendInfo.shallowCount == 0)
@@ -1060,7 +1070,7 @@ class Log(@volatile var dir: File,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
isFromClient,
origin,
interBrokerProtocolVersion,
brokerTopicStats)
} catch {
@@ -1146,7 +1156,7 @@ class Log(@volatile var dir: File,
// now that we have valid records, offsets assigned, and timestamps updated, we need to
// validate the idempotent/transactional state of the producers and collect some metadata
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
logOffsetMetadata, validRecords, isFromClient)
logOffsetMetadata, validRecords, origin)

maybeDuplicate.foreach { duplicate =>
appendInfo.firstOffset = Some(duplicate.firstOffset)
@@ -1263,7 +1273,7 @@ class Log(@volatile var dir: File,

private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,
isFromClient: Boolean):
origin: AppendOrigin):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
@@ -1275,7 +1285,7 @@

// if this is a client produce request, there will be up to 5 batches which could have been duplicated.
// If we find a duplicate, we return the metadata of the appended batch to the client.
if (isFromClient) {
if (origin == AppendOrigin.Client) {
maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
return (updatedProducers, completedTxns.toList, Some(duplicate))
}
@@ -1288,11 +1298,7 @@
else
None

val maybeCompletedTxn = updateProducers(batch,
updatedProducers,
firstOffsetMetadata = firstOffsetMetadata,
isFromClient = isFromClient)

val maybeCompletedTxn = updateProducers(batch, updatedProducers, firstOffsetMetadata, origin)
maybeCompletedTxn.foreach(completedTxns += _)
}

@@ -1319,7 +1325,7 @@
* <li> Whether any compression codec is used (if many are used, then the last one is given)
* </ol>
*/
private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = {
var shallowMessageCount = 0
var validBytesCount = 0
var firstOffset: Option[Long] = None
@@ -1333,7 +1339,7 @@

for (batch <- records.batches.asScala) {
// we only validate V2 and higher to avoid potential compatibility issues with older clients
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.Client && batch.baseOffset != 0)
throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
s"be 0, but it is ${batch.baseOffset}")

@@ -1394,9 +1400,9 @@ private def updateProducers(batch: RecordBatch,
private def updateProducers(batch: RecordBatch,
producers: mutable.Map[Long, ProducerAppendInfo],
firstOffsetMetadata: Option[LogOffsetMetadata],
isFromClient: Boolean): Option[CompletedTxn] = {
origin: AppendOrigin): Option[CompletedTxn] = {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient))
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
appendInfo.append(batch, firstOffsetMetadata)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogSegment.scala
@@ -245,7 +245,7 @@ class LogSegment private[log] (val log: FileRecords,
private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
if (batch.hasProducerId) {
val producerId = batch.producerId
val appendInfo = producerStateManager.prepareUpdate(producerId, isFromClient = false)
val appendInfo = producerStateManager.prepareUpdate(producerId, origin = AppendOrigin.Replication)
val maybeCompletedTxn = appendInfo.append(batch, firstOffsetMetadataOpt = None)
producerStateManager.update(appendInfo)
maybeCompletedTxn.foreach { completedTxn =>
