Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception #8479

Merged
merged 12 commits into from Jul 6, 2020
23 changes: 21 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
Expand Up @@ -29,6 +29,7 @@ import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import kafka.zookeeper.ZooKeeperClientException
import org.apache.kafka.common.errors._
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
Expand Down Expand Up @@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = addingReplicas,
removingReplicas = removingReplicas
)
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
try {
this.createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
andrewchoi5 marked this conversation as resolved.
Show resolved Hide resolved
} catch {
case e: ZooKeeperClientException =>
stateChangeLogger.info(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
s"state change for the partition with leader epoch: $leaderEpoch ", e)
error(s"ZooKeeper client error occurred while this partition was becoming the leader for $topicPartition.", e)
andrewchoi5 marked this conversation as resolved.
Show resolved Hide resolved

return false
}

val leaderLog = localLogOrException
val leaderEpochStartOffset = leaderLog.logEndOffset
Expand Down Expand Up @@ -570,7 +580,16 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
)
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
try {
this.createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
} catch {
case e: ZooKeeperClientException =>
stateChangeLogger.info(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
error(s"ZooKeeper client error occurred while this partition was becoming the follower for $topicPartition.", e)

return false
}

val followerLog = localLogOrException
val leaderEpochEndOffset = followerLog.logEndOffset
Expand Down