KAFKA-15799 Handle full metadata updates on ZK brokers #14719

Merged: 5 commits, Nov 16, 2023

AbstractControlRequest.java
@@ -21,6 +21,34 @@
// Abstract class for all control requests including UpdateMetadataRequest, LeaderAndIsrRequest and StopReplicaRequest
public abstract class AbstractControlRequest extends AbstractRequest {

/**
* Indicates whether a control request is incremental, full, or unknown.
* Used by the Type fields of LeaderAndIsrRequest and UpdateMetadataRequest.
*/
public enum Type {
UNKNOWN(0),
INCREMENTAL(1),
FULL(2);

private final byte type;
private Type(int type) {
this.type = (byte) type;
}

public byte toByte() {
return type;
}

public static Type fromByte(byte type) {
for (Type t : Type.values()) {
if (t.type == type) {
return t;
}
}
return UNKNOWN;
}
}

public static final long UNKNOWN_BROKER_EPOCH = -1L;

public static abstract class Builder<T extends AbstractRequest> extends AbstractRequest.Builder<T> {
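A note on the shared enum: fromByte is deliberately total, so an unrecognized byte degrades to UNKNOWN instead of throwing, which keeps mixed-version brokers interoperable. A minimal, self-contained round-trip demo (the enum body is condensed from the hunk above):

```java
public class TypeRoundTripDemo {
    // Condensed copy of AbstractControlRequest.Type from the hunk above.
    enum Type {
        UNKNOWN(0), INCREMENTAL(1), FULL(2);

        private final byte type;
        Type(int type) { this.type = (byte) type; }
        byte toByte() { return type; }

        static Type fromByte(byte b) {
            for (Type t : values()) {
                if (t.type == b) return t;
            }
            return UNKNOWN; // unrecognized bytes degrade to UNKNOWN, never throw
        }
    }

    public static void main(String[] args) {
        System.out.println(Type.fromByte(Type.FULL.toByte())); // FULL
        System.out.println(Type.fromByte((byte) 42));          // UNKNOWN
    }
}
```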

LeaderAndIsrRequest.java
@@ -42,30 +42,6 @@

public class LeaderAndIsrRequest extends AbstractControlRequest {

public enum Type {
UNKNOWN(0),
INCREMENTAL(1),
FULL(2);

private final byte type;
private Type(int type) {
this.type = (byte) type;
}

public byte toByte() {
return type;
}

public static Type fromByte(byte type) {
for (Type t : Type.values()) {
if (t.type == type) {
return t;
}
}
return UNKNOWN;
}
}

public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {

private final List<LeaderAndIsrPartitionState> partitionStates;

UpdateMetadataRequest.java
@@ -47,21 +47,28 @@ public static class Builder extends AbstractControlRequest.Builder<UpdateMetadataRequest> {
private final List<UpdateMetadataPartitionState> partitionStates;
private final List<UpdateMetadataBroker> liveBrokers;
private final Map<String, Uuid> topicIds;
private final Type updateType;

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
Map<String, Uuid> topicIds) {
this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates,
liveBrokers, topicIds, false);
liveBrokers, topicIds, false, Type.UNKNOWN);
}

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
Map<String, Uuid> topicIds, boolean kraftController) {
Map<String, Uuid> topicIds, boolean kraftController, Type updateType) {
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
this.partitionStates = partitionStates;
this.liveBrokers = liveBrokers;
this.topicIds = topicIds;

if (version >= 8) {
this.updateType = updateType;
} else {
this.updateType = Type.UNKNOWN;
}
}

@Override
@@ -95,6 +102,7 @@ public UpdateMetadataRequest build(short version) {

if (version >= 8) {
data.setIsKRaftController(kraftController);
data.setType(updateType.toByte());
}

if (version >= 5) {
@@ -129,6 +137,8 @@ public String toString() {
bld.append("(type: UpdateMetadataRequest=").
append(", controllerId=").append(controllerId).
append(", controllerEpoch=").append(controllerEpoch).
append(", kraftController=").append(kraftController).
append(", type=").append(updateType).
append(", brokerEpoch=").append(brokerEpoch).
append(", partitionStates=").append(partitionStates).
append(", liveBrokers=").append(Utils.join(liveBrokers, ", ")).
@@ -196,6 +206,10 @@ public boolean isKRaftController() {
return data.isKRaftController();
}

public Type updateType() {
return Type.fromByte(data.type());
}

@Override
public int controllerEpoch() {
return data.controllerEpoch();
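The builder pins the update type to UNKNOWN for request versions below 8, since those versions have no Type field on the wire; receivers then recover the enum through fromByte via updateType(). A sketch of just that gate (effectiveUpdateType is an illustrative name, not Kafka's API):

```java
public class VersionGateDemo {
    enum Type { UNKNOWN, INCREMENTAL, FULL }

    // Mirrors the version check in UpdateMetadataRequest.Builder's constructor:
    // pre-v8 requests cannot carry the field, so the value is forced to UNKNOWN.
    static Type effectiveUpdateType(short version, Type requested) {
        return version >= 8 ? requested : Type.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(effectiveUpdateType((short) 8, Type.FULL)); // FULL
        System.out.println(effectiveUpdateType((short) 7, Type.FULL)); // UNKNOWN
    }
}
```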

UpdateMetadataRequest.json
@@ -38,6 +38,9 @@
"about": "The controller id." },
{ "name": "isKRaftController", "type": "bool", "versions": "8+", "default": "false",
"about": "If KRaft controller id is used during migration. See KIP-866" },
{ "name": "Type", "type": "int8", "versions": "8+",
"default": 0, "tag": 0, "taggedVersions": "8+",
"about": "Indicates if this request is a Full metadata snapshot (2), Incremental (1), or Unknown (0). Using during ZK migration, see KIP-866"},
{ "name": "ControllerEpoch", "type": "int32", "versions": "0+",
"about": "The controller epoch." },
{ "name": "BrokerEpoch", "type": "int64", "versions": "5+", "ignorable": true, "default": "-1",
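Because Type is a tagged field (tag 0, default 0), the change stays wire-compatible: a sender that never sets the tag produces a request that decodes as UNKNOWN, and a receiver that predates the field simply skips the unknown tag. A loose sketch of the skip-unknown-tags idea; Kafka actually encodes tags and sizes as unsigned varints per KIP-482, plain bytes are used here only for brevity:

```java
import java.nio.ByteBuffer;

public class TaggedFieldSketch {
    static void readTaggedFields(ByteBuffer buf) {
        int numTaggedFields = buf.get();
        for (int i = 0; i < numTaggedFields; i++) {
            int tag = buf.get();
            int size = buf.get();
            if (tag == 0) {
                // The Type tag from the schema above: int8, 0=UNKNOWN, 1=INCREMENTAL, 2=FULL.
                System.out.println("Type tag present, value=" + buf.get());
            } else {
                buf.position(buf.position() + size); // unknown tag: skip its payload
            }
        }
    }

    public static void main(String[] args) {
        // One tagged field: tag=0, size=1, value=2 (FULL).
        readTaggedFields(ByteBuffer.wrap(new byte[] {1, 0, 1, 2}));
    }
}
```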

@@ -377,7 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]]
val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN
private var updateType: AbstractControlRequest.Type = AbstractControlRequest.Type.UNKNOWN
private var metadataInstance: ControllerChannelContext = _

def sendRequest(brokerId: Int,
@@ -399,7 +399,7 @@
metadataInstance = metadataProvider()
}

def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
def setUpdateType(updateType: AbstractControlRequest.Type): Unit = {
this.updateType = updateType
}

@@ -409,7 +409,7 @@
updateMetadataRequestBrokerSet.clear()
updateMetadataRequestPartitionInfoMap.clear()
metadataInstance = null
updateType = LeaderAndIsrRequest.Type.UNKNOWN
updateType = AbstractControlRequest.Type.UNKNOWN
}

def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
@@ -567,7 +567,6 @@
}
}
leaderAndIsrRequestMap.clear()
updateType = LeaderAndIsrRequest.Type.UNKNOWN
}

def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit
@@ -621,8 +620,17 @@
.distinct
.filter(metadataInstance.topicIds.contains)
.map(topic => (topic, metadataInstance.topicIds(topic))).toMap
val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion,
controllerId, controllerEpoch, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, topicIds.asJava, kraftController)
val updateMetadataRequestBuilder = new UpdateMetadataRequest.Builder(
updateMetadataRequestVersion,
controllerId,
controllerEpoch,
brokerEpoch,
partitionStates.asJava,
liveBrokers.asJava,
topicIds.asJava,
kraftController,
updateType
)
sendRequest(broker, updateMetadataRequestBuilder, (r: AbstractResponse) => {
val updateMetadataResponse = r.asInstanceOf[UpdateMetadataResponse]
handleUpdateMetadataResponse(updateMetadataResponse, broker)
@@ -736,6 +744,7 @@
sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
sendStopReplicaRequests(controllerEpoch, stateChangeLog)
this.updateType = AbstractControlRequest.Type.UNKNOWN
} catch {
case e: Throwable =>
if (leaderAndIsrRequestMap.nonEmpty) {
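The interesting move in this file is lifecycle-related: the reset to UNKNOWN used to sit at the end of sendLeaderAndIsrRequest, where it clobbered the type before the UpdateMetadataRequests in the same batch were built; it now happens once in sendRequestsToBrokers after all three request kinds go out. A condensed Java sketch of that lifecycle (Batch is a hypothetical stand-in for the Scala AbstractControllerBrokerRequestBatch):

```java
import java.util.ArrayList;
import java.util.List;

public class RequestBatchSketch {
    enum Type { UNKNOWN, INCREMENTAL, FULL }

    static class Batch {
        private Type updateType = Type.UNKNOWN;
        private final List<String> sent = new ArrayList<>();

        void newBatch() { sent.clear(); }
        void setUpdateType(Type t) { updateType = t; }

        void sendRequestsToBrokers() {
            sent.add("LeaderAndIsr(type=" + updateType + ")");
            sent.add("UpdateMetadata(type=" + updateType + ")");
            sent.add("StopReplica");
            // Reset only after every request kind has been sent; resetting inside
            // the LeaderAndIsr step would strip the type from the UMRs above.
            updateType = Type.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        Batch batch = new Batch();
        batch.newBatch();
        batch.setUpdateType(Type.FULL);
        batch.sendRequestsToBrokers();
        batch.sent.forEach(System.out::println);
    }
}
```
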
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -22,7 +22,7 @@
import kafka.server.KafkaConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.AbstractControlRequest
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage}
import org.apache.kafka.metadata.PartitionRegistration
@@ -138,6 +138,7 @@
}
requestBatch.sendRequestsToBrokers(zkControllerEpoch)
requestBatch.newBatch()
requestBatch.setUpdateType(AbstractControlRequest.Type.INCREMENTAL)

// Now send LISR, UMR and StopReplica requests for both new zk brokers and existing zk
// brokers based on the topic changes.
@@ -226,7 +227,7 @@
requestBatch.sendRequestsToBrokers(zkControllerEpoch)

requestBatch.newBatch()
requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
requestBatch.setUpdateType(AbstractControlRequest.Type.FULL)
// When we need to send RPCs from the image, we're sending 'full' requests meaning we let
// every broker know about all the metadata and all the LISR requests it needs to handle.
// Note that we cannot send StopReplica requests from the image. We don't have any state
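Usage-wise the propagator now draws a clean line: delta-driven propagation tags its batch INCREMENTAL, while image-driven propagation tags it FULL so ZK brokers may treat omissions as deletions. Schematically (method and interface names below are illustrative; the real Scala methods take MetadataDelta/MetadataImage):

```java
public class PropagatorSketch {
    enum Type { UNKNOWN, INCREMENTAL, FULL }

    interface RequestBatch {
        void newBatch();
        void setUpdateType(Type t);
        void sendRequestsToBrokers();
    }

    // Delta path: topic-by-topic changes, so receivers must not infer deletions.
    static void propagateDelta(RequestBatch batch) {
        batch.newBatch();
        batch.setUpdateType(Type.INCREMENTAL);
        // ... add LISR/UMR/StopReplica requests derived from the delta ...
        batch.sendRequestsToBrokers();
    }

    // Image path: a complete snapshot, so receivers may reconcile and delete.
    static void propagateImage(RequestBatch batch) {
        batch.newBatch();
        batch.setUpdateType(Type.FULL);
        // ... add requests describing the whole metadata image ...
        batch.sendRequestsToBrokers();
    }

    public static void main(String[] args) {
        RequestBatch printer = new RequestBatch() {
            Type t = Type.UNKNOWN;
            public void newBatch() { }
            public void setUpdateType(Type type) { t = type; }
            public void sendRequestsToBrokers() { System.out.println("sent batch, type=" + t); }
        };
        propagateDelta(printer); // sent batch, type=INCREMENTAL
        propagateImage(printer); // sent batch, type=FULL
    }
}
```
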
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -323,7 +323,8 @@
config.brokerId,
config.interBrokerProtocolVersion,
brokerFeatures,
kraftControllerNodes)
kraftControllerNodes,
config.migrationEnabled)
val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config)

/* initialize feature change listener */
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/MetadataCache.scala
@@ -116,9 +116,10 @@
def zkMetadataCache(brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(),
kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node])
kraftControllerNodes: collection.Seq[Node] = collection.Seq.empty[Node],
zkMigrationEnabled: Boolean = false)
: ZkMetadataCache = {
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes)
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, kraftControllerNodes, zkMigrationEnabled)
}

def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1883,7 +1883,7 @@ class ReplicaManager(val config: KafkaConfig,
if (
config.migrationEnabled &&
leaderAndIsrRequest.isKRaftController &&
leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.FULL
leaderAndIsrRequest.requestType() == AbstractControlRequest.Type.FULL
) {
updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest))
}
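All three conditions must hold before the broker hunts for stray logs: migration mode on, a KRaft controller as sender, and a FULL request, because only a FULL LeaderAndIsrRequest enumerates every partition this broker should host. Reduced to a predicate (a sketch, not ReplicaManager's actual shape):

```java
public class StrayLogGateSketch {
    enum Type { UNKNOWN, INCREMENTAL, FULL }

    // Only when the request is FULL can "partition not mentioned" be read as
    // "partition deleted", so the gate requires all three conditions.
    static boolean shouldCheckForStrayLogs(boolean migrationEnabled,
                                           boolean isKRaftController,
                                           Type requestType) {
        return migrationEnabled && isKRaftController && requestType == Type.FULL;
    }

    public static void main(String[] args) {
        System.out.println(shouldCheckForStrayLogs(true, true, Type.FULL));        // true
        System.out.println(shouldCheckForStrayLogs(true, true, Type.INCREMENTAL)); // false
        System.out.println(shouldCheckForStrayLogs(false, true, Type.FULL));       // false
    }
}
```
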
89 changes: 78 additions & 11 deletions core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -31,13 +31,14 @@ import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataPartitionState, UpdateMetadataTopicState}
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
import org.apache.kafka.common.message.UpdateMetadataRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.{Features, MetadataVersion}

@@ -55,6 +56,60 @@ trait ZkFinalizedFeatureCache {
def getFeatureOption: Option[Features]
}

case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
topicIds: Map[String, Uuid],
controllerId: Option[CachedControllerId],
aliveBrokers: mutable.LongMap[Broker],
aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
}

object ZkMetadataCache {
/**
* Create topic deletions (leader=-2) for topics that are missing in a FULL UpdateMetadataRequest coming from a
* KRaft controller during a ZK migration. This will modify the UpdateMetadataRequest object passed into this method.
*/
def maybeInjectDeletedPartitionsFromFullMetadataRequest(
currentMetadata: MetadataSnapshot,
requestControllerEpoch: Int,
requestTopicStates: util.List[UpdateMetadataTopicState],
): Seq[Uuid] = {
val prevTopicIds = currentMetadata.topicIds.values.toSet
val requestTopics = requestTopicStates.asScala.map { topicState =>
topicState.topicName() -> topicState.topicId()
}.toMap

val deleteTopics = prevTopicIds -- requestTopics.values.toSet
if (deleteTopics.isEmpty) {
return Seq.empty
}

deleteTopics.foreach { deletedTopicId =>
val topicName = currentMetadata.topicNames(deletedTopicId)
val topicState = new UpdateMetadataRequestData.UpdateMetadataTopicState()
.setTopicId(deletedTopicId)
.setTopicName(topicName)
.setPartitionStates(new util.ArrayList())

currentMetadata.partitionStates(topicName).foreach { case (partitionId, partitionState) =>
val lisr = LeaderAndIsr.duringDelete(partitionState.isr().asScala.map(_.intValue()).toList)
val newPartitionState = new UpdateMetadataPartitionState()
.setPartitionIndex(partitionId.toInt)
.setTopicName(topicName)
.setLeader(lisr.leader)
.setLeaderEpoch(lisr.leaderEpoch)
.setControllerEpoch(requestControllerEpoch)
.setReplicas(partitionState.replicas())
.setZkVersion(lisr.partitionEpoch)
.setIsr(lisr.isr.map(Integer.valueOf).asJava)
topicState.partitionStates().add(newPartitionState)
}
requestTopicStates.add(topicState)
}
deleteTopics.toSeq
}
}

/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
@@ -63,7 +118,8 @@ class ZkMetadataCache(
brokerId: Int,
metadataVersion: MetadataVersion,
brokerFeatures: BrokerFeatures,
kraftControllerNodes: Seq[Node] = Seq.empty)
kraftControllerNodes: Seq[Node] = Seq.empty,
zkMigrationEnabled: Boolean = false)
extends MetadataCache with ZkFinalizedFeatureCache with Logging {

private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -376,6 +432,25 @@
// This method returns the deleted TopicPartitions received from UpdateMetadataRequest
def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
inWriteLock(partitionMetadataLock) {
if (
updateMetadataRequest.isKRaftController &&
updateMetadataRequest.updateType() == AbstractControlRequest.Type.FULL
) {
if (!zkMigrationEnabled) {
stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but ZK migrations " +
s"are not enabled on this broker. Not treating this as a full metadata update")
} else {
val deletedTopicIds = ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
metadataSnapshot, updateMetadataRequest.controllerEpoch(), updateMetadataRequest.topicStates())
if (deletedTopicIds.isEmpty) {
stateChangeLogger.trace(s"Received UpdateMetadataRequest with Type=FULL (2), " +
s"but no deleted topics were detected.")
} else {
stateChangeLogger.debug(s"Received UpdateMetadataRequest with Type=FULL (2), " +
s"found ${deletedTopicIds.size} deleted topic ID(s): $deletedTopicIds.")
}
}
}

val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
@@ -477,14 +552,6 @@
}
}

case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
topicIds: Map[String, Uuid],
controllerId: Option[CachedControllerId],
aliveBrokers: mutable.LongMap[Broker],
aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
}

override def metadataVersion(): MetadataVersion = metadataVersion

override def features(): Features = _features match {
Expand Down