diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 69751aeafe9f..abd7fcaf3c7f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -17,6 +17,7 @@ package kafka.coordinator.transaction
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
@@ -117,6 +118,28 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
     }
   }
 
+  @Test
+  def testConcurrentGoodPathWithConcurrentPartitionLoading(): Unit = {
+    // This is a somewhat contrived test case which reproduces the bug in KAFKA-9777.
+    // When a new partition needs to be loaded, we acquire the write lock in order to
+    // add the partition to the set of loading partitions. We should still be able to
+    // make progress with transactions even while this is ongoing.
+
+    val keepRunning = new AtomicBoolean(true)
+    val t = new Thread() {
+      override def run(): Unit = {
+        while (keepRunning.get()) {
+          txnStateManager.addLoadingPartition(numPartitions + 1, coordinatorEpoch)
+        }
+      }
+    }
+    t.start()
+
+    verifyConcurrentOperations(createTransactions, allOperations)
+    keepRunning.set(false)
+    t.join()
+  }
+
   @Test
   def testConcurrentGoodPathSequence(): Unit = {
     verifyConcurrentOperations(createTransactions, allOperations)
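
For readers unfamiliar with the locking pattern the new test exercises, the standalone sketch below illustrates the scenario described in the test's comment: one thread repeatedly takes a write lock to register a "loading" partition, while other threads that only need the read lock should keep making progress. This is not Kafka code; LoadingTracker and its methods are hypothetical stand-ins, only loosely modeled on the addLoadingPartition call in the test above.

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical stand-in for the coordinator's loading-partition bookkeeping;
// the real TransactionStateManager implementation differs.
class LoadingTracker {
  private val lock = new ReentrantReadWriteLock()
  private val loadingPartitions = mutable.Set.empty[Int]

  // Writers mutate shared state under the write lock.
  def addLoadingPartition(partitionId: Int): Unit = {
    lock.writeLock().lock()
    try loadingPartitions += partitionId
    finally lock.writeLock().unlock()
  }

  // Readers (standing in for transaction operations) take only the read lock
  // and should continue to make progress while the writer is active.
  def isLoading(partitionId: Int): Boolean = {
    lock.readLock().lock()
    try loadingPartitions.contains(partitionId)
    finally lock.readLock().unlock()
  }
}

object LoadingTrackerDemo extends App {
  val tracker = new LoadingTracker
  val keepRunning = new AtomicBoolean(true)

  // Background thread hammering the write lock, mirroring the thread added in the test.
  val writer = new Thread() {
    override def run(): Unit = {
      while (keepRunning.get())
        tracker.addLoadingPartition(42)
    }
  }
  writer.start()

  // Foreground work should complete promptly despite the busy writer.
  (1 to 10000).foreach(p => tracker.isLoading(p))

  keepRunning.set(false)
  writer.join()
}

The test added in the diff asserts the same property against the real coordinator: verifyConcurrentOperations should complete even while another thread is acquiring the write lock in a loop.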