KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends #8782
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -285,15 +280,16 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
    }
  }

  private def maybeWriteTxnCompletion(transactionalId: String): Unit = {
    Option(transactionsWithPendingMarkers.get(transactionalId)).foreach { pendingCommitTxn =>
Multiple threads may see the transaction still as pending and attempt completion.
@@ -221,7 +221,6 @@ class TransactionMarkerChannelManagerTest {
    EasyMock.replay(metadataCache)

    channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
    channelManager.addTxnMarkersToSend(coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
The change to use `mock` instead of `niceMock` led to a failure because of an unexpected append to the log. It seemed like this call was not necessary to test the behavior we were interested in here, so I removed it rather than adding the expected call to append.
LGTM. Just one question on the unit test.
@@ -86,6 +90,70 @@ class TransactionMarkerChannelManagerTest {
      .anyTimes()
  }

  @Test
  def shouldOnlyWriteTxnCompletionOnce(): Unit = {
Does this test cover concurrent calls to `maybeWriteTxnCompletion`?
It does. I was trying to set up this test to match how we're likely hitting this in practice. In the call to `addTxnMarkersToSend`, before calling `maybeWriteTxnCompletion`, we have to acquire the lock. It is possible that the caller fails to acquire the lock before the markers finish getting written and the transaction gets completed in the request completion handler.
Got it. So if the bug still existed, this test would probably not fail consistently but would be flaky, right?
It fails deterministically without the fix.
Ah yes, I missed the `txnMetadata2.lock.lock()` before starting the scheduler, thanks.
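The point made in this thread, that taking the lock in the test before the other thread starts forces the problematic interleaving every time, can be sketched in isolation. The following is a simplified model and not the actual test: `LockHeldRaceSketch`, `completionWrites`, and the `atomicRemove` flag are hypothetical names, and the assumption that the blocked caller observed the pending entry before waiting on the lock is made only for illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock

// Hypothetical, simplified model; none of these names come from Kafka.
object LockHeldRaceSketch {
  // Returns how many completion "writes" happen for a single pending transaction.
  def completionWrites(atomicRemove: Boolean): Int = {
    val lock = new ReentrantLock()
    val pending = new ConcurrentHashMap[String, String]()
    val writes = new AtomicInteger(0)
    pending.put("txn-1", "pending")

    // Completion as performed on the request-completion-handler path.
    def complete(): Unit =
      if (atomicRemove) {
        // Only the caller that actually removes the entry writes.
        if (pending.remove("txn-1") != null) writes.incrementAndGet()
      } else if (pending.get("txn-1") != null) {
        // Check-then-act: unsafe when another thread holds a stale view.
        writes.incrementAndGet()
        pending.remove("txn-1")
      }

    val callerReady = new CountDownLatch(1)
    val caller = new Thread(new Runnable {
      def run(): Unit = {
        val seen = pending.get("txn-1") // assumed stale observation
        callerReady.countDown()
        lock.lock() // blocks until the test thread releases the lock
        try {
          if (atomicRemove) complete()
          else if (seen != null) { writes.incrementAndGet(); pending.remove("txn-1") }
        } finally lock.unlock()
      }
    })

    lock.lock() // held before the caller starts, so the ordering is fixed
    try {
      caller.start()
      callerReady.await()
      complete() // the handler path finishes while the caller is still blocked
    } finally lock.unlock()
    caller.join()
    writes.get()
  }
}
```

Because the lock is held until the handler path has finished, the outcome does not depend on thread scheduling: in this model the check-then-act variant always writes twice, and the atomic-remove variant always writes once.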
@@ -291,7 +358,7 @@ class TransactionMarkerChannelManagerTest {

    val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
    for (requestAndHandler <- requestAndHandlers) {
-      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.PRODUCE, 0, "client", 1),
+      requestAndHandler.handler.onComplete(new ClientResponse(new RequestHeader(ApiKeys.WRITE_TXN_MARKERS, 0, "client", 1),
Nice catch. Not sure why they did not fail before :)
retest this please
…licate appends (#8782)
The method `maybeWriteTxnCompletion` is unsafe for concurrent calls. This can cause duplicate attempts to write the completion record to the log, which can ultimately lead to illegal state errors and possibly to correctness violations if another transaction had been started before the duplicate was written. This patch fixes the problem by ensuring that only one thread can successfully remove the pending completion from the map.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
…use_all_dns_ips-as-default
* apache-github/trunk:
  KAFKA-9788; Use distinct names for transaction and group load time sensors (#8784)
  KAFKA-9514; The protocol generator generated useless condition when a field is made nullable and flexible version is used (#8793)
  MINOR: Update to Gradle 6.5 and tweak build jvm config (#8751)
  MINOR: Upgrade spotbugs and spotbugsPlugin (#8790)
  KAFKA-10089 The stale ssl engine factory is not closed after reconfigure (#8792)
  KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends (#8782)
  KAFKA-10084: Fix EosTestDriver end offset (#8785)
  KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (#8786)
The method `maybeWriteTxnCompletion` is unsafe for concurrent calls. This can cause duplicate attempts to write the completion record to the log, which can ultimately lead to illegal state errors and possibly to correctness violations if another transaction had been started before the duplicate was written. This patch fixes the problem by ensuring that only one thread can successfully remove the pending completion from the map.
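As a rough illustration of the idea in this description (only one thread can successfully remove the pending completion from the map), here is a minimal standalone sketch. It is not the patch itself: `PendingCompletion`, `CompleteOnceSketch`, and `racingAppends` are hypothetical names, and the exact map operation used in the real change may differ. The sketch relies on `ConcurrentHashMap.remove(key)` returning the previous value to exactly one caller and `null` to all others.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the pending-completion state; not Kafka's class.
final case class PendingCompletion(transactionalId: String)

object CompleteOnceSketch {
  // Starts `threads` concurrent completion attempts for one transaction and
  // returns how many of them performed the (simulated) log append.
  def racingAppends(threads: Int): Int = {
    val pending = new ConcurrentHashMap[String, PendingCompletion]()
    pending.put("txn-1", PendingCompletion("txn-1"))
    val appends = new AtomicInteger(0)
    val start = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(threads)
    try {
      (1 to threads).foreach { _ =>
        pool.execute(new Runnable {
          def run(): Unit = {
            start.await() // release all threads at the same moment
            // The atomic remove hands the entry to a single winner.
            Option(pending.remove("txn-1")).foreach(_ => appends.incrementAndGet())
          }
        })
      }
      start.countDown()
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
    appends.get()
  }
}
```

Calling `CompleteOnceSketch.racingAppends(8)` counts a single append regardless of how the threads interleave, since the winner is decided by the map rather than by a separate check.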