Skip to content

Commit

Permalink
KAFKA-13127; Fix stray topic partition deletion for kraft (#11118)
Browse files Browse the repository at this point in the history
This patch fixes BrokerMetadataPublisher.findGhostReplicas (renamed to findStrayPartitions)
so that it returns the stray partitions. Previously it returned the non-stray partitions instead,
which caused all of those valid partitions to be deleted by mistake on startup.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@gmail.com>
  • Loading branch information
hachikuji authored and cmccabe committed Jul 23, 2021
1 parent bdc75b2 commit 4ab6c4a
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 30 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/log/Log.scala
Expand Up @@ -1890,6 +1890,7 @@ class Log(@volatile private var _dir: File,
override def toString: String = {
val logString = new StringBuilder
logString.append(s"Log(dir=$dir")
topicId.foreach(id => logString.append(s", topicId=$id"))
logString.append(s", topic=${topicPartition.topic}")
logString.append(s", partition=${topicPartition.partition}")
logString.append(s", highWatermark=$highWatermark")
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Expand Up @@ -2201,15 +2201,15 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
}

def deleteGhostReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
case (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {
stateChangeLogger.error(s"Unable to delete ghost replica ${topicPartition} because " +
stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
"the local replica for the partition is in an offline log directory")
} else {
stateChangeLogger.error(s"Unable to delete ghost replica ${topicPartition} because " +
s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}")
stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}", e)
}
}
}
Expand Down
Expand Up @@ -26,12 +26,12 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}

import scala.collection.mutable


object BrokerMetadataPublisher {
object BrokerMetadataPublisher extends Logging {
/**
* Given a topic name, find out if it changed. Note: if a topic named X was deleted and
* then re-created, this method will return just the re-creation. The deletion will show
Expand All @@ -56,29 +56,35 @@ object BrokerMetadataPublisher {
/**
* Find logs which should not be on the current broker, according to the metadata image.
*
* @param brokerId The ID of the current broker.
* @param newImage The metadata image.
* @param logs A collection of Log objects.
* @param brokerId The ID of the current broker.
* @param newTopicsImage The new topics image after broker has been reloaded
* @param logs A collection of Log objects.
*
* @return The topic partitions which are no longer needed on this broker.
*/
def findGhostReplicas(brokerId: Int,
newImage: MetadataImage,
logs: Iterable[Log]): Iterable[TopicPartition] = {
def findStrayPartitions(brokerId: Int,
newTopicsImage: TopicsImage,
logs: Iterable[Log]): Iterable[TopicPartition] = {
logs.flatMap { log =>
log.topicId match {
case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " +
val topicId = log.topicId.getOrElse {
throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
"which is not allowed when running in KRaft mode.")
case Some(topicId) =>
val partitionId = log.topicPartition.partition()
Option(newImage.topics().getPartition(topicId, partitionId)) match {
case None => None
case Some(partition) => if (partition.replicas.contains(brokerId)) {
Some(log.topicPartition)
} else {
None
}
}

val partitionId = log.topicPartition.partition()
Option(newTopicsImage.getPartition(topicId, partitionId)) match {
case Some(partition) =>
if (!partition.replicas.contains(brokerId)) {
info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " +
s"does not contain the local brokerId $brokerId.")
Some(log.topicPartition)
} else {
None
}

case None =>
info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image")
Some(log.topicPartition)
}
}
}
Expand Down Expand Up @@ -239,9 +245,9 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Delete log directories which we're not supposed to have, according to the
// latest metadata. This is only necessary to do when we're first starting up. If
// we have to load a snapshot later, these topics will appear in deletedTopicIds.
val ghostReplicas = findGhostReplicas(brokerId, newImage, logManager.allLogs)
if (ghostReplicas.nonEmpty) {
replicaManager.deleteGhostReplicas(ghostReplicas)
val strayPartitions = findStrayPartitions(brokerId, newImage.topics, logManager.allLogs)
if (strayPartitions.nonEmpty) {
replicaManager.deleteStrayReplicas(strayPartitions)
}

// Make sure that the high water mark checkpoint thread is running for the replica
Expand Down
Expand Up @@ -29,15 +29,16 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}

import org.junit.jupiter.api.{Tag, Test, Timeout}
import java.util
import java.util.{Arrays, Collections, Optional}

import scala.collection.mutable
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._

@Timeout(120)
@Tag("integration")
class KRaftClusterTest {

@Test
Expand Down
Expand Up @@ -16,6 +16,8 @@
*/
package kafka.server

import java.util.Properties

import kafka.test.ClusterInstance
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
Expand All @@ -24,10 +26,11 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag

import java.util.Properties
import scala.jdk.CollectionConverters._

@Tag("integration")
abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {

def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
Expand Down
Expand Up @@ -32,12 +32,14 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.extension.ExtendWith

import scala.jdk.CollectionConverters._

@ClusterTestDefaults(clusterType = Type.BOTH)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Tag("integration")
class ClientQuotasRequestTest(cluster: ClusterInstance) {
private val ConsumerByteRateProp = QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
private val ProducerByteRateProp = QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
Expand Down
Expand Up @@ -26,13 +26,15 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.extension.ExtendWith

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT)
@Tag("integration")
class DescribeQuorumRequestTest(cluster: ClusterInstance) {

@ClusterTest(clusterType = Type.ZK)
Expand Down
Expand Up @@ -17,9 +17,17 @@

package unit.kafka.server.metadata

import kafka.log.Log
import kafka.server.metadata.BrokerMetadataPublisher
import org.apache.kafka.image.MetadataImageTest
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
import org.apache.kafka.metadata.PartitionRegistration
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions.assertEquals

import org.mockito.Mockito

import scala.jdk.CollectionConverters._

class BrokerMetadataPublisherTest {
@Test
Expand All @@ -39,4 +47,99 @@ class BrokerMetadataPublisherTest {
MetadataImageTest.IMAGE1,
MetadataImageTest.DELTA1).isDefined, "Expected to see delta for changed topic")
}

@Test
def testFindStrayReplicas(): Unit = {
  // The broker whose local logs are compared against the topics image.
  val localBrokerId = 0

  // Scenario 1: the topic has been deleted. Both of its partitions still have a local
  // log, but the topics image assembled below contains no entry for this topic.
  val goneTopicName = "a"
  val goneTopicId = Uuid.randomUuid()
  val goneTp0 = new TopicPartition(goneTopicName, 0)
  val goneLog0 = mockLog(goneTopicId, goneTp0)
  val goneTp1 = new TopicPartition(goneTopicName, 1)
  val goneLog1 = mockLog(goneTopicId, goneTp1)

  // Scenario 2: the topic was deleted and recreated. The local log and the image share
  // the topic name but are given two independently generated topic IDs.
  val rebornTopicName = "b"
  val rebornTp = new TopicPartition(rebornTopicName, 0)
  val rebornLog = mockLog(Uuid.randomUuid(), rebornTp)
  val rebornImage = topicImage(
    Uuid.randomUuid(),
    rebornTopicName,
    Map(rebornTp.partition -> Seq(0, 1, 2))
  )

  // Scenario 3: the topic still exists, but partition 0 was reassigned to replicas that
  // exclude the local broker while partition 1 still lists it.
  val movedTopicName = "c"
  val movedTopicId = Uuid.randomUuid()
  val movedAwayTp = new TopicPartition(movedTopicName, 0)
  val movedAwayLog = mockLog(movedTopicId, movedAwayTp)
  val stillLocalTp = new TopicPartition(movedTopicName, 1)
  val stillLocalLog = mockLog(movedTopicId, stillLocalTp)
  val movedTopicImage = topicImage(
    movedTopicId,
    movedTopicName,
    Map(
      movedAwayTp.partition -> Seq(1, 2, 3),
      stillLocalTp.partition -> Seq(0, 2, 3)
    )
  )

  val localLogs = Seq(goneLog0, goneLog1, rebornLog, movedAwayLog, stillLocalLog)
  val image = topicsImage(Seq(rebornImage, movedTopicImage))

  // Every log except the one for the retained partition is expected to be reported.
  val expected = Set(goneTp0, goneTp1, rebornTp, movedAwayTp)
  val actual = BrokerMetadataPublisher.findStrayPartitions(localBrokerId, image, localLogs).toSet
  assertEquals(expected, actual)
}

/**
 * Creates a Mockito-backed [[Log]] stub for use in tests.
 *
 * Only `topicId` and `topicPartition` are stubbed; every other member keeps
 * Mockito's default behavior.
 *
 * @param topicId        the ID the stub reports, wrapped in `Some`
 * @param topicPartition the topic partition the stub reports
 * @return the stubbed log
 */
private def mockLog(
  topicId: Uuid,
  topicPartition: TopicPartition
): Log = {
  val stubbedLog = Mockito.mock(classOf[Log])
  // The two stubbings are independent of one another.
  Mockito.when(stubbedLog.topicPartition).thenReturn(topicPartition)
  Mockito.when(stubbedLog.topicId).thenReturn(Some(topicId))
  stubbedLog
}

/**
 * Builds a [[TopicImage]] from a plain description of its partitions.
 *
 * Each partition gets a [[PartitionRegistration]] built from two fresh array copies of
 * its replica list, two empty arrays, the first replica and two zeros.
 * NOTE(review): presumably these map to (replicas, isr, removingReplicas, addingReplicas,
 * leader, leaderEpoch, partitionEpoch) -- confirm against PartitionRegistration.
 *
 * @param topicId    the ID of the topic
 * @param topic      the name of the topic
 * @param partitions partition ID to replica list; each list must be non-empty because
 *                   its head is read
 * @return the assembled topic image
 */
private def topicImage(
  topicId: Uuid,
  topic: String,
  partitions: Map[Int, Seq[Int]]
): TopicImage = {
  val registrationsById = partitions.map { case (id, replicaIds) =>
    val registration = new PartitionRegistration(
      replicaIds.toArray,
      replicaIds.toArray,
      Array.empty[Int],
      Array.empty[Int],
      replicaIds.head,
      0,
      0
    )
    // Box the key so the converted Java map is keyed by java.lang.Integer.
    Int.box(id) -> registration
  }
  new TopicImage(topic, topicId, registrationsById.asJava)
}

/**
 * Builds a [[TopicsImage]] from the given topic images, indexing each one both by its
 * ID and by its name.
 *
 * @param topics the topic images to include; on a duplicate ID or name the later
 *               element wins, as with any `toMap`
 * @return the assembled topics image
 */
private def topicsImage(
  topics: Seq[TopicImage]
): TopicsImage = {
  val imagesById: Map[Uuid, TopicImage] = topics.map(image => image.id -> image).toMap
  val imagesByName: Map[String, TopicImage] = topics.map(image => image.name -> image).toMap
  new TopicsImage(imagesById.asJava, imagesByName.asJava)
}

}

0 comments on commit 4ab6c4a

Please sign in to comment.