Skip to content

Commit

Permalink
KAFKA-14884: Include check transaction is still ongoing right before …
Browse files Browse the repository at this point in the history
…append (take 2) (#13787)

Introduced extra mapping to track verification state.

When verifying, there is a race condition that the add partitions verification response returns that the partition is in the ongoing transaction, but an abort marker is written before we get to append. Therefore, we track any given transaction we are verifying with an object unique to that transaction.

We check this unique state upon the first append to the log. After that, we can rely on currentTransactionFirstOffset. We remove the verification state on appending to the log with a transactional data record or marker.

We will also clean up lingering verification state entries via the producer state entry expiration mechanism. We do not update the the timestamp on retrying a verification for a transaction, so each entry must be verified before producer.id.expiration.ms.

There were a few other fixes:
- Moved the transaction manager handling for failed batch into the future completed exceptionally block to avoid processing it twice (this caused issues in unit tests)
- handle interrupted exceptions encountered when callback thread encountered them
- change handling to throw error if we try to set verification state and leaderLogIfLocal is None.

Reviewers: David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
jolshan committed Jul 14, 2023
1 parent d9253fe commit ea0bb00
Show file tree
Hide file tree
Showing 20 changed files with 719 additions and 154 deletions.
Expand Up @@ -793,19 +793,18 @@ private void failBatch(
Function<Integer, RuntimeException> recordExceptions,
boolean adjustSequenceNumbers
) {
if (transactionManager != null) {
try {
// This call can throw an exception in the rare case that there's an invalid state transition
// attempted. Catch these so as not to interfere with the rest of the logic.
transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers);
} catch (Exception e) {
log.debug("Encountered error when handling a failed batch", e);
}
}

this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);

if (batch.completeExceptionally(topLevelException, recordExceptions)) {
if (transactionManager != null) {
try {
// This call can throw an exception in the rare case that there's an invalid state transition
// attempted. Catch these so as not to interfere with the rest of the logic.
transactionManager.handleFailedBatch(batch, topLevelException, adjustSequenceNumbers);
} catch (Exception e) {
log.debug("Encountered error when transaction manager was handling a failed batch", e);
}
}
maybeRemoveAndDeallocateBatch(batch);
}
}
Expand Down
Expand Up @@ -1030,6 +1030,11 @@ synchronized boolean isReady() {
return isTransactional() && currentState == State.READY;
}

// visible for testing
synchronized boolean isInitializing() {
return isTransactional() && currentState == State.INITIALIZING;
}

void handleCoordinatorReady() {
NodeApiVersions nodeApiVersions = transactionCoordinator != null ?
apiVersions.get(transactionCoordinator.idString()) :
Expand Down
Expand Up @@ -189,6 +189,10 @@ public void setDisconnectFuture(CompletableFuture<String> disconnectFuture) {

@Override
public void disconnect(String node) {
disconnect(node, false);
}

public void disconnect(String node, boolean allowLateResponses) {
long now = time.milliseconds();
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
Expand All @@ -197,7 +201,8 @@ public void disconnect(String node) {
short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), now, true, null, null, null));
iter.remove();
if (!allowLateResponses)
iter.remove();
}
}
CompletableFuture<String> curDisconnectFuture = disconnectFuture;
Expand Down
Expand Up @@ -3057,6 +3057,56 @@ public void testSenderShouldRetryWithBackoffOnRetriableError() {
assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
}

@Test
public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testFailTwice", 60000, 100, apiVersions);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();

// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce(); // send request

Node node = metadata.fetch().nodes().get(0);
time.sleep(2000L);
client.disconnect(node.idString(), true);
client.backoff(node, 10);

sender.runOnce(); // now expire the batch.
assertFutureFailure(request1, TimeoutException.class);

time.sleep(20);

sendIdempotentProducerResponse(0, tp0, Errors.INVALID_RECORD, -1);
sender.runOnce(); // receive late response

// Loop once and confirm that the transaction manager does not enter a fatal error state
sender.runOnce();
assertTrue(txnManager.hasAbortableError());
TransactionalRequestResult result = txnManager.beginAbort();
sender.runOnce();

respondToEndTxn(Errors.NONE);
sender.runOnce();
assertTrue(txnManager::isInitializing);
prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
sender.runOnce();
assertTrue(txnManager::isReady);

assertTrue(result.isSuccessful());
result.await();

txnManager.beginTransaction();
}

private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
Expand Down
12 changes: 8 additions & 4 deletions core/src/main/scala/kafka/cluster/Partition.scala
Expand Up @@ -576,8 +576,12 @@ class Partition(val topicPartition: TopicPartition,
}
}

def hasOngoingTransaction(producerId: Long): Boolean = {
leaderLogIfLocal.exists(leaderLog => leaderLog.hasOngoingTransaction(producerId))
// Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
def maybeStartTransactionVerification(producerId: Long): Object = {
leaderLogIfLocal match {
case Some(log) => log.maybeStartTransactionVerification(producerId)
case None => throw new NotLeaderOrFollowerException();
}
}

// Return true if the future replica exists and it has caught up with the current replica for this partition
Expand Down Expand Up @@ -1279,7 +1283,7 @@ class Partition(val topicPartition: TopicPartition,
}

def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
requestLocal: RequestLocal): LogAppendInfo = {
requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
Expand All @@ -1293,7 +1297,7 @@ class Partition(val topicPartition: TopicPartition,
}

val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal)
interBrokerProtocolVersion, requestLocal, verificationGuard)

// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderLog))
Expand Down
63 changes: 58 additions & 5 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Expand Up @@ -577,6 +577,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
result
}

/**
* Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return null.
*/
def maybeStartTransactionVerification(producerId: Long): Object = lock synchronized {
if (hasOngoingTransaction(producerId))
null
else
getOrMaybeCreateVerificationGuard(producerId, true)
}

/**
* Maybe create the VerificationStateEntry for the given producer ID -- if an entry is present, return its verification guard, otherwise, return null.
*/
def getOrMaybeCreateVerificationGuard(producerId: Long, createIfAbsent: Boolean = false): Object = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId, createIfAbsent)
if (entry != null) entry.verificationGuard else null
}

/**
* Return true if the given producer ID has a transaction ongoing.
*/
def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
val entry = producerStateManager.activeProducers.get(producerId)
entry != null && entry.currentTxnFirstOffset.isPresent
Expand Down Expand Up @@ -662,9 +684,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
leaderEpoch: Int,
origin: AppendOrigin = AppendOrigin.CLIENT,
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
requestLocal: RequestLocal = RequestLocal.NoCaching): LogAppendInfo = {
requestLocal: RequestLocal = RequestLocal.NoCaching,
verificationGuard: Object = null): LogAppendInfo = {
val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), ignoreRecordSize = false)
append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
}

/**
Expand All @@ -681,6 +704,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validateAndAssignOffsets = false,
leaderEpoch = -1,
requestLocal = None,
verificationGuard = null,
// disable to check the validation of record size since the record is already accepted by leader.
ignoreRecordSize = true)
}
Expand Down Expand Up @@ -709,6 +733,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
verificationGuard: Object,
ignoreRecordSize: Boolean): LogAppendInfo = {
// We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
// This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
Expand Down Expand Up @@ -833,7 +858,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// now that we have valid records, offsets assigned, and timestamps updated, we need to
// validate the idempotent/transactional state of the producers and collect some metadata
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
logOffsetMetadata, validRecords, origin)
logOffsetMetadata, validRecords, origin, verificationGuard)

maybeDuplicate match {
case Some(duplicate) =>
Expand Down Expand Up @@ -961,7 +986,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,

private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,
origin: AppendOrigin):
origin: AppendOrigin,
requestVerificationGuard: Object):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
Expand All @@ -978,6 +1004,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (duplicateBatch.isPresent) {
return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
}

// Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
// This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
// the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
// There are two phases -- the first append to the log and subsequent appends.
//
// 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
// given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
// to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
// have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
// start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
// result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
//
// 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidRecordException("Record was not part of an ongoing transaction")
}

// We cache offset metadata for the start of each transaction. This allows us to
Expand All @@ -996,6 +1041,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
(updatedProducers, completedTxns.toList, None)
}

private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: Object): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && (requestVerificationGuard != getOrMaybeCreateVerificationGuard(batch.producerId) || requestVerificationGuard == null)
}

/**
* Validate the following:
* <ol>
Expand Down Expand Up @@ -1872,7 +1921,11 @@ object UnifiedLog extends Logging {
origin: AppendOrigin): Option[CompletedTxn] = {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
val completedTxn = appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
// Whether we wrote a control marker or a data batch, we can remove verification guard since either the transaction is complete or we have a first offset.
if (batch.isTransactional)
producerStateManager.clearVerificationStateEntry(producerId)
completedTxn
}

/**
Expand Down

0 comments on commit ea0bb00

Please sign in to comment.