-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6096: Add multi-threaded tests for group coordinator, txn manager #4122
KAFKA-6096: Add multi-threaded tests for group coordinator, txn manager #4122
Conversation
@hachikuji These are the tests I have so far. They currently test only the good paths, but would have been sufficient to detect the deadlocks in KAFKA-5970 and KAFKA-6042. All the error paths need to be added as well, but it will be good if you can review the current code to see if there is a simpler way of adding all the cases to the test. |
|
||
@Test | ||
def verifyGoodPathConcurrency() { | ||
val operations: Seq[GroupOperation[ _, _]] = Seq( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the approach. But I guess we don't have any tests with multiple members in the same group yet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests create multiple members in each group (5 groups with 5 members each at the moment, run concurrently across 5 threads).
import scala.collection.mutable | ||
import scala.collection.JavaConverters._ | ||
|
||
class TransactionStateManagerConcurrencyTest extends AbstractCoordinatorConcurrencyTest[Transaction] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, why do this at the lower level instead of the way you did the group coordinator tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Thank you for the review. I was trying to use logic from existing mock tests in both cases to trigger the operations. The transaction coordinator unit tests use a mock TransactionStateManager
and various other mocks, so it looked like a lot more work to get those to trigger the path with appendRecords
. But it does make sense to write the tests at the coordinator level, so I will update.
5e39e73
to
ed8fe07
Compare
730e742
to
da1c9cb
Compare
retest this please |
Looks like
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch. This testing is very valuable. I added a few minor comments and questions.
var hasMore = true | ||
while (hasMore) { | ||
hasMore = false | ||
val head = taskQueue.synchronized { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: a little unconventional to use ".synchronized" instead of a space.
// Run some random operations | ||
RandomOperationSequence(createMembers(s"random$i"), operations).run() | ||
|
||
// Check that proper sequences till work correctly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: "still"?
abstract class OperationSequence(members: Set[M], operations: Seq[Operation]) { | ||
def actionSequence: Seq[Set[Action]] | ||
def run(): Unit = { | ||
actionSequence.foreach { actions => verifyConcurrentActions(actions) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could just be actionSequence.foreach(verifyConcurrentActions)
} | ||
|
||
override def appendRecords(timeout: Long, | ||
requiredAcks: Short, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should align with the previous argument?
override def setUp() { | ||
super.setUp() | ||
|
||
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand this comment. How do we actually ensure that one of the partitions remains unowned by the coordinator? Also, what is the point of doing so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, copy-paste from another test. Removed comment.
new InitProducerIdOperation(), | ||
new AddPartitionsToTransactionOperation(Set(new TopicPartition("topic", 0))), | ||
new EndTransactionOperation(TransactionResult.COMMIT), | ||
new EndTransactionOperation(TransactionResult.ABORT)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand why we have two EndTxn operaitons. If we run them sequentially, the last one would cause an unexpected transition. Is that a useful case to check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to test abort as well as commit, but you are right, this doesn't really help. I have changed the code to do commit for half of the transactions and abort for the other half, so that the sequence makes sense.
} | ||
} | ||
|
||
private def loadUnloadActions(firstPartitionSet: Set[Int], secondPartitionSet: Set[Int]): Set[Action] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any better names we can choose for these arguments?
} | ||
|
||
@Test | ||
def testConcurrentLoadUnloadPartitions(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some high level comments would be helpful for these load/unload test cases since the scenario is a little more complex.
} | ||
} | ||
|
||
class AddPartitionsToTransactionOperation(partitions: Set[TopicPartition]) extends TxnOperation[Errors] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe we could just use Txn
instead of Transaction
like we do elsewhere.
|
||
def createResponse(request: WriteTxnMarkersRequest): WriteTxnMarkersResponse = { | ||
val pidErrorMap = request.markers.asScala.map { marker => | ||
(marker.producerId().asInstanceOf[java.lang.Long], marker.partitions.asScala.map { tp => (tp, Errors.NONE) }.toMap.asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: drop parenthesis after producerId
da1c9cb
to
7a5594d
Compare
@hachikuji Thank you for reviewing this PR. And sorry about the delay in addressing the comments. I have updated the PR and rebased. Can you take another look when you have some time? Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the patch!
retest this please |
The failures seem unrelated. I will merge to trunk and 1.0. |
Well, I had intended to merge to 1.0, but there are conflicts unfortunately with the new zk client. Maybe that's ok since we're unlikely to make major changes to in 1.0 anyway. |
No description provided.