MINOR: remove DelayedOperations.checkAndCompleteFetch (#9278)
Reviewers: Jun Rao <junrao@gmail.com>
chia7712 committed Sep 10, 2020
1 parent 86013dc · commit 6bba661
Showing 2 changed files with 7 additions and 125 deletions.
core/src/main/scala/kafka/cluster/Partition.scala (0 additions, 4 deletions)
@@ -99,10 +99,6 @@ class DelayedOperations(topicPartition: TopicPartition,
deleteRecords.checkAndComplete(requestKey)
}

- def checkAndCompleteFetch(): Unit = {
- fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition))
- }

def numDelayedDelete: Int = deleteRecords.numDelayed
}

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala (7 additions, 121 deletions)
@@ -17,9 +17,8 @@
package kafka.cluster

import java.nio.ByteBuffer
- import java.util.{Optional, Properties}
- import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit, TimeoutException}
- import java.util.concurrent.atomic.AtomicBoolean
+ import java.util.Optional
+ import java.util.concurrent.{CountDownLatch, Semaphore}

import com.yammer.metrics.core.Metric
import kafka.api.{ApiVersion, LeaderAndIsr}
@@ -29,24 +28,23 @@ import kafka.metrics.KafkaYammerMetrics
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
- import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
- import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
- import org.junit.Test
+ import org.apache.kafka.common.utils.SystemTime
+ import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.junit.Assert._
- import org.mockito.Mockito._
- import org.scalatest.Assertions.assertThrows
+ import org.junit.Test
import org.mockito.ArgumentMatchers
+ import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
+ import org.scalatest.Assertions.assertThrows
import unit.kafka.cluster.AbstractPartitionTest

import scala.jdk.CollectionConverters._
- import scala.collection.mutable.ListBuffer

class PartitionTest extends AbstractPartitionTest {

@@ -725,8 +723,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas

- doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)

assertTrue("Expected become leader transition to succeed",
@@ -922,102 +918,6 @@
assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicaIds)
}

- /**
- * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
- * Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
- * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
- * (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for
- * read lock of one Partition while holding on to read lock of another Partition.
- */
- @Test
- def testDelayedFetchAfterAppendRecords(): Unit = {
- val controllerEpoch = 0
- val leaderEpoch = 5
- val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
- val isr = replicaIds
- val logConfig = LogConfig(new Properties)
-
- val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
- val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, () => logConfig) }
- val partitions = ListBuffer.empty[Partition]
-
- logs.foreach { log =>
- val tp = log.topicPartition
- val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
- val partition = new Partition(tp,
- replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
- interBrokerProtocolVersion = ApiVersion.latestVersion,
- localBrokerId = brokerId,
- time,
- stateStore,
- delayedOperations,
- metadataCache,
- logManager)
-
- when(delayedOperations.checkAndCompleteFetch())
- .thenAnswer((invocation: InvocationOnMock) => {
- // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
- val anotherPartition = (tp.partition + 1) % topicPartitions.size
- val partition = partitions(anotherPartition)
- partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
- })
-
- partition.setLog(log, isFutureLog = false)
- val leaderState = new LeaderAndIsrPartitionState()
- .setControllerEpoch(controllerEpoch)
- .setLeader(brokerId)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(isr)
- .setZkVersion(1)
- .setReplicas(replicaIds)
- .setIsNew(true)
- partition.makeLeader(leaderState, offsetCheckpoints)
- partitions += partition
- }
-
- def createRecords(baseOffset: Long): MemoryRecords = {
- val records = List(
- new SimpleRecord("k1".getBytes, "v1".getBytes),
- new SimpleRecord("k2".getBytes, "v2".getBytes))
- val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
- val builder = MemoryRecords.builder(
- buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
- baseOffset, time.milliseconds, 0)
- records.foreach(builder.append)
- builder.build()
- }
-
- val done = new AtomicBoolean()
- val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
- try {
- // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
- executor.submit((() => {
- while (!done.get) {
- partitions.foreach(_.maybeShrinkIsr())
- }
- }): Runnable)
- // Append records to partitions, one partition-per-thread
- val futures = partitions.map { partition =>
- executor.submit((() => {
- (1 to 10000).foreach { _ =>
- partition.appendRecordsToLeader(createRecords(baseOffset = 0),
- origin = AppendOrigin.Client,
- requiredAcks = 0)
- }
- }): Runnable)
- }
- futures.foreach(_.get(15, TimeUnit.SECONDS))
- done.set(true)
- } catch {
- case e: TimeoutException =>
- val allThreads = TestUtils.allThreadStackTraces()
- fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
- } finally {
- executor.shutdownNow()
- executor.awaitTermination(5, TimeUnit.SECONDS)
- }
- }

def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(
@@ -1079,8 +979,6 @@
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = replicas

- doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)

val initializeTimeMs = time.milliseconds()
@@ -1138,8 +1036,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId).asJava

- doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
"Expected become leader transition to succeed",
@@ -1200,8 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId).asJava

- doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(
@@ -1251,8 +1145,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava

- doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
@@ -1305,8 +1197,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava

- doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
@@ -1374,8 +1264,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava

- doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
@@ -1429,8 +1317,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId, remoteBrokerId).asJava

- doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",

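The scaladoc of the removed testDelayedFetchAfterAppendRecords spells out the locking rule the deleted hook existed to respect: delayed fetch completions take the leaderIsrUpdate read lock of one or more partitions, so they must run only after the lock taken for the append has been released, or a waiting writer (such as an ISR shrink) can deadlock request handler threads. The sketch below is a minimal, hypothetical illustration of that ordering using plain JDK locks; PartitionLike, appendRecords, fetchSnapshot, and completeDelayedFetch are invented names for this example and are not Kafka APIs.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-in for kafka.cluster.Partition; names are illustrative only.
final class PartitionLike(val name: String) {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

  // Read-side work done by a delayed fetch against this partition.
  def fetchSnapshot(): String = {
    val read = leaderIsrUpdateLock.readLock()
    read.lock()
    try s"snapshot of $name" finally read.unlock()
  }

  // Append records under the read lock, then complete delayed fetches
  // strictly after the lock is released. Running the completion while the
  // lock is still held could deadlock: the completion may block on the read
  // lock of another partition behind a waiting writer (e.g. an ISR shrink),
  // while that writer's thread is in turn blocked on this partition's lock.
  def appendRecords(completeDelayedFetch: () => Unit): Unit = {
    val read = leaderIsrUpdateLock.readLock()
    read.lock()
    try {
      // ... append to the local log here ...
    } finally {
      read.unlock()
    }
    completeDelayedFetch() // safe: no partition lock is held at this point
  }
}

object DelayedFetchOrderingSketch extends App {
  val p0 = new PartitionLike("p0")
  val p1 = new PartitionLike("p1")
  // Completing a delayed fetch on p0 may read from p1; do it outside p0's lock.
  p0.appendRecords(() => println(p1.fetchSnapshot()))
}
```

The removed test exercised the same ordering end to end: it stubbed checkAndCompleteFetch to take another partition's read lock while appendRecordsToLeader and maybeShrinkIsr ran concurrently on an executor, and failed on timeout if the threads deadlocked.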