
KAFKA-14372: choose replicas only from isr for preferred read replica #12877

Merged (4 commits) on Nov 24, 2022
10 changes: 8 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1244,8 +1244,14 @@ class ReplicaManager(val config: KafkaConfig,

partition.remoteReplicas.foreach { replica =>
val replicaState = replica.stateSnapshot
// Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
if (replicaState.logEndOffset >= fetchOffset && replicaState.logStartOffset <= fetchOffset) {
// Exclude replicas that are not in the ISR as the follower may lag behind. Worst case, the follower
// will continue to lag and the consumer will fall behind the produce. The leader will
// continuously pick the lagging follower when the consumer refreshes its preferred read replica.
// This can go on indefinitely.
if (partition.inSyncReplicaIds.contains(replica.brokerId) &&
replicaState.logEndOffset >= fetchOffset &&
replicaState.logStartOffset <= fetchOffset) {

replicaInfoSet.add(new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replicaState.logEndOffset,
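For context on when this code path is exercised (not part of this PR): the leader only computes a preferred read replica when the broker is configured with a ReplicaSelector and the consumer advertises a rack. A minimal sketch, assuming a broker reachable at localhost:9092 that is configured with broker.rack=rack-b and replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector; the group name is a placeholder:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "follower-fetch-example")
// The consumer advertises its rack; the leader passes it to the ReplicaSelector.
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-b")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
// With the change above, any preferredReadReplica returned in fetch responses
// is drawn only from replicas that are in the ISR and cover the fetch offset.
val consumer = new KafkaConsumer[String, String](props)

Without the ISR filter, a selector could keep returning a lagging follower, which is the scenario described in the new comment above.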
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.FetchResponse
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

@@ -84,9 +84,63 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
assertEquals(Errors.NONE, response.error)
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
} finally {
socket.close()
}
}

@Test
def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
// Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
val admin = createAdminClient()
TestUtils.createTopicWithAdmin(
admin,
topic,
brokers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)

TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)

val topicPartition = new TopicPartition(topic, 0)
val offsetMap = Map(topicPartition -> 10L)

val request = createConsumerFetchRequest(
maxResponseBytes = 1000,
maxPartitionBytes = 1000,
Seq(topicPartition),
offsetMap,
ApiKeys.FETCH.latestVersion,
maxWaitMs = 20000,
minBytes = 1,
rackId = followerBrokerId.toString
)
var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
assertEquals(Errors.NONE, response.error)
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
validatePreferredReadReplica(response, preferredReadReplica = 1)

// Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
brokers(followerBrokerId).shutdown()
Contributor:
Is it guaranteed that the follower is removed from the ISR after this line, or do we have a race condition with the checks below?

Contributor Author:
Added a check to confirm that the follower is no longer reachable.

Forgot to comment, but the test validates that the leader chooses a broker from the live set of brokers; this passes with the previous implementation as well.

This doesn't test the follower falling out of the ISR.

Contributor:
Ah, OK. I did not see this because I thought that the purpose of the patch was to verify the ISR behavior. I guess that this extra test does not hurt, so we can keep it.
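The reachability check added in response appears just below. For reference, a sketch of how ISR membership itself could be asserted instead, reusing the admin client created earlier in this test; the describeTopics-based wait is illustrative only and not part of this PR:

// Illustrative alternative: wait until the stopped follower drops out of the ISR.
// Assumes `admin`, `topic` and `followerBrokerId` from the test above; the
// collection converters are already imported in this file for .asJava/.asScala.
TestUtils.waitUntilTrue(() => {
  val description = admin.describeTopics(java.util.Collections.singletonList(topic))
    .allTopicNames.get.get(topic)
  val isrIds = description.partitions.get(0).isr.asScala.map(_.id)
  !isrIds.contains(followerBrokerId)
}, "follower was not removed from the ISR")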

TestUtils.waitUntilTrue(() => {
val endpoints = brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition, listenerName)
!endpoints.contains(followerBrokerId)
}, "follower is still reachable.")

response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer)
assertEquals(Errors.NONE, response.error)
assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
validatePreferredReadReplica(response, preferredReadReplica = -1)
}

private def validatePreferredReadReplica(response: FetchResponse, preferredReadReplica: Int): Unit = {
assertEquals(1, response.data.responses.size)
response.data.responses.forEach { topicResponse =>
assertEquals(1, topicResponse.partitions.size)
topicResponse.partitions.forEach { partitionResponse =>
assertEquals(preferredReadReplica, partitionResponse.preferredReadReplica)
}
}
}
}
@@ -53,10 +53,13 @@ class BaseFetchRequestTest extends BaseRequestTest {
offsetMap: Map[TopicPartition, Long],
version: Short,
maxWaitMs: Int = Int.MaxValue,
minBytes: Int = 0
minBytes: Int = 0,
rackId: String = ""
): FetchRequest = {
FetchRequest.Builder.forConsumer(version, maxWaitMs, minBytes, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
.setMaxBytes(maxResponseBytes).build()
.setMaxBytes(maxResponseBytes)
.rackId(rackId)
.build()
}

protected def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
81 changes: 77 additions & 4 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
@@ -1189,7 +1190,7 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)

// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
@@ -1202,7 +1203,7 @@
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())

val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
@@ -1245,7 +1246,7 @@
val tidp0 = new TopicIdPartition(topicId, tp0)

// Make this replica the leader
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
@@ -1258,7 +1259,7 @@
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ())

val metadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
@@ -1279,6 +1280,75 @@
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}

@Test
def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName))

try {
val leaderBrokerId = 0
val followerBrokerId = 1
val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a")
val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b")
val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
val topicId = Uuid.randomUuid()
val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0)

when(replicaManager.metadataCache.getPartitionReplicaEndpoints(
tp0,
new ListenerName("default")
)).thenReturn(Map(
leaderBrokerId -> leaderNode,
followerBrokerId -> followerNode
).toMap)

// Make this replica the leader and remove follower from ISR.
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion,
0,
0,
brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(leaderBrokerId)
.setLeaderEpoch(1)
.setIsr(Seq[Integer](leaderBrokerId).asJava)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, topicId),
Set(leaderNode, followerNode).asJava).build()

replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ())

val metadata = new DefaultClientMetadata("rack-b", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")

val consumerResult = fetchPartitionAsConsumer(
replicaManager,
tidp0,
new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
clientMetadata = Some(metadata)
)

// Fetch from leader succeeds
assertTrue(consumerResult.hasFired)

// PartitionView passed to ReplicaSelector should not contain the follower as it's not in the ISR
val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 0, 0))
val partitionView = replicaManager.replicaSelectorOpt.get
.asInstanceOf[MockReplicaSelector].getPartitionViewArgument

assertTrue(partitionView.isDefined)
assertEquals(expectedReplicaViews.asJava, partitionView.get.replicas)
} finally {
replicaManager.shutdown()
}
}

@Test
def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
@@ -4174,11 +4244,14 @@
class MockReplicaSelector extends ReplicaSelector {

private val selectionCount = new AtomicLong()
private var partitionViewArgument: Option[PartitionView] = None

def getSelectionCount: Long = selectionCount.get
def getPartitionViewArgument: Option[PartitionView] = partitionViewArgument

override def select(topicPartition: TopicPartition, clientMetadata: ClientMetadata, partitionView: PartitionView): Optional[ReplicaView] = {
selectionCount.incrementAndGet()
partitionViewArgument = Some(partitionView)
Optional.of(partitionView.leader)
}
}
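
To illustrate what the change means for selector implementations: the PartitionView handed to ReplicaSelector.select now contains only in-sync replicas that cover the fetch offset. A minimal sketch of a rack-matching selector; it is illustrative only, not the built-in RackAwareReplicaSelector and not part of this PR:

import java.util.Optional
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}

class SameRackReplicaSelector extends ReplicaSelector {
  override def select(topicPartition: TopicPartition,
                      clientMetadata: ClientMetadata,
                      partitionView: PartitionView): Optional[ReplicaView] = {
    // Every candidate here is already in the ISR and has the requested offset,
    // so a lagging follower can no longer be picked indefinitely.
    val sameRack = partitionView.replicas.asScala.find { replica =>
      clientMetadata.rackId != null && clientMetadata.rackId == replica.endpoint.rack
    }
    // Fall back to the leader when no in-sync replica shares the client's rack.
    Optional.of(sameRack.getOrElse(partitionView.leader))
  }
}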