New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12403; Ensure local state deleted on RemoveTopicRecord
received
#10252
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Thanks for this fix. a couple of comments. Please take a look.
core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Outdated
Show resolved
Hide resolved
record.asInstanceOf[RemoveTopicRecord]) | ||
case QUOTA_RECORD => handleQuotaRecord(imageBuilder, | ||
record.asInstanceOf[QuotaRecord]) | ||
// TODO: handle FEATURE_LEVEL_RECORD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this TODO no longer relevant?
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji thanks for your response and updates. Left some questions for this PR.
} | ||
newNameMap.put(partition.topicName, newPartitionMap) | ||
} | ||
|
||
private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = { | ||
if (partition.isReplicaFor(brokerId)) { | ||
val currentTopicId = newReverseIdMap.get(partition.topicName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that newReverseIdMap
has no related id? For example, PartitionRecord
is processed before TopicRecord
or TopicRecord
was discarded (due to error)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For another, could it be replaced by prevPartitions.contains(partition.topicName)
? It seems all we want to check is the existence of topic name in previous image.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intent is to only return the change in _localRemoved
if the topic existed in the previous image. If we only check topic name, then successive deletions and recreations might leave some partitions in _localRemoved
that were not in the previous image.
It's worth noting that this is strictly more defensive than the current replay logic requires. A new image is built for each batch of records from the controller, and we would never see a topic deleted and recreated (or vice versa) in the same batch. This is an implicit contract though and not protected by the builder API, so I thought we might as well try to make the logic more resilient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for nice explanation. I have another question for this check. It seems to me three collections in prevPartitions
should be consistent. For example: a topic which exists one of collection should also exists in other two (vice versa). If above comment is right, why we need this if-else
? Calling prevPartitions.contains(partition.topicName)
appears to be enough?
core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
Outdated
Show resolved
Hide resolved
I am working on some integration tests here. Hopefully then we can wrap this up. |
Well, I was going to write some integration tests, but it seems we are still awaiting some infrastructure for that. I tested it out manually and it works correctly. I see the brokers deleting the partition data as expected. I tested deletion as well as recreation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji thanks for updating code. a couple of comments are left. Otherwise, LGTM.
core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for this nice patch. LGTM. one trivial question is left.
import scala.collection.mutable | ||
import scala.jdk.CollectionConverters._ | ||
|
||
class BrokerMetadataListenerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems not all messages are covered in this class? If so, is there a jira to complete those test cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. I filed https://issues.apache.org/jira/browse/KAFKA-12437.
…ed (#10252) This patch implements additional handling logic for `RemoveTopic` records: - Update `MetadataPartitions` to ensure addition of deleted partitions to `localRemoved` set - Ensure topic configs are removed from `ConfigRepository` - Propagate deleted partitions to `GroupCoordinator` so that corresponding offset commits can be removed This patch also changes the controller topic id generation logic to use `Uuid.randomUuid` rather than `Random`. Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This patch implements additional handling logic for
RemoveTopic
records:MetadataPartitions
to ensure addition of deleted partitions tolocalRemoved
setConfigRepository
GroupCoordinator
so that corresponding offset commits can be removedThis patch also changes the controller topic id generation logic to use
Uuid.randomUuid
rather thanRandom
.Committer Checklist (excluded from commit message)