Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12403; Ensure local state deleted on RemoveTopicRecord received #10252

Merged
merged 7 commits on Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Expand Up @@ -266,7 +266,6 @@ class BrokerServer(
groupCoordinator,
replicaManager,
transactionCoordinator,
logManager,
threadNamePrefix,
clientQuotaMetadataManager)

Expand Down
Expand Up @@ -20,7 +20,6 @@ import java.util
import java.util.concurrent.TimeUnit
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{RaftReplicaManager, RequestHandlerHelper}
import org.apache.kafka.common.config.ConfigResource
Expand All @@ -45,7 +44,6 @@ class BrokerMetadataListener(brokerId: Int,
groupCoordinator: GroupCoordinator,
replicaManager: RaftReplicaManager,
txnCoordinator: TransactionCoordinator,
logManager: LogManager,
threadNamePrefix: Option[String],
clientQuotaManager: ClientQuotaMetadataManager
) extends MetaLogListener with KafkaMetricsGroup {
Expand Down Expand Up @@ -79,6 +77,11 @@ class BrokerMetadataListener(brokerId: Int,
eventQueue.append(new HandleCommitsEvent(lastOffset, records))
}

// Visible for testing: runs a HandleCommitsEvent synchronously on the caller's
// thread instead of appending it to the listener's event queue.
private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
  val event = new HandleCommitsEvent(lastOffset, records)
  event.run()
}

class HandleCommitsEvent(lastOffset: Long,
records: util.List[ApiMessage])
extends EventQueue.FailureLoggingEvent(log) {
Expand Down Expand Up @@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
case e: Exception => throw new RuntimeException("Unknown metadata record type " +
s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
}
recordType match {
case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
record.asInstanceOf[RegisterBrokerRecord])
case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
record.asInstanceOf[UnregisterBrokerRecord])
case TOPIC_RECORD => handleTopicRecord(imageBuilder,
record.asInstanceOf[TopicRecord])
case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
record.asInstanceOf[PartitionRecord])
case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
record.asInstanceOf[PartitionChangeRecord])
case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
record.asInstanceOf[FenceBrokerRecord])
case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
record.asInstanceOf[UnfenceBrokerRecord])
case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
record.asInstanceOf[RemoveTopicRecord])
case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
record.asInstanceOf[QuotaRecord])
// TODO: handle FEATURE_LEVEL_RECORD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO no longer relevant?

case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")

record match {
case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)
chia7712 marked this conversation as resolved.
Show resolved Hide resolved
case rec: PartitionChangeRecord => handlePartitionChangeRecord(imageBuilder, rec)
case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec)
case rec: ConfigRecord => handleConfigRecord(rec)
case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec)
case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType")
}
}

Expand Down Expand Up @@ -222,9 +216,16 @@ class BrokerMetadataListener(brokerId: Int,

/**
 * Handles a RemoveTopicRecord: resolves the topic name from its id, removes all of the
 * topic's partitions from the image being built, notifies the group coordinator so that
 * consumer offsets for the deleted partitions are cleaned up, and drops the topic's
 * entry from the config repository.
 *
 * Throws RuntimeException if the topic id is unknown to the current image.
 */
def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
                            record: RemoveTopicRecord): Unit = {
  imageBuilder.topicIdToName(record.topicId()) match {
    case None =>
      throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")

    case Some(topicName) =>
      info(s"Processing deletion of topic $topicName with id ${record.topicId}")
      // Remove partitions first so we know exactly which ones disappeared.
      val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
      groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
      // Topic configs must not survive the topic itself.
      configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))
  }
}

def handleQuotaRecord(imageBuilder: MetadataImageBuilder,
Expand Down
Expand Up @@ -96,11 +96,16 @@ class CachedConfigRepository extends ConfigRepository {

/**
 * Returns a snapshot of the cached configs for the given resource as a Properties
 * instance. An unknown resource yields an empty Properties (never null).
 */
override def config(configResource: ConfigResource): Properties = {
  val properties = new Properties()
  Option(configMap.get(configResource)).foreach { resourceConfigMap =>
    resourceConfigMap.entrySet.iterator.asScala.foreach { entry =>
      properties.put(entry.getKey, entry.getValue)
    }
  }
  properties
}

/** Deletes every cached config entry for the given resource; no-op when absent. */
def remove(configResource: ConfigResource): Unit = {
  val _ = configMap.remove(configResource)
}

}
93 changes: 70 additions & 23 deletions core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Expand Up @@ -116,21 +116,48 @@ class MetadataPartitionsBuilder(val brokerId: Int,
val prevPartitions: MetadataPartitions) {
private var newNameMap = prevPartitions.copyNameMap()
private var newIdMap = prevPartitions.copyIdMap()
private var newReverseIdMap = prevPartitions.copyReverseIdMap()
private val changed = Collections.newSetFromMap[Any](new util.IdentityHashMap())
private val _localChanged = new util.HashSet[MetadataPartition]
private val _localRemoved = new util.HashSet[MetadataPartition]

/** Looks up the topic name for the given id in the in-progress image; None if unknown. */
def topicIdToName(id: Uuid): Option[String] = Option(newIdMap.get(id))

/** Looks up the topic id for the given name in the in-progress image; None if unknown. */
def topicNameToId(name: String): Option[Uuid] = Option(newReverseIdMap.get(name))

/**
 * Removes a topic (by id) from the image being built and returns its partitions.
 *
 * Also tracks the local deltas for this broker: partitions hosted here are added to
 * _localRemoved only when the topic existed in the previous image; otherwise the topic
 * was created and deleted within this same batch, so its partitions are simply dropped
 * from _localChanged (there is nothing on disk to delete).
 *
 * Throws RuntimeException if the topic id is unknown.
 */
def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
  val name = Option(newIdMap.remove(id)).getOrElse {
    throw new RuntimeException(s"Unable to locate topic with ID $id")
  }

  // Keep the reverse map consistent with the forward map.
  newReverseIdMap.remove(name)

  val prevPartitionMap = newNameMap.remove(name)
  if (prevPartitionMap == null) {
    Seq.empty
  } else {
    changed.remove(prevPartitionMap)

    val removedPartitions = prevPartitionMap.values
    if (prevImageHasTopicId(id)) {
      removedPartitions.forEach { partition =>
        if (partition.isReplicaFor(brokerId)) {
          _localRemoved.add(partition)
        }
      }
    } else {
      removedPartitions.forEach { partition =>
        if (partition.isReplicaFor(brokerId)) {
          _localChanged.remove(partition)
        }
      }
    }
    removedPartitions.asScala
  }
}

def handleChange(record: PartitionChangeRecord): Unit = {
Option(newIdMap.get(record.topicId())) match {
topicIdToName(record.topicId) match {
case None => throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId()}")
case Some(name) => Option(newNameMap.get(name)) match {
case None => throw new RuntimeException(s"Unable to locate topic with name $name")
Expand All @@ -144,10 +171,14 @@ class MetadataPartitionsBuilder(val brokerId: Int,

/** Records the bidirectional topic-id/topic-name association in both lookup maps. */
def addUuidMapping(name: String, id: Uuid): Unit = {
  newReverseIdMap.put(name, id)
  newIdMap.put(id, name)
}

/**
 * Drops the id -> name mapping (and its reverse entry) for a topic.
 * No-op if the id is not present.
 */
def removeUuidMapping(id: Uuid): Unit = {
  // remove() returns the mapped name (or null), which we need to clean the reverse map.
  val topicName = newIdMap.remove(id)
  if (topicName != null) {
    newReverseIdMap.remove(topicName)
  }
}

def get(topicName: String, partitionId: Int): Option[MetadataPartition] = {
Expand All @@ -171,25 +202,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
if (partition.isReplicaFor(brokerId)) {
_localChanged.add(partition)
chia7712 marked this conversation as resolved.
Show resolved Hide resolved
} else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
_localRemoved.add(prevPartition)
} else if (prevPartition != null) {
maybeAddToLocalRemoved(prevPartition)
}
newNameMap.put(partition.topicName, newPartitionMap)
}

/**
 * Adds the partition to _localRemoved if this broker hosts a replica of it AND the
 * topic existed in the previous image. The previous-image check prevents deletion
 * deltas for topics that were created and removed within the same record batch:
 * such partitions were never materialized locally, so there is no state to delete.
 *
 * Prefer matching by topic id when one is known (guards against a topic being
 * deleted and re-created under the same name); fall back to name lookup otherwise.
 */
private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
  if (partition.isReplicaFor(brokerId)) {
    val currentTopicId = newReverseIdMap.get(partition.topicName)
    val prevImageHasTopic = if (currentTopicId != null) {
      prevImageHasTopicId(currentTopicId)
    } else {
      prevPartitions.allTopicNames().contains(partition.topicName)
    }

    if (prevImageHasTopic) {
      _localRemoved.add(partition)
    }
  }
}

/** True iff the previous (pre-batch) metadata image already contained this topic id. */
private def prevImageHasTopicId(topicId: Uuid): Boolean =
  prevPartitions.topicIdToName(topicId).nonEmpty

def remove(topicName: String, partitionId: Int): Unit = {
val prevPartitionMap = newNameMap.get(topicName)
if (prevPartitionMap != null) {
if (changed.contains(prevPartitionMap)) {
val prevPartition = prevPartitionMap.remove(partitionId)
if (prevPartition.isReplicaFor(brokerId)) {
_localRemoved.add(prevPartition)
}
val removedPartition = if (changed.contains(prevPartitionMap)) {
Option(prevPartitionMap.remove(partitionId))
} else {
Option(prevPartitionMap.get(partitionId)).foreach { prevPartition =>
if (prevPartition.isReplicaFor(brokerId)) {
_localRemoved.add(prevPartition)
}
Option(prevPartitionMap.get(partitionId)).map { prevPartition =>
val newPartitionMap = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() - 1)
prevPartitionMap.forEach { (prevPartitionId, prevPartition) =>
if (!prevPartitionId.equals(partitionId)) {
hachikuji marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -198,15 +242,18 @@ class MetadataPartitionsBuilder(val brokerId: Int,
}
changed.add(newPartitionMap)
newNameMap.put(topicName, newPartitionMap)
prevPartition
}
}
removedPartition.foreach(maybeAddToLocalRemoved)
}
}

/**
 * Finalizes the builder into an immutable MetadataPartitions.
 * After this call the builder's maps are frozen (wrapped unmodifiable), so further
 * mutation through this builder will fail — build() is effectively terminal.
 */
def build(): MetadataPartitions = {
  val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)
  newNameMap = Collections.unmodifiableMap(newNameMap)
  newIdMap = Collections.unmodifiableMap(newIdMap)
  newReverseIdMap = Collections.unmodifiableMap(newReverseIdMap)
  result
}

Expand All @@ -232,15 +279,15 @@ case class MetadataPartitions(private val nameMap: util.Map[String, util.Map[Int
def topicNameToId(name: String): Option[Uuid] = Option(reverseIdMap.get(name))

/** Returns a fresh mutable copy of the name -> (partitionId -> partition) map. */
def copyNameMap(): util.Map[String, util.Map[Int, MetadataPartition]] = {
  new util.HashMap(nameMap)
}

/** Returns a fresh mutable copy of the topic-id -> topic-name map. */
def copyIdMap(): util.Map[Uuid, String] = {
  new util.HashMap(idMap)
}

/** Returns a fresh mutable copy of the topic-name -> topic-id map. */
def copyReverseIdMap(): util.Map[String, Uuid] = {
  new util.HashMap(reverseIdMap)
}

def allPartitions(): Iterator[MetadataPartition] = new AllPartitionsIterator(nameMap).asScala
Expand Down