MINOR: remove DelayedOperations.checkAndCompleteFetch #9278

Merged: 1 commit, Sep 10, 2020
core/src/main/scala/kafka/cluster/Partition.scala: 4 changes (0 additions, 4 deletions)
@@ -99,10 +99,6 @@ class DelayedOperations(topicPartition: TopicPartition,
deleteRecords.checkAndComplete(requestKey)
}

def checkAndCompleteFetch(): Unit = {
fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition))
}

def numDelayedDelete: Int = deleteRecords.numDelayed
}

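For context on why the wrapper is safe to drop: the purgatories are owned at the ReplicaManager level, and delayed fetches are completed by checking the fetch purgatory directly once the lock taken for the append has been released. Below is a minimal sketch of that pattern against Kafka's `DelayedOperationPurgatory` API; the object and method names are illustrative, not an actual call site in the codebase.

```scala
import org.apache.kafka.common.TopicPartition
import kafka.server.{DelayedFetch, DelayedOperationPurgatory, TopicPartitionOperationKey}

object FetchCompletionSketch {
  // Illustrative only: after an append returns and the partition's
  // leaderIsrUpdate lock has been released, the caller checks the fetch
  // purgatory with the partition's operation key. This is the same call the
  // removed Partition.checkAndCompleteFetch() wrapper made from inside
  // Partition.
  def onAppendCompleted(fetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
                        topicPartition: TopicPartition): Unit = {
    fetchPurgatory.checkAndComplete(TopicPartitionOperationKey(topicPartition))
  }
}
```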
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: 128 changes (7 additions, 121 deletions)
@@ -17,9 +17,8 @@
package kafka.cluster

import java.nio.ByteBuffer
import java.util.{Optional, Properties}
import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}

import com.yammer.metrics.core.Metric
import kafka.api.{ApiVersion, LeaderAndIsr}
@@ -29,24 +28,23 @@ import kafka.metrics.KafkaYammerMetrics
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
import org.junit.Test
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.junit.Assert._
import org.mockito.Mockito._
import org.scalatest.Assertions.assertThrows
import org.junit.Test
import org.mockito.ArgumentMatchers
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.scalatest.Assertions.assertThrows
import unit.kafka.cluster.AbstractPartitionTest

import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer

class PartitionTest extends AbstractPartitionTest {

@@ -725,8 +723,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas

doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)

assertTrue("Expected become leader transition to succeed",
@@ -922,102 +918,6 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicaIds)
}

/**
* Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
* Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
* need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
(e.g. to check if ISR needs to be shrunk) can trigger deadlock in request handler threads waiting for
* read lock of one Partition while holding on to read lock of another Partition.
*/
@Test
def testDelayedFetchAfterAppendRecords(): Unit = {
Contributor Author:
`appendRecordsToLeader` does not complete the delayed fetch anymore (c2273ad), so this test can be removed.

val controllerEpoch = 0
val leaderEpoch = 5
val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicaIds
val logConfig = LogConfig(new Properties)

val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, () => logConfig) }
val partitions = ListBuffer.empty[Partition]

logs.foreach { log =>
val tp = log.topicPartition
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val partition = new Partition(tp,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
logManager)

when(delayedOperations.checkAndCompleteFetch())
.thenAnswer((invocation: InvocationOnMock) => {
// Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
val anotherPartition = (tp.partition + 1) % topicPartitions.size
val partition = partitions(anotherPartition)
partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
})

partition.setLog(log, isFutureLog = false)
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(1)
.setReplicas(replicaIds)
.setIsNew(true)
partition.makeLeader(leaderState, offsetCheckpoints)
partitions += partition
}

def createRecords(baseOffset: Long): MemoryRecords = {
val records = List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes))
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(
buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
baseOffset, time.milliseconds, 0)
records.foreach(builder.append)
builder.build()
}

val done = new AtomicBoolean()
val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
try {
// Invoke some operation that acquires leaderIsrUpdate write lock on one thread
executor.submit((() => {
while (!done.get) {
partitions.foreach(_.maybeShrinkIsr())
}
}): Runnable)
// Append records to partitions, one partition-per-thread
val futures = partitions.map { partition =>
executor.submit((() => {
(1 to 10000).foreach { _ =>
partition.appendRecordsToLeader(createRecords(baseOffset = 0),
origin = AppendOrigin.Client,
requiredAcks = 0)
}
}): Runnable)
}
futures.foreach(_.get(15, TimeUnit.SECONDS))
done.set(true)
} catch {
case e: TimeoutException =>
val allThreads = TestUtils.allThreadStackTraces()
fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
} finally {
executor.shutdownNow()
executor.awaitTermination(5, TimeUnit.SECONDS)
}
}
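
The doc comment on the removed test describes a lock-ordering hazard: completing a delayed fetch while still holding one partition's read lock acquires another partition's read lock, and queued writers can then close a cycle. Here is a minimal standalone sketch of that hazard using plain JDK read-write locks; all names are illustrative stand-ins, not Kafka internals.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

object LockOrderSketch {
  // Stand-ins for two partitions' leaderIsrUpdate locks.
  private val partitionA = new ReentrantReadWriteLock()
  private val partitionB = new ReentrantReadWriteLock()

  // Handler 1: appends under A's read lock and, while still holding it,
  // completes a delayed fetch that snapshots offsets under B's read lock.
  def handler1(): Unit = {
    partitionA.readLock().lock()
    try {
      partitionB.readLock().lock() // nested acquisition: the hazard
      try {
        // read B's offset snapshot here
      } finally partitionB.readLock().unlock()
    } finally partitionA.readLock().unlock()
  }

  // Handler 2 mirrors handler1 with A and B swapped, while ISR-shrink style
  // writers queue for both write locks. A queued writer blocks new readers,
  // so handler1 waits on B behind B's writer and handler2 waits on A behind
  // A's writer: deadlock. Completing delayed fetches only after the append
  // lock is released removes the nested read-lock acquisition and the cycle.
  def handler2(): Unit = {
    partitionB.readLock().lock()
    try {
      partitionA.readLock().lock()
      try {
        // read A's offset snapshot here
      } finally partitionA.readLock().unlock()
    } finally partitionB.readLock().unlock()
  }
}
```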

def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(
@@ -1079,8 +979,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = replicas

doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)

val initializeTimeMs = time.milliseconds()
@@ -1138,8 +1036,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId).asJava

doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
"Expected become leader transition to succeed",
@@ -1200,8 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId).asJava

doNothing().when(delayedOperations).checkAndCompleteFetch()

partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",
partition.makeLeader(
@@ -1251,8 +1145,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava

doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
@@ -1305,8 +1197,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava

doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
@@ -1374,8 +1264,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava

doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue(
@@ -1429,8 +1317,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId, remoteBrokerId).asJava

doNothing().when(delayedOperations).checkAndCompleteFetch()

val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed",