Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14920: Address timeouts and out of order sequences #14033

Merged
merged 4 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,9 @@ class Partition(val topicPartition: TopicPartition,
}

// Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
def maybeStartTransactionVerification(producerId: Long): Object = {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = {
leaderLogIfLocal match {
case Some(log) => log.maybeStartTransactionVerification(producerId)
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
case None => throw new NotLeaderOrFollowerException();
}
}
Expand Down
22 changes: 16 additions & 6 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -581,18 +581,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return null.
*/
def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = lock synchronized {
if (hasOngoingTransaction(producerId))
null
else
getOrMaybeCreateVerificationGuard(producerId, true)
maybeCreateVerificationGuard(producerId, sequence, epoch)
}

/**
* Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
* Maybe create the VerificationStateEntry for the given producer ID -- always return the verification guard
*/
def getOrMaybeCreateVerificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId, createIfAbsent)
def maybeCreateVerificationGuard(producerId: Long,
sequence: Int,
epoch: Short): Object = lock synchronized {
producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
}

/**
* If an VerificationStateEntry is present for the given producer ID, return its verification guard, otherwise, return null.
*/
def verificationGuard(producerId: Long): Object = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId)
if (entry != null) entry.verificationGuard else null
}

Expand Down Expand Up @@ -1042,7 +1051,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: Object): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && (requestVerificationGuard != getOrMaybeCreateVerificationGuard(batch.producerId) || requestVerificationGuard == null)
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() &&
(requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)
}

/**
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ class ReplicaManager(val config: KafkaConfig,

if (transactionalBatches.nonEmpty) {
// We return verification guard if the partition needs to be verified. If no state is present, no need to verify.
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(records.firstBatch.producerId)
val firstBatch = records.firstBatch
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
if (verificationGuard != null) {
verificationGuards.put(topicPartition, verificationGuard)
unverifiedEntries.put(topicPartition, records)
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ class PartitionTest extends AbstractPartitionTest {
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = 0L,
producerId = 2L)
val verificationGuard = partition.maybeStartTransactionVerification(2L)
val verificationGuard = partition.maybeStartTransactionVerification(2L, 0, 0)
partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching, verificationGuard)

def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = {
Expand Down Expand Up @@ -3390,20 +3390,20 @@ class PartitionTest extends AbstractPartitionTest {
assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))

// Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
val verificationGuard = partition.maybeStartTransactionVerification(producerId)
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertNotNull(verificationGuard)

// With the wrong verification guard, append should fail.
assertThrows(classOf[InvalidRecordException], () => partition.appendRecordsToLeader(transactionRecords(),
origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))

// We should return the same verification object when we still need to verify. Append should proceed.
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId)
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertEquals(verificationGuard, verificationGuard2)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)

// We should no longer need a verification object. Future appends without verification guard will also succeed.
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId)
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertNull(verificationGuard3)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
}
Expand Down
57 changes: 48 additions & 9 deletions core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ class ProducerStateManagerTest {
val producerEpoch = 0.toShort
val offset = 992342L
val seq = 0
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT)
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), AppendOrigin.CLIENT,
stateManager.maybeCreateVerificationStateEntry(producerId, seq, producerEpoch))

val firstOffsetMetadata = new LogOffsetMetadata(offset, 990000L, 234224)
producerAppendInfo.appendDataBatch(producerEpoch, seq, seq, time.milliseconds(),
Expand Down Expand Up @@ -388,7 +389,8 @@ class ProducerStateManagerTest {
partition,
producerId,
ProducerStateEntry.empty(producerId),
AppendOrigin.CLIENT
AppendOrigin.CLIENT,
stateManager.maybeCreateVerificationStateEntry(producerId, 0, producerEpoch)
)
val firstOffsetMetadata = new LogOffsetMetadata(startOffset, segmentBaseOffset, 50 * relativeOffset)
producerAppendInfo.appendDataBatch(producerEpoch, 0, 0, time.milliseconds(),
Expand Down Expand Up @@ -1089,37 +1091,74 @@ class ProducerStateManagerTest {

@Test
def testEntryForVerification(): Unit = {
val originalEntry = stateManager.verificationStateEntry(producerId, true)
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
val originalEntryVerificationGuard = originalEntry.verificationGuard()

def verifyEntry(producerId: Long, newEntry: VerificationStateEntry): Unit = {
val entry = stateManager.verificationStateEntry(producerId, false)
val entry = stateManager.verificationStateEntry(producerId)
assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
assertEquals(entry.verificationGuard, newEntry.verificationGuard)
}

// If we already have an entry, reuse it.
val updatedEntry = stateManager.verificationStateEntry(producerId, true)
val updatedEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
verifyEntry(producerId, updatedEntry)

// Add the transactional data and clear the entry.
append(stateManager, producerId, 0, 0, offset = 0, isTransactional = true)
stateManager.clearVerificationStateEntry(producerId)
assertNull(stateManager.verificationStateEntry(producerId, false))
assertNull(stateManager.verificationStateEntry(producerId))
}

@Test
def testSequenceAndEpochInVerificationEntry(): Unit = {
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 1, 0)
val originalEntryVerificationGuard = originalEntry.verificationGuard()

def verifyEntry(producerId: Long, newEntry: VerificationStateEntry, expectedSequence: Int, expectedEpoch: Short): Unit = {
val entry = stateManager.verificationStateEntry(producerId)
assertEquals(originalEntryVerificationGuard, entry.verificationGuard)
assertEquals(entry.verificationGuard, newEntry.verificationGuard)
assertEquals(expectedSequence, entry.lowestSequence)
assertEquals(expectedEpoch, entry.epoch)
}
verifyEntry(producerId, originalEntry, 1, 0)

// If we see a lower sequence, update to the lower one.
val updatedEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
verifyEntry(producerId, updatedEntry, 0, 0)

// If we see a new epoch that is higher, update the sequence.
val updatedEntryNewEpoch = stateManager.maybeCreateVerificationStateEntry(producerId, 2, 1)
verifyEntry(producerId, updatedEntryNewEpoch, 2, 1)

// Ignore a lower epoch.
val updatedEntryOldEpoch = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
verifyEntry(producerId, updatedEntryOldEpoch, 2, 1)
}

@Test
def testThrowOutOfOrderSequenceWithVerificationSequenceCheck(): Unit = {
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)

// Trying to append with a higher sequence should fail
assertThrows(classOf[OutOfOrderSequenceException], () => append(stateManager, producerId, 0, 4, offset = 0, isTransactional = true))

assertEquals(originalEntry, stateManager.verificationStateEntry(producerId))
}

@Test
def testVerificationStateEntryExpiration(): Unit = {
val originalEntry = stateManager.verificationStateEntry(producerId, true)
val originalEntry = stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)

// Before timeout we do not remove. Note: Accessing the verification entry does not update the time.
time.sleep(producerStateManagerConfig.producerIdExpirationMs / 2)
stateManager.removeExpiredProducers(time.milliseconds())
assertEquals(originalEntry, stateManager.verificationStateEntry(producerId, false))
assertEquals(originalEntry, stateManager.verificationStateEntry(producerId))

time.sleep((producerStateManagerConfig.producerIdExpirationMs / 2) + 1)
stateManager.removeExpiredProducers(time.milliseconds())
assertNull(stateManager.verificationStateEntry(producerId, false))
assertNull(stateManager.verificationStateEntry(producerId))
}

private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
Expand Down
22 changes: 11 additions & 11 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3677,7 +3677,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

val idempotentRecords = MemoryRecords.withIdempotentRecords(
CompressionType.NONE,
Expand All @@ -3688,14 +3688,14 @@ class UnifiedLogTest {
new SimpleRecord("2".getBytes)
)

val verificationGuard = log.maybeStartTransactionVerification(producerId)
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
assertNotNull(verificationGuard)

log.appendAsLeader(idempotentRecords, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))

// Since we wrote idempotent records, we keep verification guard.
assertEquals(verificationGuard, log.getOrMaybeCreateVerificationGuard(producerId))
assertEquals(verificationGuard, log.verificationGuard(producerId))

val transactionalRecords = MemoryRecords.withTransactionalRecords(
CompressionType.NONE,
Expand All @@ -3709,10 +3709,10 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId))
// Verification guard should be cleared now.
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

// A subsequent maybeStartTransactionVerification will be empty since we are already verified.
assertNull(log.maybeStartTransactionVerification(producerId))
assertNull(log.maybeStartTransactionVerification(producerId, sequence + 2, producerEpoch))

val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
producerId,
Expand All @@ -3722,10 +3722,10 @@ class UnifiedLogTest {

log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

// A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
val newVerificationGuard = log.maybeStartTransactionVerification(producerId)
val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence + 3, producerEpoch)
assertNotNull(newVerificationGuard)
assertNotEquals(verificationGuard, newVerificationGuard)
}
Expand All @@ -3739,7 +3739,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)

val verificationGuard = log.maybeStartTransactionVerification(producerId)
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
assertNotNull(verificationGuard)

val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
Expand All @@ -3750,7 +3750,7 @@ class UnifiedLogTest {

log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))
}

@Test
Expand All @@ -3763,7 +3763,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.getOrMaybeCreateVerificationGuard(producerId))
assertNull(log.verificationGuard(producerId))

val transactionalRecords = MemoryRecords.withTransactionalRecords(
CompressionType.NONE,
Expand All @@ -3774,7 +3774,7 @@ class UnifiedLogTest {
new SimpleRecord("2".getBytes)
)

val verificationGuard = log.maybeStartTransactionVerification(producerId)
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
// Append should not throw error.
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2847,7 +2847,7 @@ class ReplicaManagerTest {
private def getVerificationGuard(replicaManager: ReplicaManager,
tp: TopicPartition,
producerId: Long): Object = {
replicaManager.getPartitionOrException(tp).log.get.getOrMaybeCreateVerificationGuard(producerId)
replicaManager.getPartitionOrException(tp).log.get.verificationGuard(producerId)
}

private def setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager: AddPartitionsToTxnManager,
Expand Down