KAFKA-8334 Make sure the thread which tries to complete delayed reque… #8657
Conversation
@rajinisivaram @junrao @windkit please take a look :)
Force-pushed from 507196d to 51884d0.
@chia7712 : Thanks for the PR. Sorry for the delay. Made a pass of non-testing files. Overall, I felt that this approach works. It adds its own complexity, but it's probably better than adding a separate thread pool. A few comments below.
Force-pushed from 51884d0 to e2f74f9.
Force-pushed from e2f74f9 to 3ac14e7.
Force-pushed from 3ac14e7 to b013041.
@junrao Could you take a look?
@chia7712 : Thanks for the updated PR. Made a pass of all files. A few more comments below.
offsetsPartitions.map(_.partition).toSet, isCommit = isCommit)
catch {
  case e: IllegalStateException if isCommit
    && e.getMessage.contains("though the offset commit record itself hasn't been appended to the log") =>
Hmm, why do we need this logic now?
TestReplicaManager#appendRecords (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala#L207) always completes the delayedProduce immediately, so the txn offset is appended as well. This PR completes the delayedProduce after releasing the group lock, so the following execution order becomes possible:
- txn prepare
- txn completion (fails)
- txn append (executed by the delayedProduce)
Thanks. I am still not sure that I fully understand this. It seems that by not completing the delayedProduce within the group lock, we are hitting IllegalStateException. That seems a bug. Do you know which code depends on that? It seems that we do hold a group lock when updating the txnOffset.
That seems a bug.

The root cause (a behavior changed by this PR) is that "txn initialization" and "txn append" are no longer executed under the same lock. The test story is as follows.

CommitTxnOffsetsOperation calls GroupMetadata.prepareTxnOffsetCommit to add CommitRecordMetadataAndOffset(None, offsetAndMetadata) to pendingTransactionalOffsetCommits (this is the link you attached). GroupMetadata.completePendingTxnOffsetCommit, called by CompleteTxnOperation, throws IllegalStateException if CommitRecordMetadataAndOffset.appendedBatchOffset is None (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L664).

Why didn't this cause an error before? CommitRecordMetadataAndOffset.appendedBatchOffset is updated by the callback putCacheCallback (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407). TestReplicaManager always creates a delayedProduce to handle the putCacheCallback (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala#L188). The condition to complete the delayedProduce is completeAttempts.incrementAndGet() >= 3, and it becomes true only after both producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) and tryCompleteDelayedRequests() are called, since the former calls tryComplete twice and the latter calls it once. This means putCacheCallback is always executed by TestReplicaManager.appendRecords, and TestReplicaManager.appendRecords runs within the group lock (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L738). In short, txn initialization (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L464) and txn append (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L407) used to be executed under the same group lock, so the following execution order was impossible:
- txn initialization
- txn completion
- txn append

However, this PR no longer completes delayed requests within the group lock held by the caller, so the putCacheCallback that appends the txn has to acquire the group lock again.
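To make the race concrete, here is a small self-contained sketch of the ordering problem described above; all names below are hypothetical stand-ins rather than Kafka code.

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Sketch only: "prepare" and "append" no longer happen under one continuous
// lock acquisition, so a completion check can run in between and observe the
// half-initialized state.
object DelayedCompletionOrderingSketch {
  private val groupLock = new ReentrantLock()
  // producerId -> offset of the appended commit record (None until appended)
  private val pendingTxnOffsets = mutable.Map.empty[Long, Option[Long]]

  // "txn initialization": registers the pending commit without an appended offset.
  def prepareTxnOffsetCommit(producerId: Long): Unit = {
    groupLock.lock()
    try pendingTxnOffsets(producerId) = None
    finally groupLock.unlock()
  }

  // "txn append": runs later (e.g. from a delayed-produce callback) and must
  // re-acquire the lock, because the caller's lock was already released.
  def appendTxnOffset(producerId: Long, appendedOffset: Long): Unit = {
    groupLock.lock()
    try pendingTxnOffsets(producerId) = Some(appendedOffset)
    finally groupLock.unlock()
  }

  // "txn completion": fails if it observes the gap between prepare and append.
  def completePendingTxnOffsetCommit(producerId: Long): Unit = {
    groupLock.lock()
    try {
      if (pendingTxnOffsets.get(producerId).contains(None))
        throw new IllegalStateException(
          "offset commit record itself hasn't been appended to the log")
    } finally groupLock.unlock()
  }
}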
Thanks for the great explanation. I understand the issue now. Essentially, this exposed a limitation of the existing test. The existing test happens to work because the producer callbacks are always completed in the same ReplicaManager.appendRecords() call under the group lock. However, this is not necessarily the general case.
Your fix works, but may hide other real problems. I was thinking that another way to fix this is to change the test a bit. For example, we expect CompleteTxnOperation to happen after CommitTxnOffsetsOperation. So, instead of letting them run in parallel, we can change the test to make sure that CompleteTxnOperation only runs after CommitTxnOffsetsOperation completes successfully. JoinGroupOperation and SyncGroupOperation might need a similar consideration.
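For illustration, the suggested sequencing could be enforced with a latch that gates the completion on the commit having finished; the threads below are generic stand-ins, not the actual operation classes in the concurrency test.

import java.util.concurrent.CountDownLatch

// Sketch only: let the "complete" step start after the "commit" step has
// finished successfully, instead of running the two fully in parallel.
object SequencedTxnOpsSketch {
  def main(args: Array[String]): Unit = {
    val commitDone = new CountDownLatch(1)

    val commit = new Thread(() => {
      // ... run the CommitTxnOffsetsOperation-like work and verify its callback ...
      commitDone.countDown() // signal that the commit completed successfully
    })

    val complete = new Thread(() => {
      commitDone.await() // the CompleteTxnOperation-like work waits for the commit
      // ... run the completion ...
    })

    complete.start()
    commit.start()
    commit.join()
    complete.join()
  }
}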
we expect CompleteTxnOperation to happen after CommitTxnOffsetsOperation. So, instead of letting them run in parallel, we can change the test to make sure that CompleteTxnOperation only runs after CommitTxnOffsetsOperation completes successfully.
will roger that!
JoinGroupOperation and SyncGroupOperation might need a similar consideration.
I didn't notice anything interesting there. Could you share it with me?
@@ -536,6 +537,11 @@ class GroupCoordinatorTest {
    // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked
    assertEquals(1, group.size)

    // prepare the mock replica manager again since the delayed join is going to complete
    EasyMock.reset(replicaManager)
    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
Hmm, why do we need to mock this since replicaManager.getMagic() is only called through replicaManager.handleWriteTxnMarkersRequest()?
GroupMetadataManager#storeGroup (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L245) also calls ReplicaManager.getMagic. There are delayed ops that are completed by timer.advanceClock, so we have to mock replicaManager.getMagic. The mock is the same as https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala#L3823.
So... could we keep it simpler?
Force-pushed from 4a9cbc9 to d8beeab.
The flaky test is tracked by #8853.
@chia7712 : Thanks for the updated PR. Just one comment below. Also, there are a few comments not addressed from the previous round.
It will be helpful if you could preserve the commit history in future updates to the PR since that makes it easier to identify the delta changes.
my bad :( I'll keep that in mind
Force-pushed from d8beeab to 8893158.
@chia7712 : Thanks for the updated PR. Added a few more comments below.
Force-pushed from 8893158 to 85003c3.
@chia7712 : Thanks for the updated PR. Just a few more comments below.
groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
val lock = new ReentrantLock()
val producerId = producerIdCount
producerIdCount += 1
I think the intention for the test is probably to use the same producerId since it tests more on transactional conflicts.
Got it. However, using the same producerId means the group completed by CompleteTxnOperation can be impacted by any CommitTxnOffsetsOperation (since the partitions are the same as well). Hence, the side effect is that we need a single lock to control the happens-before ordering of txn completion and commit, so the test will get slower.
Force-pushed from 85003c3 to 142a6c4.
@junrao Thanks for all the reviews again 👍
my bad. I forgot this request :( Except for
It seems we can remove
@chia7712 : Thanks for the updated PR. Just a few more minor comments.
@chia7712 : Thanks for the new update. A few more minor comments.
@chia7712 : Thanks for the updated PR. A few more minor comments below.
…sts does NOT hold any group lock
…lse to DelayedOperation
Force-pushed from 68c39cc to fbd4656.
@chia7712 : Thanks for the latest changes. LGTM.
The latest system test run has one failure:
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-09-08--001.1599611744--chia7712--fix_8334_avoid_deadlock--fbd46565a/report.html
Also, are the Jenkins test failures related to this PR?
@ijuma @hachikuji @rajinisivaram : I think this PR is ready to be merged. Any further comments from you?
On my local machine, they are flaky on the trunk branch.
@chia7712 : Thanks a lot for staying on this tricky issue and finding a simpler solution!
Thanks for all the suggestions. I benefited a lot from them.
The main changes of this PR are shown below.
- Replace tryLock by lock for DelayedOperation#maybeTryComplete (BEFORE / AFTER comparison).
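For readers following along, here is a minimal sketch of the difference between the two locking strategies; it is simplified and hypothetical, since the real DelayedOperation carries more state and the actual patch may differ in detail.

import java.util.concurrent.locks.ReentrantLock

// Simplified model of a delayed operation guarded by a lock.
abstract class DelayedOperationSketch {
  private val lock = new ReentrantLock()

  // Subclasses decide whether the operation can be completed right now.
  def tryComplete(): Boolean

  // BEFORE (sketch): tryLock() gives up when another thread holds the lock,
  // so the completion check may be skipped and must be retried by another thread later.
  def maybeTryCompleteWithTryLock(): Boolean = {
    if (lock.tryLock()) {
      try tryComplete()
      finally lock.unlock()
    } else false
  }

  // AFTER (sketch): lock() blocks until the lock is free, so the calling
  // thread always performs the completion check itself.
  def maybeTryCompleteWithLock(): Boolean = {
    lock.lock()
    try tryComplete()
    finally lock.unlock()
  }
}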