KAFKA-14190: Update Zk TopicId from locally stored cache in controller #13111

Open · wants to merge 2 commits into trunk
34 changes: 21 additions & 13 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1664,11 +1664,21 @@ class KafkaController(val config: KafkaConfig,
}

private def processTopicIds(topicIdAssignments: Set[TopicIdReplicaAssignment]): Unit = {
// Create topic IDs for topics missing them if we are using topic IDs
// Create topic IDs, or reuse locally stored topic IDs, for topics missing them if we are using topic IDs
// Otherwise, maintain what we have in the topicZNode
val updatedTopicIdAssignments = if (config.usesTopicId) {
val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
val (withTopicIds, withoutTopicIds, withLocalTopicIds) = topicIdAssignments.foldLeft((Set.empty[TopicIdReplicaAssignment], Set.empty[TopicIdReplicaAssignment], Set.empty[TopicIdReplicaAssignment])) {
Contributor: readability is a bit tricky here. I'm also wondering if there was an operation you could have done besides making singleton sets and unioning them.

Contributor: Since we go through all the partitions again to check the topic ID and add to the replica assignment, I wonder if we could have kept the partition and then a separate part for the ones with local IDs. 🤔

Contributor (author): > I'm also wondering if there was an operation you could have done besides making singleton sets and unioning them.

I chose this approach because it is the only one that can 3-way partition the topicIdAssignments in a single iteration; all other approaches require multiple iterations. We can work on improving this and favouring readability once we have consensus on whether we will proceed with this change or not.
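For illustration only, a minimal sketch of the same three-way split that accumulates with `+` rather than singleton-set unions; this is one possible answer to the question above, not part of the PR.

```scala
// Hypothetical alternative (not in the PR): accumulate with `+` instead of `union Set(t)`.
// The three-way split is the same: assignments that already carry a topic ID, those with no
// known ID at all, and those whose ID only exists in the controller's local cache.
val (withTopicIds, withoutTopicIds, withLocalTopicIds) =
  topicIdAssignments.foldLeft((Set.empty[TopicIdReplicaAssignment],
                               Set.empty[TopicIdReplicaAssignment],
                               Set.empty[TopicIdReplicaAssignment])) {
    case ((wt, wo, wl), t) =>
      if (t.topicId.isDefined) (wt + t, wo, wl)
      else if (controllerContext.topicIds.contains(t.topic)) (wt, wo, wl + t)
      else (wt, wo + t, wl)
  }
```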

case ((wt, wo, wl), t) =>
if (t.topicId.isDefined) (wt union Set(t), wo, wl)
else if (controllerContext.topicIds.contains(t.topic)) (wt, wo, wl union Set(t))
else (wt, wo union Set(t), wl)
}

val topicIdsForZkUpdate = withoutTopicIds ++ withLocalTopicIds.map { t =>
TopicIdReplicaAssignment(t.topic, controllerContext.topicIds.get(t.topic), t.assignment)
}

withTopicIds ++ zkClient.setTopicIds(topicIdsForZkUpdate, controllerContext.epochZkVersion)
} else {
topicIdAssignments
}
@@ -1698,7 +1708,7 @@ class KafkaController(val config: KafkaConfig,
private def processPartitionModifications(topic: String): Unit = {
def restorePartitionReplicaAssignment(
topic: String,
newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
newPartitionReplicaAssignment: Set[(TopicPartition, ReplicaAssignment)]
Contributor: Why did we make this a set?

Contributor (author): We changed the call to Zk from zkClient.getFullReplicaAssignmentForTopics to zkClient.getReplicaAssignmentAndTopicIdForTopics since the former did not provide the topic ID. As a result of this change we needed to use flatMap at line 1730, and hence it results in a Set.

We can work on improving the readability as you suggested once we have consensus on whether we even want to make this change at all.
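As a rough sketch of the type change being discussed (signatures assumed from the surrounding diff): flatMapping the assignments of a Set of TopicIdReplicaAssignment naturally yields a Set of pairs, and calling `.toMap` on it would be one way to keep the original Map parameter.

```scala
// Illustration only (assumed types): why the helper's parameter became a Set of pairs.
val partitions: Set[TopicIdReplicaAssignment] =
  zkClient.getReplicaAssignmentAndTopicIdForTopics(immutable.Set(topic))

// flatMap over each assignment Map flattens into Set[(TopicPartition, ReplicaAssignment)].
val flattened: Set[(TopicPartition, ReplicaAssignment)] = partitions.flatMap(_.assignment)

// If keeping the old Map signature were preferred, `.toMap` would restore it:
val asMap: Map[TopicPartition, ReplicaAssignment] = flattened.toMap
```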

): Unit = {
info("Restoring the partition replica assignment for topic %s".format(topic))

@@ -1716,27 +1726,25 @@
}

if (!isActive) return
val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
}
val partitions = zkClient.getReplicaAssignmentAndTopicIdForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitions.flatMap(_.assignment).filter(t => controllerContext.partitionReplicaAssignment(t._1).isEmpty)

if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
if (partitionsToBeAdded.nonEmpty) {
warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
.format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))

restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
restorePartitionReplicaAssignment(topic, partitions.flatMap(_.assignment))
} else {
// This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion
info("Ignoring partition change during topic deletion as no new partitions are added")
}
} else if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
}
onNewPartitionCreation(partitionsToBeAdded.keySet)
processTopicIds(partitions)
partitionsToBeAdded.foreach(tuple => controllerContext.updatePartitionFullReplicaAssignment(tuple._1, tuple._2))
Contributor: I think switching this to "tuple" makes it harder to read and less clear what the variables are.
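A small sketch (not part of the PR) of how the same line could keep explicit names by pattern matching on the pair, which may address this readability concern:

```scala
// Hypothetical alternative: destructure the pair instead of using tuple._1 / tuple._2.
partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
  controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
}
```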


onNewPartitionCreation(partitionsToBeAdded.map(_._1))
}
}

5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -602,14 +602,13 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
val updatedAssignments = topicIdReplicaAssignments.map {
case TopicIdReplicaAssignment(topic, None, assignments) =>
TopicIdReplicaAssignment(topic, Some(Uuid.randomUuid()), assignments)
case TopicIdReplicaAssignment(topic, Some(_), _) =>
throw new IllegalArgumentException("TopicIdReplicaAssignment for " + topic + " already contains a topic ID.")
case t => t
}.toSet

val setDataRequests = updatedAssignments.map { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicIdOpt, assignments), ZkVersion.MatchAnyVersion)
}.toSeq

Contributor: nit: spacing

retryRequestsUntilConnected(setDataRequests, expectedControllerEpochZkVersion)
updatedAssignments
}
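For context, a minimal usage sketch (assumed types, not part of the PR) of how the relaxed match behaves: assignments that already carry a topic ID are now passed through instead of raising IllegalArgumentException, so callers may pass a mixed set.

```scala
// Hypothetical illustration: setTopicIds now accepts a mix of assignments with and without IDs.
val mixed = Set(
  TopicIdReplicaAssignment("topic-a", None, Map.empty),                   // gets a fresh random Uuid
  TopicIdReplicaAssignment("topic-b", Some(Uuid.randomUuid()), Map.empty) // passed through unchanged
)
// zkClient.setTopicIds(mixed, controllerEpochZkVersion) would write both topic znodes and
// return the updated assignments, including the newly generated Uuid for "topic-a".
```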
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_6_IV0, IBP_2_7_IV0, IBP_3_2_IV0}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.log4j.Level
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@@ -300,12 +300,63 @@ class ControllerIntegrationTest extends QuorumTestHarness {
tp0 -> ReplicaAssignment(Seq(0), Seq(), Seq()),
tp1 -> ReplicaAssignment(Seq(0), Seq(), Seq()))
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
zkClient.setTopicAssignment(tp0.topic, Some(Uuid.randomUuid()), expandedAssignment, firstControllerEpochZkVersion)

// Newer clients (>=2.8) will fetch the topic IDs from Zk and use them with the expanded assignment,
// see AdminZkClient#writeTopicPartitionAssignment()
val topicIds = zkClient.getTopicIdsForTopics(Set(tp0.topic))
zkClient.setTopicAssignment(tp0.topic, topicIds.get(tp0.topic), expandedAssignment, firstControllerEpochZkVersion)
waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitForPartitionMetadata(servers, tp1.topic, tp1.partition)
}

/**
* This tests a scenario where clients < 2.8 are used to increase the partitions for a topic by directly modifying
* the assignment in Zk. This direct modification overwrites the existing topicId and replaces it with empty.
*
* This test verifies that the topicId in Zk is rebuilt by the controller as long as a controller failover doesn't
* happen. We currently don't handle the scenario where a controller failover occurs between Zk overwriting the
* topicId and the controller updating it with its locally stored value.
*/
@Test
def testTopicPartitionExpansionWithOlderClients(): Unit = {
val tp0 = new TopicPartition("t", 0)
val tp1 = new TopicPartition("t", 1)
val initialAssignment = Map(tp0.partition -> Seq(0))
val expandedAssignment = Map(
tp0 -> ReplicaAssignment(Seq(0), Seq(), Seq()),
tp1 -> ReplicaAssignment(Seq(0), Seq(), Seq()))

servers = makeServers(1)

TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = initialAssignment, servers = servers)
val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp0.topic())).get(tp0.topic())
val controllerDuringTopicCreation = getController()
val topicIdStoredByController = getController().kafkaController.controllerContext.topicIds.get(tp0.topic)

// Note that topic ID for the topic should be non-empty
assertTrue(topicIdAfterCreate.nonEmpty)
assertEquals(topicIdAfterCreate, topicIdStoredByController, "topic ID stored in Zk does not match the topic ID in the controller")

// This mimics a TopicCommand.ZookeeperTopicService call from a client < 2.8, which passes an empty value for topicId.
zkClient.setTopicAssignment(tp0.topic, /*empty topic Id*/ Option.empty, expandedAssignment, firstControllerEpochZkVersion)

waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")

// The assertions of this test are only valid if the controller hasn't failed over
if (controllerDuringTopicCreation.config.brokerId == getController().config.brokerId) {
// verify that the topic ID stored in Zk after the expansion is the same as the one assigned on topic creation
val topicIdAfterPartitionExpansion = zkClient.getTopicIdsForTopics(Set(tp0.topic())).get(tp0.topic())
assertTrue(topicIdAfterPartitionExpansion.nonEmpty, "TopicId should not be empty, controller should have re-created it.")
assertEquals(topicIdAfterCreate, topicIdAfterPartitionExpansion, "TopicId has changed for the topic after partition expansion")

TestUtils.waitForPartitionMetadata(servers, tp1.topic, tp1.partition)
} else {
fail("Controller failover occurred during this test which is not expected. Please re-try the test.")
}
}

@Test
def testTopicPartitionExpansionWithOfflineReplica(): Unit = {
servers = makeServers(2)
@@ -320,7 +371,10 @@ class ControllerIntegrationTest extends QuorumTestHarness {
TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
servers(otherBrokerId).shutdown()
servers(otherBrokerId).awaitShutdown()
zkClient.setTopicAssignment(tp0.topic, Some(Uuid.randomUuid()), expandedAssignment, firstControllerEpochZkVersion)
// Newer clients (>=2.8) will fetch the topic IDs from Zk and use them with the expanded assignment,
// see AdminZkClient#writeTopicPartitionAssignment()
val topicIds = zkClient.getTopicIdsForTopics(Set(tp0.topic))
zkClient.setTopicAssignment(tp0.topic, topicIds.get(tp0.topic), expandedAssignment, firstControllerEpochZkVersion)
waitForPartitionState(tp1, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch,
"failed to get expected partition state upon topic partition expansion")
TestUtils.waitForPartitionMetadata(Seq(servers(controllerId)), tp1.topic, tp1.partition)