Skip to content

Commit

Permalink
KAFKA-12403; Ensure local state deleted on RemoveTopicRecord received (#10252)
Browse files Browse the repository at this point in the history

This patch implements additional handling logic for `RemoveTopic` records:

- Update `MetadataPartitions` to ensure addition of deleted partitions to `localRemoved` set
- Ensure topic configs are removed from `ConfigRepository`
- Propagate deleted partitions to `GroupCoordinator` so that corresponding offset commits can be removed

This patch also changes the controller topic id generation logic to use `Uuid.randomUuid` rather than `Random`.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
hachikuji committed Mar 8, 2021
1 parent cf329cb commit 31a121c
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 79 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Expand Up @@ -266,7 +266,6 @@ class BrokerServer(
groupCoordinator,
replicaManager,
transactionCoordinator,
logManager,
threadNamePrefix,
clientQuotaMetadataManager)

Expand Down
Expand Up @@ -20,7 +20,6 @@ import java.util
import java.util.concurrent.TimeUnit
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{RaftReplicaManager, RequestHandlerHelper}
import org.apache.kafka.common.config.ConfigResource
Expand All @@ -45,7 +44,6 @@ class BrokerMetadataListener(brokerId: Int,
groupCoordinator: GroupCoordinator,
replicaManager: RaftReplicaManager,
txnCoordinator: TransactionCoordinator,
logManager: LogManager,
threadNamePrefix: Option[String],
clientQuotaManager: ClientQuotaMetadataManager
) extends MetaLogListener with KafkaMetricsGroup {
Expand Down Expand Up @@ -79,6 +77,11 @@ class BrokerMetadataListener(brokerId: Int,
eventQueue.append(new HandleCommitsEvent(lastOffset, records))
}

// Test hook: lets unit tests apply a batch of committed records synchronously
// on the calling thread instead of going through the event queue.
private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
  val event = new HandleCommitsEvent(lastOffset, records)
  event.run()
}

class HandleCommitsEvent(lastOffset: Long,
records: util.List[ApiMessage])
extends EventQueue.FailureLoggingEvent(log) {
Expand Down Expand Up @@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
case e: Exception => throw new RuntimeException("Unknown metadata record type " +
s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
}
recordType match {
case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
record.asInstanceOf[RegisterBrokerRecord])
case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
record.asInstanceOf[UnregisterBrokerRecord])
case TOPIC_RECORD => handleTopicRecord(imageBuilder,
record.asInstanceOf[TopicRecord])
case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
record.asInstanceOf[PartitionRecord])
case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
record.asInstanceOf[PartitionChangeRecord])
case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
record.asInstanceOf[FenceBrokerRecord])
case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
record.asInstanceOf[UnfenceBrokerRecord])
case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
record.asInstanceOf[RemoveTopicRecord])
case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
record.asInstanceOf[QuotaRecord])
// TODO: handle FEATURE_LEVEL_RECORD
case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")

record match {
case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)
case rec: PartitionChangeRecord => handlePartitionChangeRecord(imageBuilder, rec)
case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec)
case rec: ConfigRecord => handleConfigRecord(rec)
case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec)
case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType")
}
}

Expand Down Expand Up @@ -222,9 +216,16 @@ class BrokerMetadataListener(brokerId: Int,

/**
 * Apply a RemoveTopicRecord: delete the topic's partitions from the image,
 * notify the group coordinator so offset commits can be cleaned up, and drop
 * the topic's configs from the config repository.
 *
 * Fix: the scraped diff had fused the pre-patch body (an unconditional
 * removeTopicById + handleDeletedPartitions) in front of the new body, which
 * would delete the topic twice and make the subsequent name lookup throw.
 * Only the new logic is kept.
 *
 * @throws RuntimeException if the topic ID is not present in the image
 */
def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
                            record: RemoveTopicRecord): Unit = {
  imageBuilder.topicIdToName(record.topicId()) match {
    case None =>
      throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId}")

    case Some(topicName) =>
      info(s"Processing deletion of topic $topicName with id ${record.topicId}")
      // Remove all partitions of the topic from the image being built.
      val removedPartitions = imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
      // Let the group coordinator purge offset commits for the deleted partitions.
      groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
      // Drop topic-level configs so they do not leak to a later recreated topic.
      configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, topicName))
  }
}

def handleQuotaRecord(imageBuilder: MetadataImageBuilder,
Expand Down
Expand Up @@ -96,11 +96,16 @@ class CachedConfigRepository extends ConfigRepository {

/**
 * Return a snapshot of the configs for the given resource as a Properties
 * object. An empty Properties is returned when the resource has no entries.
 *
 * Fix: the scraped diff had fused the pre-patch closure lines with the new
 * ones, leaving unbalanced braces; only the new, well-formed body is kept.
 */
override def config(configResource: ConfigResource): Properties = {
  val properties = new Properties()
  Option(configMap.get(configResource)).foreach { resourceConfigMap =>
    resourceConfigMap.entrySet.iterator.asScala.foreach { entry =>
      properties.put(entry.getKey, entry.getValue)
    }
  }
  properties
}

// Remove all cached configs for the given resource. No-op if the resource
// has no entry in the map.
def remove(configResource: ConfigResource): Unit = {
  configMap.remove(configResource)
}

}
95 changes: 71 additions & 24 deletions core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Expand Up @@ -116,21 +116,48 @@ class MetadataPartitionsBuilder(val brokerId: Int,
val prevPartitions: MetadataPartitions) {
private var newNameMap = prevPartitions.copyNameMap()
private var newIdMap = prevPartitions.copyIdMap()
private var newReverseIdMap = prevPartitions.copyReverseIdMap()
private val changed = Collections.newSetFromMap[Any](new util.IdentityHashMap())
private val _localChanged = new util.HashSet[MetadataPartition]
private val _localRemoved = new util.HashSet[MetadataPartition]

// Look up the (possibly builder-updated) topic name for the given topic ID.
def topicIdToName(id: Uuid): Option[String] = Option(newIdMap.get(id))

// Reverse lookup: topic name -> topic ID, against the builder's current state.
def topicNameToId(name: String): Option[Uuid] = Option(newReverseIdMap.get(name))

/**
 * Remove a topic (and all its partitions) from the image being built.
 *
 * Partitions hosted on this broker are added to `_localRemoved` when the
 * previous image already knew the topic ID (so local replica state must be
 * deleted); otherwise the topic was created and deleted within this batch,
 * so its partitions are only dropped from `_localChanged`.
 *
 * Fix: the scraped diff had fused the pre-patch `match` lines in front of
 * the new body, leaving an unclosed match and a double removal from
 * `newIdMap`; only the new logic is kept.
 *
 * @return the partitions that were removed
 * @throws RuntimeException if the topic ID is unknown
 */
def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
  val name = Option(newIdMap.remove(id)).getOrElse {
    throw new RuntimeException(s"Unable to locate topic with ID $id")
  }

  // Keep the reverse (name -> ID) index consistent.
  newReverseIdMap.remove(name)

  val prevPartitionMap = newNameMap.remove(name)
  if (prevPartitionMap == null) {
    Seq.empty
  } else {
    changed.remove(prevPartitionMap)

    val removedPartitions = prevPartitionMap.values
    if (prevImageHasTopicId(id)) {
      // Topic existed before this batch: local replicas must be cleaned up.
      removedPartitions.forEach { partition =>
        if (partition.isReplicaFor(brokerId)) {
          _localRemoved.add(partition)
        }
      }
    } else {
      // Topic was created and removed within this batch: nothing was ever
      // applied locally, so just retract any pending local changes.
      removedPartitions.forEach { partition =>
        if (partition.isReplicaFor(brokerId)) {
          _localChanged.remove(partition)
        }
      }
    }
    removedPartitions.asScala
  }
}

def handleChange(record: PartitionChangeRecord): Unit = {
Option(newIdMap.get(record.topicId())) match {
topicIdToName(record.topicId) match {
case None => throw new RuntimeException(s"Unable to locate topic with ID ${record.topicId()}")
case Some(name) => Option(newNameMap.get(name)) match {
case None => throw new RuntimeException(s"Unable to locate topic with name $name")
Expand All @@ -144,10 +171,14 @@ class MetadataPartitionsBuilder(val brokerId: Int,

// Register a topic name <-> ID mapping in both direction maps.
def addUuidMapping(name: String, id: Uuid): Unit = {
  newIdMap.put(id, name)
  newReverseIdMap.put(name, id)
}

/**
 * Remove the ID -> name mapping and, if it existed, the matching
 * name -> ID reverse entry.
 *
 * Fix: the scraped diff had fused the stale pre-patch line
 * `newIdMap.remove(id)` before the new body, which consumed the mapping
 * first so `topicName` was always null and the reverse index was never
 * cleaned. The stale line is dropped.
 */
def removeUuidMapping(id: Uuid): Unit = {
  val topicName = newIdMap.remove(id)
  if (topicName != null) {
    newReverseIdMap.remove(topicName)
  }
}

def get(topicName: String, partitionId: Int): Option[MetadataPartition] = {
Expand All @@ -171,42 +202,58 @@ class MetadataPartitionsBuilder(val brokerId: Int,
val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
if (partition.isReplicaFor(brokerId)) {
_localChanged.add(partition)
} else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
_localRemoved.add(prevPartition)
} else if (prevPartition != null) {
maybeAddToLocalRemoved(prevPartition)
}
newNameMap.put(partition.topicName, newPartitionMap)
}

// Record the partition in `_localRemoved` when it is hosted on this broker
// AND the previous image already knew its topic (by ID when the reverse map
// has one, otherwise by name). Partitions the previous image never knew
// require no local cleanup.
private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
  if (partition.isReplicaFor(brokerId)) {
    val knownToPrevImage = Option(newReverseIdMap.get(partition.topicName)) match {
      case Some(topicId) => prevImageHasTopicId(topicId)
      case None => prevPartitions.allTopicNames().contains(partition.topicName)
    }
    if (knownToPrevImage) {
      _localRemoved.add(partition)
    }
  }
}

// True if the previous (pre-build) image already contained this topic ID.
private def prevImageHasTopicId(topicId: Uuid): Boolean =
  prevPartitions.topicIdToName(topicId).nonEmpty

/**
 * Remove a single partition from the image being built.
 *
 * If the topic's partition map was already copied in this builder (tracked
 * via `changed`), mutate it in place; otherwise copy-on-write a new map
 * without the partition so the previous image stays untouched. Either way,
 * a removed partition hosted on this broker is recorded for local cleanup
 * via maybeAddToLocalRemoved.
 *
 * Fix: the scraped diff had fused the pre-patch lines (in-place remove plus
 * a direct `_localRemoved.add`) into the new body, breaking brace balance;
 * only the new copy-on-write logic is kept.
 */
def remove(topicName: String, partitionId: Int): Unit = {
  val prevPartitionMap = newNameMap.get(topicName)
  if (prevPartitionMap != null) {
    val removedPartition = if (changed.contains(prevPartitionMap)) {
      // Map already private to this builder: safe to mutate in place.
      Option(prevPartitionMap.remove(partitionId))
    } else {
      Option(prevPartitionMap.get(partitionId)).map { prevPartition =>
        // Copy-on-write: build a new map without the removed partition.
        val newPartitionMap = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() - 1)
        prevPartitionMap.forEach { (prevPartitionId, prevPartition) =>
          if (prevPartitionId != partitionId) {
            newPartitionMap.put(prevPartitionId, prevPartition)
          }
        }
        changed.add(newPartitionMap)
        newNameMap.put(topicName, newPartitionMap)
        prevPartition
      }
    }
    removedPartition.foreach(maybeAddToLocalRemoved)
  }
}

// Build the immutable MetadataPartitions from the accumulated maps.
// NOTE(review): the result is constructed from the raw map references
// BEFORE they are wrapped; the builder's own fields are then replaced with
// unmodifiable views, so any further mutation through this builder throws
// instead of silently altering the built image. The wrap order is therefore
// load-bearing — do not reorder these statements.
def build(): MetadataPartitions = {
  val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)
  newNameMap = Collections.unmodifiableMap(newNameMap)
  newIdMap = Collections.unmodifiableMap(newIdMap)
  newReverseIdMap = Collections.unmodifiableMap(newReverseIdMap)
  result
}

Expand All @@ -232,15 +279,15 @@ case class MetadataPartitions(private val nameMap: util.Map[String, util.Map[Int
// Reverse lookup: topic name -> topic ID in the built image.
def topicNameToId(name: String): Option[Uuid] = Option(reverseIdMap.get(name))

/**
 * Return a fresh mutable copy of the name -> partition-map index, for use
 * by a new builder.
 *
 * Fix: the scraped diff had fused the pre-patch body (build a copy via
 * putAll, then discard it) ahead of the new single-expression body; the
 * dead allocation is removed.
 */
def copyNameMap(): util.Map[String, util.Map[Int, MetadataPartition]] = {
  new util.HashMap(nameMap)
}

/**
 * Return a fresh mutable copy of the topic ID -> name index, for use by a
 * new builder.
 *
 * Fix: the scraped diff had fused the pre-patch body (build a copy via
 * putAll, then discard it) ahead of the new single-expression body; the
 * dead allocation is removed.
 */
def copyIdMap(): util.Map[Uuid, String] = {
  new util.HashMap(idMap)
}

// Return a fresh mutable copy of the name -> topic ID reverse index.
def copyReverseIdMap(): util.Map[String, Uuid] = {
  new util.HashMap(reverseIdMap)
}

// Iterate over every partition of every topic in the image.
def allPartitions(): Iterator[MetadataPartition] = new AllPartitionsIterator(nameMap).asScala
Expand Down

0 comments on commit 31a121c

Please sign in to comment.