New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14617: Fill broker epochs to the AlterPartitionRequest #13489
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@CalvinConfluent : Thanks for the PR. A few comments below.
// 1. It is not fenced. | ||
// 2. It is not in controlled shutdown. | ||
// 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch | ||
// request broker epoch is -1 which bypass the epoch verification. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bypass => bypasses
@@ -992,6 +1003,11 @@ class Partition(val topicPartition: TopicPartition, | |||
} | |||
} | |||
|
|||
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = { | |||
storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined && | |||
(storedBrokerEpoch.get == -1 || (storedBrokerEpoch == cachedBrokerEpoch)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for the second nested brackets?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of storedBrokerEpoch and cachedBrokerEpoch are both null, it should return false. Without the second bracket, it will return true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just meant that could we change the code to the following?
(storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
Outdated
Show resolved
Hide resolved
@@ -1367,6 +1368,10 @@ class PartitionTest extends AbstractPartitionTest { | |||
assertEquals(alterPartitionManager.isrUpdates.size, 1) | |||
val isrItem = alterPartitionManager.isrUpdates.head | |||
assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId)) | |||
isrItem.leaderAndIsr.isrWithBrokerEpoch.foreach(brokerState => { | |||
// In ZK mode, the broker epochs in the leaderAndIsr should be -1. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we know this is ZK mode only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the PartitionTest, if the test uses default partition, then it is implemented with ZK metadata cache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@CalvinConfluent : Thanks for the updated PR. A few more comments.
core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
Outdated
Show resolved
Hide resolved
@@ -992,6 +1003,11 @@ class Partition(val topicPartition: TopicPartition, | |||
} | |||
} | |||
|
|||
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = { | |||
storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined && | |||
(storedBrokerEpoch.get == -1 || (storedBrokerEpoch == cachedBrokerEpoch)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just meant that could we change the code to the following?
(storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@CalvinConfluent : Thanks for the latest PR. LGTM
As the third part of the KIP-903, it fills the broker epochs from the Fetch request into the AlterPartitionRequest. Also, before generating the alterPartitionRequest, the partition will check whether the broker epoch from the FetchRequest matches with the broker epoch recorded in the metadata cache. If not, the ISR change will be delayed.
https://issues.apache.org/jira/browse/KAFKA-14617