Skip to content

Commit

Permalink
Add new concurrency test case to reproduce bug
Browse files Browse the repository at this point in the history
  • Loading branch information
hachikuji committed Mar 30, 2020
1 parent 88b8b0f commit 2ffe6c0
Showing 1 changed file with 23 additions and 0 deletions.
Expand Up @@ -17,6 +17,7 @@
package kafka.coordinator.transaction

import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean

import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
Expand Down Expand Up @@ -117,6 +118,28 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
}
}

@Test
def testConcurrentGoodPathWithConcurrentPartitionLoading(): Unit = {
// This is a somewhat contrived test case which reproduces the bug in KAFKA-9777.
// When a new partition needs to be loaded, we acquire the write lock in order to
// add the partition to the set of loading partitions. We should still be able to
// make progress with transactions even while this is ongoing.

val keepRunning = new AtomicBoolean(true)
val t = new Thread() {
override def run(): Unit = {
while (keepRunning.get()) {
txnStateManager.addLoadingPartition(numPartitions + 1, coordinatorEpoch)
}
}
}
t.start()

verifyConcurrentOperations(createTransactions, allOperations)
keepRunning.set(false)
t.join()
}

@Test
def testConcurrentGoodPathSequence(): Unit = {
verifyConcurrentOperations(createTransactions, allOperations)
Expand Down

0 comments on commit 2ffe6c0

Please sign in to comment.