KAFKA-5107: remove preferred replica election state from ControllerContext #2927

Closed
wants to merge 3 commits into from

onurkaraman
Contributor

KAFKA-5028 moves the controller to a single-threaded model, so other controller work is no longer interleaved with a preferred replica leader election. That means we no longer need to keep the election's state in ControllerContext.

This patch also fixes a bug introduced by KAFKA-5028: onPreferredReplicaElection kept the call to topicDeletionManager.markTopicIneligibleForDeletion but dropped the call to topicDeletionManager.resumeDeletionForTopics.
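
For reviewers, the pairing that the second paragraph refers to can be sketched roughly as below. This is a simplified stand-in rather than the controller code: `DeletionTracker`, `PreferredElectionSketch` and `electPreferredLeaders` are hypothetical names, the `try`/`finally` is an assumption about how one might guarantee the release, and only the two method names are taken from the description.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for TopicDeletionManager; not the real Kafka class.
final class DeletionTracker {
  private val ineligible = mutable.Set.empty[String]

  def markTopicIneligibleForDeletion(topics: Set[String]): Unit = ineligible ++= topics
  def resumeDeletionForTopics(topics: Set[String]): Unit = ineligible --= topics
  def isIneligible(topic: String): Boolean = ineligible.contains(topic)
}

object PreferredElectionSketch {
  // Runs a (stubbed) election for the given topics. The topics are held back
  // from deletion while the election runs and released afterwards, even if
  // the election throws. Forgetting the release is the bug described above.
  def electPreferredLeaders(tracker: DeletionTracker, topics: Set[String])(elect: Set[String] => Unit): Unit = {
    tracker.markTopicIneligibleForDeletion(topics)
    try elect(topics)
    finally tracker.resumeDeletionForTopics(topics)
  }
}
```

Without the `resumeDeletionForTopics` call, a topic marked ineligible here would stay blocked from deletion after the election finished.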

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3222/
Test FAILed (JDK 8 and Scala 2.12).

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3229/
Test PASSed (JDK 7 and Scala 2.10).

asfbot commented Apr 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3236/
Test PASSed (JDK 8 and Scala 2.11).

Contributor

ijuma left a comment

Thanks for the PR, LGTM. I'll wait for @junrao to take a look as well.

Contributor

junrao left a comment

@onurkaraman : Thanks for the patch. Left a couple of comments.

  val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case (_, replicas) =>
    replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
- val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
+ val topicsForWhichPreferredReplicaElectionIsInProgress = pendingPreferredReplicaElections.map(_.topic)
Contributor

A couple of comments. (1) I think topic deletion should have higher priority than preferred replica election. So, if a topic is marked for deletion, we should stop all pending preferred leader elections for it, instead of letting them block the topic deletion. (2) It seems that we don't need to include topicsWithReplicasOnDeadBrokers in topicsIneligibleForDeletion. The same check is already done in TopicDeletionManager.startReplicaDeletion() when the topic deletion is resumed.
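
Point (1) might look something like the following. It is only a sketch of the idea: the `TopicPartition` case class and the `electionsToRun` helper are hypothetical and do not appear in the patch.

```scala
// Hypothetical minimal model of a partition; the real controller uses its own type.
final case class TopicPartition(topic: String, partition: Int)

object DeletionPrioritySketch {
  // Drops pending preferred replica elections for topics queued for deletion,
  // so that a pending election never blocks a deletion.
  def electionsToRun(pending: Set[TopicPartition], topicsToBeDeleted: Set[String]): Set[TopicPartition] =
    pending.filterNot(tp => topicsToBeDeleted.contains(tp.topic))
}
```

With this shape, preferred replica elections no longer need to contribute to topicsIneligibleForDeletion at all.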

Contributor Author

fixed (1) but I'm not sure about the impact of (2) so I left it as is for now.

@@ -248,6 +247,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState

initializeControllerContext()

val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress(pendingPreferredReplicaElections)
Contributor

I think this probably needs to be computed after maybeTriggerPartitionReassignment(), which may complete some of the partition reassignments.

Contributor Author

This PR's refactoring of TopicDeletionManager's construction may have complicated things a bit.

Positioning these lines is a bit tricky. We want to load up topic deletion state into TopicDeletionManager prior to calling PartitionStateMachine.startup, as triggerOnlinePartitionStateChange checks if !controller.topicDeletionManager.isTopicQueuedUpForDeletion(topicAndPartition.topic).

Contributor

Good point. One way to do that is to decouple the initialization of the state in TopicDeletionManager from the handling of topic deletion. So, the sequence will be:
(1) topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion) // don't call resumeDeletions here
(2) replicaStateMachine.startup()
(3) partitionStateMachine.startup()
(4) maybeTriggerPartitionReassignment()
(5) topicDeletionManager.resumeDeletions()
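
The five steps above can be sketched as follows. Everything here is a stand-in for illustration: `StartupTrace`, `DeletionManagerSketch` and `ControllerStartupSketch` are hypothetical, the state machine startups and the reassignment call are reduced to recorded step names, and none of it is the actual controller code.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper that records calls so the ordering is visible.
final class StartupTrace {
  private val steps = ListBuffer.empty[String]
  def record(step: String): Unit = steps += step
  def recorded: List[String] = steps.toList
}

// Hypothetical stand-in for TopicDeletionManager with state loading split from deletion handling.
final class DeletionManagerSketch(trace: StartupTrace) {
  private var toDelete = Set.empty[String]
  private var ineligible = Set.empty[String]

  // Step 1: load state only; no deletion work is started here.
  def init(topicsToBeDeleted: Set[String], topicsIneligibleForDeletion: Set[String]): Unit = {
    toDelete = topicsToBeDeleted
    ineligible = topicsIneligibleForDeletion
    trace.record("topicDeletionManager.init")
  }

  // Answerable as soon as init has run, which is what the partition state machine needs.
  def isTopicQueuedUpForDeletion(topic: String): Boolean = toDelete.contains(topic)
  def isTopicIneligibleForDeletion(topic: String): Boolean = ineligible.contains(topic)

  // Step 5: act on the loaded state once everything else has started.
  def resumeDeletions(): Unit = trace.record("topicDeletionManager.resumeDeletions")
}

object ControllerStartupSketch {
  def onFailover(trace: StartupTrace, deletions: DeletionManagerSketch,
                 topicsToBeDeleted: Set[String], topicsIneligible: Set[String]): Unit = {
    deletions.init(topicsToBeDeleted, topicsIneligible) // (1)
    trace.record("replicaStateMachine.startup")         // (2)
    trace.record("partitionStateMachine.startup")       // (3)
    trace.record("maybeTriggerPartitionReassignment")   // (4)
    deletions.resumeDeletions()                         // (5)
  }
}
```

Splitting `init` from `resumeDeletions` lets the state machines query deletion state during their startup while deferring the deletion work until after any partition reassignment has been triggered.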

Contributor Author

This is exactly the solution I had been working on. Glad to see we're on the same page!

Contributor Author

fixed

… and prioritize deletions over preferred replica leader elections

asfbot commented May 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3340/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented May 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3335/
Test PASSed (JDK 7 and Scala 2.10).

asfbot commented May 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3331/
Test PASSed (JDK 8 and Scala 2.12).

Contributor

junrao left a comment

@onurkaraman : Thanks for the update. Just one minor comment.

@@ -84,6 +83,12 @@ class TopicDeletionManager(controller: KafkaController) extends Logging {
}
}

def start(): Unit = {
Contributor

Having init(), start(), and shutdown() together can be a bit confusing. How about renaming them to init(), tryTopicDeletion(), and reset()?

Contributor Author

completely agree. updated the pr.

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3349/
Test FAILed (JDK 8 and Scala 2.11).

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3340/
Test FAILed (JDK 8 and Scala 2.12).

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3344/
Test FAILed (JDK 7 and Scala 2.10).

onurkaraman
Contributor Author

retest this please

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3352/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3347/
Test PASSed (JDK 7 and Scala 2.10).

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3343/
Test FAILed (JDK 8 and Scala 2.12).

Contributor

junrao commented May 2, 2017

@onurkaraman : Thanks for the patch. LGTM

@asfgit asfgit closed this in b2fcf73 May 2, 2017