diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index a2c18c241d..7329eae2e6 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -572,6 +572,13 @@ class Partition(val topicPartition: TopicPartition, } // AutoMQ for Kafka inject start + private def checkClosed(): Unit = { + if (closed) { + throw new NotLeaderOrFollowerException("Leader %d for partition %s on broker %d is already closed" + .format(localBrokerId, topicPartition, localBrokerId)) + } + } + /** * Close this partition and trigger a re-election. * It may be messy here, but it is necessary to ensure the following steps: @@ -1268,12 +1275,10 @@ class Partition(val topicPartition: TopicPartition, def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int, requestLocal: RequestLocal): LogAppendInfo = { + checkClosed() val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { // AutoMQ for Kafka inject start - if (closed) { - throw new NotLeaderOrFollowerException("Leader %d for partition %s on broker %d is already closed" - .format(localBrokerId, topicPartition, localBrokerId)) - } + checkClosed() // AutoMQ for Kafka inject end leaderLogIfLocal match { case Some(leaderLog) => @@ -1372,7 +1377,9 @@ class Partition(val topicPartition: TopicPartition, logReadInfo } else { + checkClosed() inReadLock(leaderIsrUpdateLock) { + checkClosed() val localLog = localLogWithEpochOrThrow( fetchPartitionData.currentLeaderEpoch, fetchParams.fetchOnlyLeader