KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch (#7678)

Reviewers: Ismael Juma <ismael@juma.me.uk>
rajinisivaram committed Nov 11, 2019
1 parent f15d318, commit 6f00086
Showing 2 changed files with 64 additions and 21 deletions.
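In short, the change adds `ReplicaNotAvailableException` to the set of exceptions that complete a delayed fetch immediately (Case B) in `DelayedFetch.tryComplete`, and renumbers the remaining cases. Below is a minimal sketch of that decision, using only the exception types that appear in the diff that follows; the `shouldForceComplete` helper and the object name are illustrative, not part of the commit.

```scala
import org.apache.kafka.common.errors.{
  FencedLeaderEpochException,
  KafkaStorageException,
  NotLeaderForPartitionException,
  ReplicaNotAvailableException,
  UnknownTopicOrPartitionException
}

object DelayedFetchCompletionSketch {
  // Hypothetical helper mirroring the reordered catch block in tryComplete():
  // if the partition lookup or offset check throws any of these, the delayed
  // fetch should be completed immediately rather than kept waiting for data.
  def shouldForceComplete(check: () => Unit): Boolean =
    try {
      check()
      false // no exception: the real tryComplete() then evaluates Cases F-H
    } catch {
      case _: NotLeaderForPartitionException   => true // Case A: leadership moved away
      case _: ReplicaNotAvailableException     => true // Case B: replica gone from this broker (new in this commit)
      case _: UnknownTopicOrPartitionException => true // Case C: partition unknown on this broker
      case _: KafkaStorageException            => true // Case D: offline log directory
      case _: FencedLeaderEpochException       => true // Case E: requested epoch is fenced
    }
}
```

The new `testReplicaNotAvailable` test in `DelayedFetchTest` exercises exactly this path: it makes `getPartitionOrException` throw `ReplicaNotAvailableException` and asserts that `tryComplete()` completes the fetch.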
core/src/main/scala/kafka/server/DelayedFetch.scala (42 changes: 23 additions & 19 deletions)
@@ -68,12 +68,13 @@ class DelayedFetch(delayMs: Long,
    * The operation can be completed if:
    *
    * Case A: This broker is no longer the leader for some partitions it tries to fetch
-   * Case B: This broker does not know of some partitions it tries to fetch
-   * Case C: The fetch offset locates not on the last segment of the log
-   * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
-   * Case E: The partition is in an offline log directory on this broker
-   * Case F: This broker is the leader, but the requested epoch is now fenced
-   * Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
+   * Case B: The replica is no longer available on this broker
+   * Case C: This broker does not know of some partitions it tries to fetch
+   * Case D: The partition is in an offline log directory on this broker
+   * Case E: This broker is the leader, but the requested epoch is now fenced
+   * Case F: The fetch offset locates not on the last segment of the log
+   * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   * Case H: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
    * Upon completion, should return whatever data is available for each valid partition
    */
   override def tryComplete(): Boolean = {
@@ -94,16 +95,16 @@ class DelayedFetch(delayMs: Long,
             case FetchTxnCommitted => offsetSnapshot.lastStableOffset
           }
 
-          // Go directly to the check for Case D if the message offsets are the same. If the log segment
+          // Go directly to the check for Case G if the message offsets are the same. If the log segment
           // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-          // which would incorrectly be seen as an instance of Case C.
+          // which would incorrectly be seen as an instance of Case F.
           if (endOffset.messageOffset != fetchOffset.messageOffset) {
             if (endOffset.onOlderSegment(fetchOffset)) {
-              // Case C, this can happen when the new fetch operation is on a truncated leader
+              // Case F, this can happen when the new fetch operation is on a truncated leader
               debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
               return forceComplete()
             } else if (fetchOffset.onOlderSegment(endOffset)) {
-              // Case C, this can happen when the fetch operation is falling behind the current segment
+              // Case F, this can happen when the fetch operation is falling behind the current segment
               // or the partition has just rolled a new segment
               debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
               // We will not force complete the fetch request if a replica should be throttled.
@@ -118,31 +119,34 @@
           }
 
           if (fetchMetadata.isFromFollower) {
-            // Case G check if the follower has the latest HW from the leader
+            // Case H check if the follower has the latest HW from the leader
             if (partition.getReplica(fetchMetadata.replicaId)
               .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
               return forceComplete()
             }
           }
         }
       } catch {
-        case _: KafkaStorageException => // Case E
-          debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+        case _: NotLeaderForPartitionException => // Case A
+          debug(s"Broker is no longer the leader of $topicPartition, satisfy $fetchMetadata immediately")
           return forceComplete()
-        case _: UnknownTopicOrPartitionException => // Case B
+        case _: ReplicaNotAvailableException => // Case B
+          debug(s"Broker no longer has a replica of $topicPartition, satisfy $fetchMetadata immediately")
+          return forceComplete()
+        case _: UnknownTopicOrPartitionException => // Case C
           debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
           return forceComplete()
-        case _: FencedLeaderEpochException => // Case F
+        case _: KafkaStorageException => // Case D
+          debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+          return forceComplete()
+        case _: FencedLeaderEpochException => // Case E
           debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
             s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
           return forceComplete()
-        case _: NotLeaderForPartitionException => // Case A
-          debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
-          return forceComplete()
       }
     }
 
-    // Case D
+    // Case G
     if (accumulatedSize >= fetchMetadata.fetchMinBytes)
       forceComplete()
     else
core/src/test/scala/unit/kafka/server/DelayedFetchTest.scala (43 changes: 41 additions & 2 deletions)
@@ -19,11 +19,10 @@ package kafka.server
 import java.util.Optional
 
 import scala.collection.Seq
-
 import kafka.cluster.{Partition, Replica}
 import kafka.log.LogOffsetSnapshot
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.FencedLeaderEpochException
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, ReplicaNotAvailableException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
@@ -82,6 +81,46 @@ class DelayedFetchTest extends EasyMockSupport {
     assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
   }
 
+  @Test
+  def testReplicaNotAvailable(): Unit = {
+    val topicPartition = new TopicPartition("topic", 0)
+    val fetchOffset = 500L
+    val logStartOffset = 0L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      delayMs = 500,
+      fetchMetadata = fetchMetadata,
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      clientMetadata = None,
+      responseCallback = callback)
+
+    val partition: Partition = mock(classOf[Partition])
+
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+      .andThrow(new ReplicaNotAvailableException(s"Replica for $topicPartition not available"))
+    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.REPLICA_NOT_AVAILABLE)
+    EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
+
+    replayAll()
+
+    assertTrue(delayedFetch.tryComplete())
+    assertTrue(delayedFetch.isCompleted)
+    assertTrue(fetchResultOpt.isDefined)
+  }
+
   def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
     val topicPartition = new TopicPartition("topic", 0)
     val fetchOffset = 500L
