KAFKA-14458: Introduce RPC support during ZK migration #13028
Add infrastructure for sending UpdateMetadataRequest and LeaderAndIsr RPCs during the migration
process from ZK to KRaft. The new classes use ControllerChannelManager to send the RPCs. The
information to send comes from MetadataDelta and MetadataImage.

Reviewers: David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
akhileshchg authored and cmccabe committed Jan 5, 2023
1 parent ef3b581 commit 0e51a20
Showing 17 changed files with 763 additions and 117 deletions.
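Taken together, the pieces below suggest how a migration driver might wire these classes up. A minimal sketch, assuming the ControllerChannelManager constructor and startup() signatures visible in the KafkaController.scala hunk further down; the wiring helper itself is hypothetical, not part of this commit:

import kafka.controller.{ControllerChannelManager, StateChangeLogger}
import kafka.migration.MigrationControllerChannelContext
import kafka.server.KafkaConfig
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.MetadataImage

object MigrationRpcWiring {
  def startChannelManager(
    image: MetadataImage,
    config: KafkaConfig,
    metrics: Metrics,
    stateChangeLogger: StateChangeLogger
  ): ControllerChannelManager = {
    // Answer cluster-state queries from the KRaft metadata image rather than from ZK.
    val context = new MigrationControllerChannelContext(image)
    val channelManager = new ControllerChannelManager(
      () => image.highestOffsetAndEpoch().epoch(), // controller epoch stamped on outgoing RPCs
      config,
      Time.SYSTEM,
      metrics,
      stateChangeLogger,
      threadNamePrefix = None
    )
    // Open request queues to the brokers the image reports as live ZK brokers.
    channelManager.startup(context.liveOrShuttingDownBrokers.toSet)
    channelManager
  }
}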
19 changes: 18 additions & 1 deletion core/src/main/scala/kafka/cluster/Broker.scala
@@ -18,17 +18,18 @@
package kafka.cluster

import java.util

import kafka.common.BrokerEndPointNotAvailableException
import kafka.server.KafkaConfig
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.feature.Features._
import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.metadata.{BrokerRegistration, VersionRange}
import org.apache.kafka.server.authorizer.AuthorizerServerInfo

import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._

object Broker {
@@ -41,6 +42,22 @@ object Broker {
def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = {
new Broker(id, endPoints, rack, emptySupportedFeatures)
}

private def supportedFeatures(features: java.util.Map[String, VersionRange]): java.util
.Map[String, SupportedVersionRange] = {
features.asScala.map { case (name, range) =>
name -> new SupportedVersionRange(range.min(), range.max())
}.asJava
}

def fromBrokerRegistration(registration: BrokerRegistration): Broker = {
new Broker(
registration.id(),
registration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
registration.rack().asScala,
Features.supportedFeatures(supportedFeatures(registration.supportedFeatures()))
)
}
}

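The new fromBrokerRegistration factory is what lets KRaft broker registrations flow into code written for the ZK-era Broker class. An illustrative helper (hypothetical, mirroring how MigrationControllerChannelContext uses it below):

import kafka.cluster.Broker
import org.apache.kafka.image.MetadataImage

import scala.jdk.CollectionConverters._

object BrokerConversions {
  // Convert every ZK-mode registration in a KRaft metadata image into a Broker.
  def zkStyleBrokers(image: MetadataImage): Set[Broker] =
    image.cluster().zkBrokers().asScala.values.map(Broker.fromBrokerRegistration).toSet
}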
42 changes: 42 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerChannelContext.scala
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller

import kafka.cluster.Broker
import org.apache.kafka.common.{TopicPartition, Uuid}

trait ControllerChannelContext {
def isTopicDeletionInProgress(topicName: String): Boolean

def topicIds: collection.Map[String, Uuid]

def liveBrokerIdAndEpochs: collection.Map[Int, Long]

def liveOrShuttingDownBrokers: collection.Set[Broker]

def isTopicQueuedUpForDeletion(topic: String): Boolean

def isReplicaOnline(brokerId: Int, partition: TopicPartition): Boolean

def partitionReplicaAssignment(partition: TopicPartition): collection.Seq[Int]

def leaderEpoch(topicPartition: TopicPartition): Int

def liveOrShuttingDownBrokerIds: collection.Set[Int]

def partitionLeadershipInfo(topicPartition: TopicPartition): Option[LeaderIsrAndControllerEpoch]
}
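This trait abstracts the cluster-state queries ControllerChannelManager needs, so the same request-sending code can run against either the ZK-backed ControllerContext or a KRaft MetadataImage (see MigrationControllerChannelContext below). A small hypothetical helper written against the trait, to show the decoupling:

import kafka.controller.ControllerChannelContext
import org.apache.kafka.common.TopicPartition

object ChannelContextOps {
  // Works unchanged in ZK mode and during migration: it depends only on the trait.
  def onlineReplicas(ctx: ControllerChannelContext, tp: TopicPartition): collection.Seq[Int] =
    ctx.partitionReplicaAssignment(tp).filter(replica => ctx.isReplicaOnline(replica, tp))
}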
243 changes: 146 additions & 97 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala

Large diffs are not rendered by default.

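Although the 243-line ControllerChannelManager.scala diff is not rendered, its new shape can be inferred from the KafkaController.scala call sites below. A hedged reconstruction of the changed signatures (parameter names are assumptions):

import kafka.cluster.Broker
import kafka.controller.StateChangeLogger
import kafka.server.KafkaConfig
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time

class ControllerChannelManager(
  controllerEpoch: () => Int,          // was: a full ControllerContext
  config: KafkaConfig,
  time: Time,
  metrics: Metrics,
  stateChangeLogger: StateChangeLogger,
  threadNamePrefix: Option[String]
) {
  // startup() now takes the initial broker set explicitly instead of reading it
  // out of ControllerContext, so migration code can supply image-derived brokers.
  def startup(initialBrokers: collection.Set[Broker]): Unit = ???
}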
26 changes: 23 additions & 3 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -17,6 +17,7 @@

package kafka.controller

import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.utils.Implicits._
import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -72,7 +73,7 @@ case class ReplicaAssignment private (replicas: Seq[Int],
s"removingReplicas=${removingReplicas.mkString(",")})"
}

class ControllerContext {
class ControllerContext extends ControllerChannelContext {
val stats = new ControllerStats
var offlinePartitionCount = 0
var preferredReplicaImbalanceCount = 0
@@ -230,7 +231,11 @@ class ControllerContext {
}.toSet
}

def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition): Boolean = {
isReplicaOnline(brokerId, topicPartition, includeShuttingDownBrokers = false)
}

def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition, includeShuttingDownBrokers: Boolean): Boolean = {
val brokerOnline = {
if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds.contains(brokerId)
else liveBrokerIds.contains(brokerId)
@@ -445,6 +450,22 @@ class ControllerContext {
Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
}

def partitionLeaderAndIsr(partition: TopicPartition): Option[LeaderAndIsr] = {
partitionLeadershipInfo.get(partition).map(_.leaderAndIsr)
}

def leaderEpoch(partition: TopicPartition): Int = {
// A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
// any existing epoch.
if (isTopicQueuedUpForDeletion(partition.topic)) {
LeaderAndIsr.EpochDuringDelete
} else {
partitionLeadershipInfo.get(partition)
.map(_.leaderAndIsr.leaderEpoch)
.getOrElse(LeaderAndIsr.NoEpoch)
}
}

def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
partitionLeadershipInfo.get(partition)
}
@@ -524,5 +545,4 @@ class ControllerContext {

private def isValidPartitionStateTransition(partition: TopicPartition, targetState: PartitionState): Boolean =
targetState.validPreviousStates.contains(partitionStates(partition))

}
12 changes: 9 additions & 3 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -90,8 +90,14 @@ class KafkaController(val config: KafkaConfig,
private val isAlterPartitionEnabled = config.interBrokerProtocolVersion.isAlterPartitionSupported
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
stateChangeLogger, threadNamePrefix)
var controllerChannelManager = new ControllerChannelManager(
() => controllerContext.epoch,
config,
time,
metrics,
stateChangeLogger,
threadNamePrefix
)

// have a separate scheduler for the controller to be able to start and stop independently of the kafka server
// visible for testing
@@ -928,7 +934,7 @@ class KafkaController(val config: KafkaConfig,
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
controllerChannelManager.startup()
controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
115 changes: 115 additions & 0 deletions core/src/main/scala/kafka/migration/MigrationControllerChannelContext.scala
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.migration

import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{ControllerChannelContext, LeaderIsrAndControllerEpoch}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.image.MetadataImage

import scala.jdk.CollectionConverters._

object MigrationControllerChannelContext {
def isReplicaOnline(image: MetadataImage, brokerId: Int, replicaAssignment: Set[Int]): Boolean = {
val brokerOnline = image.cluster().containsBroker(brokerId)
brokerOnline && replicaAssignment.contains(brokerId)
}

def partitionReplicaAssignment(image: MetadataImage, tp: TopicPartition): collection.Seq[Int] = {
image.topics().topicsByName().asScala.get(tp.topic()) match {
case Some(topic) => topic.partitions().asScala.get(tp.partition()) match {
case Some(partition) => partition.replicas.toSeq
case None => collection.Seq.empty
}
case None => collection.Seq.empty
}
}

def partitionLeadershipInfo(image: MetadataImage, topicPartition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
image.topics().topicsByName().asScala.get(topicPartition.topic()) match {
case Some(topic) => topic.partitions().asScala.get(topicPartition.partition()) match {
case Some(partition) =>
val leaderAndIsr = LeaderAndIsr(partition.leader, partition.leaderEpoch, partition.isr.toList,
partition.leaderRecoveryState, partition.partitionEpoch)
Some(LeaderIsrAndControllerEpoch(leaderAndIsr, image.highestOffsetAndEpoch().epoch()))
case None => None
}
case None => None
}
}
}

sealed class MigrationControllerChannelContext(
val image: MetadataImage
) extends ControllerChannelContext {
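// Note: a KRaft metadata image has no transient "deleting" state for topics; a topic
// absent from the image is reported as deletion-in-progress / queued for deletion.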
override def isTopicDeletionInProgress(topicName: String): Boolean = {
!image.topics().topicsByName().containsKey(topicName)
}

override val topicIds: collection.Map[String, Uuid] = {
image.topics().topicsByName().asScala.map {
case (name, topic) => name -> topic.id()
}.toMap
}

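// Only brokers still registered in ZK mode are RPC targets during the migration;
// KRaft-mode brokers pick up metadata changes from the metadata log instead.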
override val liveBrokerIdAndEpochs: collection.Map[Int, Long] = {
image.cluster().zkBrokers().asScala.map {
case (brokerId, broker) => brokerId.intValue() -> broker.epoch()
}
}

override val liveOrShuttingDownBrokers: collection.Set[Broker] = {
image.cluster().zkBrokers().asScala.values.map { registration =>
Broker.fromBrokerRegistration(registration)
}.toSet
}

override def isTopicQueuedUpForDeletion(topic: String): Boolean = {
!image.topics().topicsByName().containsKey(topic)
}

override def isReplicaOnline(brokerId: Int, partition: TopicPartition): Boolean = {
MigrationControllerChannelContext.isReplicaOnline(
image, brokerId, partitionReplicaAssignment(partition).toSet)
}

override def partitionReplicaAssignment(tp: TopicPartition): collection.Seq[Int] = {
MigrationControllerChannelContext.partitionReplicaAssignment(image, tp)
}

override def leaderEpoch(topicPartition: TopicPartition): Int = {
// If the topic is queued for deletion, use the sentinel epoch (-2) to indicate it.
if (isTopicQueuedUpForDeletion(topicPartition.topic())) {
LeaderAndIsr.EpochDuringDelete
} else {
image.topics().topicsByName.asScala.get(topicPartition.topic()) match {
case Some(topic) => topic.partitions().asScala.get(topicPartition.partition()) match {
case Some(partition) => partition.leaderEpoch
case None => LeaderAndIsr.NoEpoch
}
case None => LeaderAndIsr.NoEpoch
}
}
}

override val liveOrShuttingDownBrokerIds: collection.Set[Int] = liveBrokerIdAndEpochs.keySet

override def partitionLeadershipInfo(topicPartition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
MigrationControllerChannelContext.partitionLeadershipInfo(image, topicPartition)
}
}
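A hedged usage sketch (describePartition is illustrative, not part of the commit): the migration context answers the same queries from an immutable MetadataImage snapshot that ControllerContext answers from ZooKeeper-derived state.

import kafka.migration.MigrationControllerChannelContext
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.image.MetadataImage

object MigrationContextExample {
  def describePartition(image: MetadataImage, tp: TopicPartition): String = {
    val ctx = new MigrationControllerChannelContext(image)
    val epoch = ctx.leaderEpoch(tp) // LeaderAndIsr.EpochDuringDelete (-2) if the topic is gone
    val replicas = ctx.partitionReplicaAssignment(tp)
    s"$tp: leaderEpoch=$epoch, replicas=${replicas.mkString(",")}"
  }
}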
