KAFKA-13127; Fix stray topic partition deletion for kraft #11118

Merged: 2 commits, Jul 23, 2021
core/src/main/scala/kafka/log/Log.scala (3 changes: 2 additions & 1 deletion)
@@ -259,7 +259,7 @@ class Log(@volatile var logStartOffset: Long,

   import kafka.log.Log._

-  this.logIdent = s"[Log partition=$topicPartition, dir=$parentDir] "
+  this.logIdent = s"[Log partition=$topicPartition, dir=${parentDir}${topicId.map(id => s", topicId=$id").getOrElse("")}] "

   /* A lock that guards all modifications to the log */
   private val lock = new Object
@@ -1635,6 +1635,7 @@ class Log(@volatile var logStartOffset: Long,
   override def toString: String = {
     val logString = new StringBuilder
     logString.append(s"Log(dir=$dir")
+    topicId.foreach(id => logString.append(s", topicId=$id"))
     logString.append(s", topic=${topicPartition.topic}")
     logString.append(s", partition=${topicPartition.partition}")
     logString.append(s", highWatermark=$highWatermark")
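For illustration, here is a minimal standalone sketch (not part of this diff; the values are made up) of how the optional topic ID folds into both identification strings above:

object LogIdentSketch extends App {
  val topicPartition = "foo-0"            // stands in for a real TopicPartition
  val parentDir = "/tmp/kafka-logs"
  val dir = "/tmp/kafka-logs/foo-0"
  val topicId: Option[String] = Some("b8tRS7h4TJ2Vt43Dp85v2A")

  // Same pattern as the new logIdent: map + getOrElse appends the
  // ", topicId=..." suffix only when a topic ID is present.
  val logIdent =
    s"[Log partition=$topicPartition, dir=${parentDir}${topicId.map(id => s", topicId=$id").getOrElse("")}] "
  println(logIdent)
  // => [Log partition=foo-0, dir=/tmp/kafka-logs, topicId=b8tRS7h4TJ2Vt43Dp85v2A]
  // (with topicId = None: [Log partition=foo-0, dir=/tmp/kafka-logs] )

  // Same pattern as the new toString: foreach appends only when present.
  val logString = new StringBuilder
  logString.append(s"Log(dir=$dir")
  topicId.foreach(id => logString.append(s", topicId=$id"))
  logString.append(")")
  println(logString.toString)
  // => Log(dir=/tmp/kafka-logs/foo-0, topicId=b8tRS7h4TJ2Vt43Dp85v2A)
}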
core/src/main/scala/kafka/server/ReplicaManager.scala (2 changes: 1 addition & 1 deletion)
@@ -2220,7 +2220,7 @@ class ReplicaManager(val config: KafkaConfig,
     replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
   }

-  def deleteGhostReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
+  def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
     stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
       case (topicPartition, e) =>
         if (e.isInstanceOf[KafkaStorageException]) {
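The rename only changes the method's name; the body still delegates to stopPartitions. A minimal sketch (values made up) of the Map built above, where the Boolean appears to mean "also delete the local log":

import org.apache.kafka.common.TopicPartition

// Iterable[TopicPartition] -> Map[TopicPartition, Boolean]:
// every stray partition is mapped to `true`.
val strays: Iterable[TopicPartition] =
  Seq(new TopicPartition("a", 0), new TopicPartition("a", 1))
val toStop: Map[TopicPartition, Boolean] = strays.map { tp => tp -> true }.toMap
// toStop: Map(a-0 -> true, a-1 -> true)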
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -26,12 +26,12 @@ import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}

 import scala.collection.mutable


-object BrokerMetadataPublisher {
+object BrokerMetadataPublisher extends Logging {
   /**
    * Given a topic name, find out if it changed. Note: if a topic named X was deleted and
    * then re-created, this method will return just the re-creation. The deletion will show
@@ -56,29 +56,35 @@ object BrokerMetadataPublisher {
   /**
    * Find logs which should not be on the current broker, according to the metadata image.
    *
-   * @param brokerId The ID of the current broker.
-   * @param newImage The metadata image.
-   * @param logs     A collection of Log objects.
+   * @param brokerId       The ID of the current broker.
+   * @param newTopicsImage The new topics image after broker has been reloaded
+   * @param logs           A collection of Log objects.
    *
    * @return The topic partitions which are no longer needed on this broker.
    */
-  def findGhostReplicas(brokerId: Int,
-                        newImage: MetadataImage,
-                        logs: Iterable[Log]): Iterable[TopicPartition] = {
+  def findStrayPartitions(brokerId: Int,
+                          newTopicsImage: TopicsImage,
+                          logs: Iterable[Log]): Iterable[TopicPartition] = {
     logs.flatMap { log =>
-      log.topicId match {
-        case None => throw new RuntimeException(s"Topic ${log.name} does not have a topic ID, " +
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
           "which is not allowed when running in KRaft mode.")
-        case Some(topicId) =>
-          val partitionId = log.topicPartition.partition()
-          Option(newImage.topics().getPartition(topicId, partitionId)) match {
-            case None => None
-            case Some(partition) => if (partition.replicas.contains(brokerId)) {
-              Some(log.topicPartition)
-            } else {
-              None
-            }
-          }
+      }
+
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId)) match {
+        case Some(partition) =>
+          if (!partition.replicas.contains(brokerId)) {
+            info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " +
+              s"does not contain the local brokerId $brokerId.")
+            Some(log.topicPartition)
+          } else {
+            None
+          }
+
+        case None =>
+          info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image")
+          Some(log.topicPartition)
+      }
     }
   }
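To make the classification easy to follow, here is a simplified, self-contained sketch of the same decision procedure (simplified types; the real code above works against TopicsImage and Log):

import org.apache.kafka.common.{TopicPartition, Uuid}

object StrayPartitionSketch {
  // Simplified stand-in for TopicsImage: topicId -> (partitionId -> replicas).
  type SimpleImage = Map[Uuid, Map[Int, Seq[Int]]]

  def findStrays(
    brokerId: Int,
    image: SimpleImage,
    logs: Iterable[(Uuid, TopicPartition)] // (topicId, partition) of each local log
  ): Iterable[TopicPartition] = {
    logs.flatMap { case (topicId, tp) =>
      image.get(topicId).flatMap(_.get(tp.partition)) match {
        case Some(replicas) if replicas.contains(brokerId) =>
          None     // assignment still includes this broker: keep the log
        case Some(_) =>
          Some(tp) // partition reassigned away from this broker: stray
        case None =>
          Some(tp) // topic ID unknown (deleted, or recreated under a new ID): stray
      }
    }
  }
}

Note how a deleted-and-recreated topic lands in the stray bucket: recreation assigns a fresh topic ID, so the old log's topic ID no longer exists in the image.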
@@ -239,9 +245,9 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       // Delete log directories which we're not supposed to have, according to the
       // latest metadata. This is only necessary to do when we're first starting up. If
       // we have to load a snapshot later, these topics will appear in deletedTopicIds.
-      val ghostReplicas = findGhostReplicas(brokerId, newImage, logManager.allLogs)
-      if (ghostReplicas.nonEmpty) {
-        replicaManager.deleteGhostReplicas(ghostReplicas)
+      val strayPartitions = findStrayPartitions(brokerId, newImage.topics, logManager.allLogs)
+      if (strayPartitions.nonEmpty) {
+        replicaManager.deleteStrayReplicas(strayPartitions)
       }

       // Make sure that the high water mark checkpoint thread is running for the replica
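Design note: the call site now passes newImage.topics (a TopicsImage) instead of the whole MetadataImage, so findStrayPartitions depends only on the slice of metadata it actually reads. That narrower signature is what lets the new unit test below construct a TopicsImage directly from TopicImage objects instead of standing up a full metadata image.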
KRaftClusterTest.scala
@@ -29,15 +29,16 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
 import org.apache.kafka.common.requests.{DescribeClusterRequest, DescribeClusterResponse}
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Test, Timeout}
-
+import org.junit.jupiter.api.{Tag, Test, Timeout}
 import java.util
 import java.util.{Arrays, Collections, Optional}

 import scala.collection.mutable
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._

 @Timeout(120)
+@Tag("integration")
 class KRaftClusterTest {

   @Test
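The same @Tag("integration") annotation is applied to the remaining test classes below. For context, a hedged sketch (class name and build wiring are hypothetical) of how a JUnit 5 tag lets a build select or skip these tests:

import org.junit.jupiter.api.{Tag, Test}

// Tagged tests can be included or excluded by the JUnit Platform,
// e.g. via includeTags("integration") / excludeTags("integration")
// in the build's test task configuration.
@Tag("integration")
class ExampleIntegrationTest {
  @Test
  def exercisesTheFullBroker(): Unit = {
    // ...
  }
}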
AbstractApiVersionsRequestTest.scala
@@ -24,10 +24,13 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._

 import java.util.Properties

+import org.junit.jupiter.api.Tag
+
 import scala.jdk.CollectionConverters._

+@Tag("integration")
 abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {

   def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
ClientQuotasRequestTest.scala
@@ -32,12 +32,14 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.api.extension.ExtendWith

 import scala.jdk.CollectionConverters._

 @ClusterTestDefaults(clusterType = Type.BOTH)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@Tag("integration")
 class ClientQuotasRequestTest(cluster: ClusterInstance) {
   private val ConsumerByteRateProp = QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
   private val ProducerByteRateProp = QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
DescribeQuorumRequestTest.scala
@@ -26,13 +26,15 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.api.extension.ExtendWith

 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag

 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {

   @ClusterTest(clusterType = Type.ZK)
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,9 +17,17 @@

 package unit.kafka.server.metadata

+import kafka.log.Log
 import kafka.server.metadata.BrokerMetadataPublisher
-import org.apache.kafka.image.MetadataImageTest
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
+import org.apache.kafka.metadata.PartitionRegistration
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Assertions.assertEquals
+
+import org.mockito.Mockito
+
+import scala.jdk.CollectionConverters._

 class BrokerMetadataPublisherTest {
   @Test
@@ -39,4 +47,99 @@ class BrokerMetadataPublisherTest {
       MetadataImageTest.IMAGE1,
       MetadataImageTest.DELTA1).isDefined, "Expected to see delta for changed topic")
   }
+
+  @Test
+  def testFindStrayReplicas(): Unit = {
+    val brokerId = 0
+
+    // Topic has been deleted
+    val deletedTopic = "a"
+    val deletedTopicId = Uuid.randomUuid()
+    val deletedTopicPartition1 = new TopicPartition(deletedTopic, 0)
+    val deletedTopicLog1 = mockLog(deletedTopicId, deletedTopicPartition1)
+    val deletedTopicPartition2 = new TopicPartition(deletedTopic, 1)
+    val deletedTopicLog2 = mockLog(deletedTopicId, deletedTopicPartition2)
+
+    // Topic was deleted and recreated
+    val recreatedTopic = "b"
+    val recreatedTopicPartition = new TopicPartition(recreatedTopic, 0)
+    val recreatedTopicLog = mockLog(Uuid.randomUuid(), recreatedTopicPartition)
+    val recreatedTopicImage = topicImage(Uuid.randomUuid(), recreatedTopic, Map(
+      recreatedTopicPartition.partition -> Seq(0, 1, 2)
+    ))
+
+    // Topic exists, but some partitions were reassigned
+    val reassignedTopic = "c"
+    val reassignedTopicId = Uuid.randomUuid()
+    val reassignedTopicPartition = new TopicPartition(reassignedTopic, 0)
+    val reassignedTopicLog = mockLog(reassignedTopicId, reassignedTopicPartition)
+    val retainedTopicPartition = new TopicPartition(reassignedTopic, 1)
+    val retainedTopicLog = mockLog(reassignedTopicId, retainedTopicPartition)
+
+    val reassignedTopicImage = topicImage(reassignedTopicId, reassignedTopic, Map(
+      reassignedTopicPartition.partition -> Seq(1, 2, 3),
+      retainedTopicPartition.partition -> Seq(0, 2, 3)
+    ))
+
+    val logs = Seq(
+      deletedTopicLog1,
+      deletedTopicLog2,
+      recreatedTopicLog,
+      reassignedTopicLog,
+      retainedTopicLog
+    )
+
+    val image = topicsImage(Seq(
+      recreatedTopicImage,
+      reassignedTopicImage
+    ))
+
+    val expectedStrayPartitions = Set(
+      deletedTopicPartition1,
+      deletedTopicPartition2,
+      recreatedTopicPartition,
+      reassignedTopicPartition
+    )
+
+    val strayPartitions = BrokerMetadataPublisher.findStrayPartitions(brokerId, image, logs).toSet
+    assertEquals(expectedStrayPartitions, strayPartitions)
+  }
+
+  private def mockLog(
+    topicId: Uuid,
+    topicPartition: TopicPartition
+  ): Log = {
+    val log = Mockito.mock(classOf[Log])
+    Mockito.when(log.topicId).thenReturn(Some(topicId))
+    Mockito.when(log.topicPartition).thenReturn(topicPartition)
+    log
+  }
+
+  private def topicImage(
+    topicId: Uuid,
+    topic: String,
+    partitions: Map[Int, Seq[Int]]
+  ): TopicImage = {
+    val partitionRegistrations = partitions.map { case (partitionId, replicas) =>
+      Int.box(partitionId) -> new PartitionRegistration(
+        replicas.toArray,
+        replicas.toArray,
+        Array.empty[Int],
+        Array.empty[Int],
+        replicas.head,
+        0,
+        0
+      )
+    }
+    new TopicImage(topic, topicId, partitionRegistrations.asJava)
+  }
+
+  private def topicsImage(
+    topics: Seq[TopicImage]
+  ): TopicsImage = {
+    val idsMap = topics.map(t => t.id -> t).toMap
+    val namesMap = topics.map(t => t.name -> t).toMap
+    new TopicsImage(idsMap.asJava, namesMap.asJava)
+  }
+
 }
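One note on the topicImage helper above, since new PartitionRegistration(...) takes positional arguments. A hedged annotation of what each argument appears to supply (the comments are descriptive; the exact parameter names live in the metadata module):

new PartitionRegistration(
  replicas.toArray,  // replicas
  replicas.toArray,  // ISR: the helper treats every replica as in-sync
  Array.empty[Int],  // replicas being removed (none)
  Array.empty[Int],  // replicas being added (none)
  replicas.head,     // leader: the first replica
  0,                 // leader epoch
  0                  // partition epoch
)

The topicsImage helper then indexes the same TopicImage objects by both id and name, mirroring the two maps a TopicsImage is constructed from.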