KAFKA-9171: Handle ReplicaNotAvailableException during DelayedFetch #7678

Merged 2 commits on Nov 11, 2019
42 changes: 23 additions & 19 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -68,12 +68,13 @@ class DelayedFetch(delayMs: Long,
    * The operation can be completed if:
    *
    * Case A: This broker is no longer the leader for some partitions it tries to fetch
-   * Case B: This broker does not know of some partitions it tries to fetch
-   * Case C: The fetch offset locates not on the last segment of the log
-   * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
-   * Case E: The partition is in an offline log directory on this broker
-   * Case F: This broker is the leader, but the requested epoch is now fenced
-   * Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
+   * Case B: The replica is no longer available on this broker
+   * Case C: This broker does not know of some partitions it tries to fetch
+   * Case D: The partition is in an offline log directory on this broker
+   * Case E: This broker is the leader, but the requested epoch is now fenced
+   * Case F: The fetch offset locates not on the last segment of the log
+   * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   * Case H: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
    * Upon completion, should return whatever data is available for each valid partition
    */
   override def tryComplete(): Boolean = {
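
Note on the renumbering above: the error cases (A through E) now come first and match the order of the catch clauses later in this method, followed by the data-availability cases (F through H). For orientation, here is a minimal sketch of the resulting dispatch. It is not the Kafka source; probeAccumulatedBytes, minBytes, and complete are illustrative stand-ins for the purgatory plumbing.

object DelayedFetchCompletionSketch {
  import org.apache.kafka.common.errors.{FencedLeaderEpochException, KafkaStorageException,
    NotLeaderForPartitionException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}

  // Every exception below means this broker can no longer serve the fetch, so the
  // operation completes immediately and the error travels back in the fetch response
  // instead of leaking out of the purgatory.
  def tryCompleteSketch(probeAccumulatedBytes: () => Long,
                        minBytes: Long,
                        complete: () => Boolean): Boolean = {
    try {
      // Cases F and H (segment boundaries, advanced high watermark) would be
      // folded into the probe in the real implementation.
      if (probeAccumulatedBytes() >= minBytes) // Case G: enough bytes accumulated
        return complete()
    } catch {
      case _: NotLeaderForPartitionException |   // Case A
           _: ReplicaNotAvailableException |     // Case B, newly handled by this PR
           _: UnknownTopicOrPartitionException | // Case C
           _: KafkaStorageException |            // Case D
           _: FencedLeaderEpochException =>      // Case E
        return complete()
    }
    false // not completable yet; the fetch stays parked
  }
}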
@@ -94,16 +95,16 @@ class DelayedFetch(delayMs: Long,
               case FetchTxnCommitted => offsetSnapshot.lastStableOffset
             }

-            // Go directly to the check for Case D if the message offsets are the same. If the log segment
+            // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-            // which would incorrectly be seen as an instance of Case C.
+            // which would incorrectly be seen as an instance of Case F.
             if (endOffset.messageOffset != fetchOffset.messageOffset) {
               if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case C, this can happen when the new fetch operation is on a truncated leader
+                // Case F, this can happen when the new fetch operation is on a truncated leader
                 debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
                 return forceComplete()
               } else if (fetchOffset.onOlderSegment(endOffset)) {
-                // Case C, this can happen when the fetch operation is falling behind the current segment
+                // Case F, this can happen when the fetch operation is falling behind the current segment
                 // or the partition has just rolled a new segment
                 debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
                 // We will not force complete the fetch request if a replica should be throttled.
@@ -118,31 +119,34 @@ class DelayedFetch(delayMs: Long,
               }
             }

             if (fetchMetadata.isFromFollower) {
-              // Case G check if the follower has the latest HW from the leader
+              // Case H check if the follower has the latest HW from the leader
               if (partition.getReplica(fetchMetadata.replicaId)
                 .exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
                 return forceComplete()
               }
             }
           }
         } catch {
-          case _: KafkaStorageException => // Case E
-            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+          case _: NotLeaderForPartitionException => // Case A
+            debug(s"Broker is no longer the leader of $topicPartition, satisfy $fetchMetadata immediately")
             return forceComplete()
-          case _: UnknownTopicOrPartitionException => // Case B
+          case _: ReplicaNotAvailableException => // Case B
+            debug(s"Broker no longer has a replica of $topicPartition, satisfy $fetchMetadata immediately")
+            return forceComplete()
+          case _: UnknownTopicOrPartitionException => // Case C
             debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
             return forceComplete()
-          case _: FencedLeaderEpochException => // Case F
+          case _: KafkaStorageException => // Case D
+            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchMetadata immediately")
+            return forceComplete()
+          case _: FencedLeaderEpochException => // Case E
             debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
               s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately")
             return forceComplete()
-          case _: NotLeaderForPartitionException => // Case A
-            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
-            return forceComplete()
         }
     }

-    // Case D
+    // Case G
     if (accumulatedSize >= fetchMetadata.fetchMinBytes)
       forceComplete()
     else
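Note on why Case B is needed: a partition's local log can be removed while a fetch is parked in the purgatory, most commonly when a reassignment moves this broker's replica away, so the re-check inside tryComplete no longer finds a log to read. A hedged sketch of the failure source, assuming a Partition-like holder whose local log is optional; this condenses, and may differ from, the real Partition.localLogOrException.

object LocalLogSketch {
  import kafka.log.Log
  import org.apache.kafka.common.TopicPartition
  import org.apache.kafka.common.errors.ReplicaNotAvailableException

  // Sketch only: if the local log was removed between the original fetch and the
  // purgatory re-check, the offset lookup fails with ReplicaNotAvailableException,
  // which tryComplete above now converts into an immediate completion (Case B).
  def localLogOrException(topicPartition: TopicPartition, localLog: Option[Log]): Log =
    localLog.getOrElse {
      throw new ReplicaNotAvailableException(s"Log for partition $topicPartition is not available")
    }
}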
core/src/test/scala/unit/kafka/server/DelayedFetchTest.scala

@@ -19,11 +19,10 @@ package kafka.server
 import java.util.Optional

 import scala.collection.Seq
-
 import kafka.cluster.{Partition, Replica}
 import kafka.log.LogOffsetSnapshot
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.FencedLeaderEpochException
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, ReplicaNotAvailableException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
@@ -82,6 +81,46 @@ class DelayedFetchTest extends EasyMockSupport {
     assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
   }

+  @Test
+  def testReplicaNotAvailable(): Unit = {
+    val topicPartition = new TopicPartition("topic", 0)
+    val fetchOffset = 500L
+    val logStartOffset = 0L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      delayMs = 500,
+      fetchMetadata = fetchMetadata,
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      clientMetadata = None,
+      responseCallback = callback)
+
+    val partition: Partition = mock(classOf[Partition])
+
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+      .andThrow(new ReplicaNotAvailableException(s"Replica for $topicPartition not available"))
+    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.REPLICA_NOT_AVAILABLE)
+    EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
+
+    replayAll()
+
+    assertTrue(delayedFetch.tryComplete())
+    assertTrue(delayedFetch.isCompleted)
+    assertTrue(fetchResultOpt.isDefined)
+  }
+
   def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
     val topicPartition = new TopicPartition("topic", 0)
     val fetchOffset = 500L
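
The new test forces tryComplete into the ReplicaNotAvailableException path and asserts that the operation completes and that a response reaches the callback with Errors.REPLICA_NOT_AVAILABLE (stubbed via expectReadFromReplicaWithError, whose definition lives elsewhere in this test class). That error code comes from the client library's exception-to-error table; a small standalone check of the mapping, with names of my own choosing:

object ErrorMappingCheck extends App {
  import org.apache.kafka.common.errors.ReplicaNotAvailableException
  import org.apache.kafka.common.protocol.Errors

  // Errors.forException turns a broker-side exception into the error code that
  // is placed in the response. REPLICA_NOT_AVAILABLE is retriable, so the
  // fetcher can refresh metadata and retry rather than failing hard.
  val error = Errors.forException(new ReplicaNotAvailableException("replica moved"))
  assert(error == Errors.REPLICA_NOT_AVAILABLE)
  println(s"ReplicaNotAvailableException maps to $error")
}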