KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller #4668
@onurkaraman Can you please take a look at this patch? Thanks.
@onurkaraman @junrao Can you please take a look at this patch? Thanks!
@gitlw Thanks for the patch. Sorry for the delay. A few comments below.
.getOrElse(topicPartition.partition, Seq.empty)
}

def clearPartitionReplicaAssignment() = {
To be consistent, for methods with no return value, we want to specify Unit =. Also, should we make this method more general to clear other fields such as allTopics, partitionsBeingReassigned and replicasOnOfflineDirs, instead of letting the caller do that?
Added the return type Unit and converted the clearing method to clear other topic states.
.put(topicPartition.partition, newReplicas)
}

def partitionReplicaAssignmentForTopic(topic : String) : mutable.Map[TopicPartition, Seq[Int]] = {
It's probably better not to return a mutable map.
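One way to act on this suggestion is to keep the mutable map private and hand callers an immutable snapshot. The sketch below is illustrative only: `PartitionAssignments`, `put`, and `snapshot` are hypothetical names, not the identifiers used in the patch.

```scala
import scala.collection.mutable

// Hypothetical holder class; not the actual ControllerContext.
class PartitionAssignments {
  // Internal state stays mutable and private.
  private val byPartition: mutable.Map[Int, Seq[Int]] = mutable.Map.empty

  def put(partition: Int, replicas: Seq[Int]): Unit =
    byPartition.update(partition, replicas)

  // Callers receive an immutable copy, so they cannot change internal state through it.
  def snapshot: Map[Int, Seq[Int]] = byPartition.toMap
}
```

The copy costs time proportional to the number of partitions in the map, which is the trade-off for not exposing the internal collection.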
}
}

def removePartitionReplicaAssignmentForTopic(topic : String) = {
It seems that this is the same method as removeTopic? The latter seems more complete.
Removed this method and replaced it with removeTopic instead.
@@ -100,7 +140,7 @@ class ControllerContext {

def removeTopic(topic: String) = {
  partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicPartition, _) => topicPartition.topic != topic }
  partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicPartition, _) => topicPartition.topic != topic }
  partitionReplicaAssignmentUnderlying.remove(topic)
Should we also update replicasOnOfflineDirs accordingly?
I think we should update replicasOnOfflineDirs accordingly. However, I don't want to simply iterate through the current replicasOnOfflineDirs to filter out the topic, since that would be very slow in a large cluster.
Since clearing replicasOnOfflineDirs is an orthogonal change, I'll think more about whether it's important and, if so, how to do it. I feel a separate PR is probably better. Thoughts?
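For reference, the straightforward cleanup being weighed here could look roughly like the sketch below. It assumes only the `mutable.Map[Int, Set[TopicPartition]]` shape of replicasOnOfflineDirs shown in the diff; `OfflineDirsSketch`, its `removeTopic` helper, and the stand-in `TopicPartition` case class are hypothetical and not part of the patch.

```scala
import scala.collection.mutable

// Stand-in for org.apache.kafka.common.TopicPartition (assumption for self-containment).
final case class TopicPartition(topic: String, partition: Int)

object OfflineDirsSketch {
  // Drops every partition of `topic` from each broker's offline-dir set.
  // Visits every entry of every broker, which is the cost the comment above worries about.
  def removeTopic(replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]], topic: String): Unit =
    replicasOnOfflineDirs.keys.toList.foreach { brokerId =>
      replicasOnOfflineDirs(brokerId) = replicasOnOfflineDirs(brokerId).filterNot(_.topic == topic)
    }
}
```

The work grows with the total number of offline replicas across all brokers, not with the size of the topic being deleted.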
partitionReplicaAssignmentUnderlying = mutable.Map.empty
}

def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas : Seq[Int]) = {
To be consistent, we want to remove the space before :.
Fixed.
}.keySet.map(_.topic)
val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {
  val replicasForTopic = controllerContext.replicasForTopic(topic)
  replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, new TopicPartition(topic, r.partition)))
It seems that we could just use r.topicPartition instead of creating a new TopicPartition.
Good catch.
@@ -1155,11 +1161,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
if (!isActive) {
  0
} else {
  controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
  controllerContext.allPartitions.count { topicAndPartition =>
Rename topicAndPartition to topicPartition.
Good catch.
@@ -1312,14 +1320,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
if (!isActive) return
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
  !controllerContext.partitionReplicaAssignment.contains(p._1))
  controllerContext.partitionReplicaAssignment(p._1).isEmpty)
Perhaps we can change p to case (topicPartition, _) and use topicPartition instead of p._1?
Yes, that's better.
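The suggested destructuring could be sketched as follows. This is a minimal illustration, not the code that was merged: `PatternMatchSketch`, `newPartitions`, the `knownReplicas` function parameter, and the stand-in `TopicPartition` case class are all assumptions made so the snippet is self-contained.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition (assumption for self-containment).
final case class TopicPartition(topic: String, partition: Int)

object PatternMatchSketch {
  // Keeps only the partitions with no known replica assignment yet.
  // `knownReplicas` plays the role of controllerContext.partitionReplicaAssignment(...).
  def newPartitions(fromZk: Map[TopicPartition, Seq[Int]],
                    knownReplicas: TopicPartition => Seq[Int]): Map[TopicPartition, Seq[Int]] =
    fromZk.filter { case (topicPartition, _) => knownReplicas(topicPartition).isEmpty }
}
```

Naming the tuple element in the pattern avoids the positional accessor `p._1` and makes the intent of the filter readable at a glance.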
if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
  error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
    "since it is currently being deleted")
else {
  if (partitionsToBeAdded.nonEmpty) {
    info(s"New partitions to be added $partitionsToBeAdded")
    controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
    partitionsToBeAdded.foreach { case (topicAndPartition, assignedReplicas) =>
Rename topicAndPartition to topicPartition?
@ijuma Yes, sorry about the delay. I'll update this PR soon.
@junrao Can you please take another look? Thanks!
@gitlw: Thanks for the updated patch. Just a couple more minor comments.
.getOrElse(topicPartition.partition, Seq.empty)
}

def clearTopicsState(): Unit = {
Could this be private?
Fixed.
val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty

private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty

def partitionReplicaAssignment(topicPartition: TopicPartition) : Seq[Int] = {
Could we remove the space before : Seq[Int] and some other places?
Fixed.
@gitlw : Thanks for the patch. LGTM
This patch tries to speed up the inefficient functions identified in KAFKA-6630 by grouping the partitions in the ControllerContext.partitionReplicaAssignment variable by topic. As a result, finding all replicas for a topic no longer requires going through every replica in the cluster.
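The grouping idea can be sketched as a nested map keyed first by topic and then by partition id. This is a simplified illustration under stated assumptions, not the patch itself: `AssignmentStore` and its method names are hypothetical, and `TopicPartition` is a stand-in case class so the snippet is self-contained.

```scala
import scala.collection.mutable

// Stand-in for org.apache.kafka.common.TopicPartition (assumption for self-containment).
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical store; the patch keeps similar state inside ControllerContext.
class AssignmentStore {
  // topic -> (partition id -> replica broker ids)
  private val underlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty

  def update(tp: TopicPartition, replicas: Seq[Int]): Unit =
    underlying.getOrElseUpdate(tp.topic, mutable.Map.empty).update(tp.partition, replicas)

  // Unknown topics or partitions yield an empty replica list.
  def replicasFor(tp: TopicPartition): Seq[Int] =
    underlying.get(tp.topic).flatMap(_.get(tp.partition)).getOrElse(Seq.empty)

  // Touches only the partitions of one topic, not every partition in the cluster.
  def assignmentForTopic(topic: String): Map[TopicPartition, Seq[Int]] =
    underlying.get(topic) match {
      case Some(parts) => parts.map { case (p, rs) => TopicPartition(topic, p) -> rs }.toMap
      case None => Map.empty
    }

  // Removing a topic is a single key removal rather than a filter over all partitions.
  def removeTopic(topic: String): Unit = {
    underlying.remove(topic)
    ()
  }
}
```

With a flat map keyed by TopicPartition, both the per-topic lookup and the removal would need to scan every entry; the nested layout limits the work to the topic in question.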
Passed all tests using "gradle testAll"