From f2266e42b1d3fc7058574f16b0c0d8de08b3a3f6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Jan 2016 15:23:19 +0000 Subject: [PATCH 1/5] Update leader to -1 before throwing `NoReplicaOnlineException` Suggested by Guozhang. --- .../scala/kafka/controller/KafkaController.scala | 2 +- .../kafka/controller/PartitionLeaderSelector.scala | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 103f6cf575d4..3a9baac38da0 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -167,7 +167,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat // kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) var deleteTopicManager: TopicDeletionManager = null - val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) + val offlinePartitionSelector = new OfflinePartitionLeaderSelector(zkUtils, controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 5eed3829ff3c..a9b13cc9e3a3 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -19,7 +19,7 @@ package kafka.controller import kafka.admin.AdminUtils import kafka.api.LeaderAndIsr import kafka.log.LogConfig -import kafka.utils.Logging +import 
kafka.utils.{ZkUtils, ReplicationUtils, Logging} import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} import kafka.server.{ConfigType, KafkaConfig} @@ -63,9 +63,15 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi // for unclean leader election. if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { - throw new NoReplicaOnlineException(("No broker in ISR for partition " + - "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + - " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) + var zkUpdateSucceeded = false + while (!zkUpdateSucceeded) { + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topicAndPartition.topic, + topicAndPartition.partition, currentLeaderAndIsr.copy(leader = -1), controllerContext.epoch, + currentLeaderAndIsr.zkVersion) + zkUpdateSucceeded = updateSucceeded + } + throw new NoReplicaOnlineException(s"No broker in ISR for partition $topicAndPartition is alive." + + s" Live brokers are: [${liveBrokerIds.mkString(",")}], ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]") } debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" From a8cbf88b37665c9669f0e9beae608794d82a810c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 8 Jan 2016 16:48:47 +0000 Subject: [PATCH 2/5] Fix bug in `waitUntilLeaderIsElectedOrChanged` and simplify result type The bug was for the following case: `leader.isDefined && oldLeaderOpt.isEmpty && newLeaderOpt.isDefined && newLeaderOpt.get != leader.get` We would consider it a successful election even though we should not. I also changed the result type as we never return `None` (we throw an exception instead). 
--- .../kafka/api/BaseProducerSendTest.scala | 7 +-- .../kafka/api/ProducerBounceTest.scala | 1 - .../integration/kafka/api/QuotasTest.scala | 5 +- .../unit/kafka/admin/AddPartitionsTest.scala | 16 ++--- .../scala/unit/kafka/admin/AdminTest.scala | 4 +- .../unit/kafka/admin/DeleteTopicTest.scala | 6 +- .../kafka/integration/RollingBounceTest.scala | 4 +- .../UncleanLeaderElectionTest.scala | 8 +-- .../kafka/producer/AsyncProducerTest.scala | 7 +-- .../kafka/server/LeaderElectionTest.scala | 26 ++++---- .../unit/kafka/server/LogRecoveryTest.scala | 14 ++--- .../scala/unit/kafka/utils/TestUtils.scala | 61 +++++++++++-------- 12 files changed, 79 insertions(+), 80 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 29291d410bd4..9d4de1fd4575 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -203,7 +203,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { // make sure leaders exist val leader1 = leaders(partition) - assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined) val responses = for (i <- 1 to numRecords) @@ -221,7 +220,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } // make sure the fetched messages also respect the partitioning and ordering - val fetchResponse1 = if(leader1.get == configs(0).brokerId) { + val fetchResponse1 = if (leader1 == configs(0).brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) @@ -309,7 +308,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage) } } - val 
fetchResponse = if (leader0.get == configs(0).brokerId) { + val fetchResponse = if (leader0 == configs(0).brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) @@ -351,7 +350,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { producer.flush() assertTrue("All request are complete.", responses.forall(_.isDone())) // Check the messages received by broker. - val fetchResponse = if (leader.get == configs(0).brokerId) { + val fetchResponse = if (leader == configs(0).brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build()) diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 369c3b7cd60c..62a132106d95 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -88,7 +88,6 @@ class ProducerBounceTest extends KafkaServerTestHarness { def testBrokerFailure() { val numPartitions = 3 val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers) - assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) val scheduler = new ProducerScheduler() scheduler.start diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 23be1208af10..957659cc0d76 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -85,9 +85,8 @@ class QuotasTest extends KafkaServerTestHarness { val numPartitions = 1 val leaders = TestUtils.createTopic(zkUtils, topic1, 
numPartitions, numServers, servers) - leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1) - followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1) - assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) + leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1) + followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1) // Create consumers val consumerProps = new Properties diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 0fce611aef12..b351fef6f4bf 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -88,12 +88,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness { def testIncrementPartitions { AdminUtils.addPartitions(zkUtils, topic1, 3) // wait until leader is elected - var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1) - var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2) + val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1) + val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2) val leader1FromZk = zkUtils.getLeaderForPartition(topic1, 1).get val leader2FromZk = zkUtils.getLeaderForPartition(topic1, 2).get - assertEquals(leader1.get, leader1FromZk) - assertEquals(leader2.get, leader2FromZk) + assertEquals(leader1, leader1FromZk) + assertEquals(leader2, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) @@ -114,12 +114,12 @@ class AddPartitionsTest extends ZooKeeperTestHarness { def testManualAssignmentOfReplicas { AdminUtils.addPartitions(zkUtils, topic2, 3, "1:2,0:1,2:3") // wait until leader is elected - 
var leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1) - var leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2) + val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1) + val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 2) val leader1FromZk = zkUtils.getLeaderForPartition(topic2, 1).get val leader2FromZk = zkUtils.getLeaderForPartition(topic2, 2).get - assertEquals(leader1.get, leader1FromZk) - assertEquals(leader2.get, leader2FromZk) + assertEquals(leader1, leader1FromZk) + assertEquals(leader2, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 3aa971b5d408..2e6539383040 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -319,11 +319,11 @@ class AdminTest extends ZooKeeperTestHarness with Logging { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first - val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None).get + val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = None) // trigger preferred replica election val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkUtils, Set(TopicAndPartition(topic, partition))) preferredReplicaElection.moveLeaderToPreferredReplica() - val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = Some(currentLeader)).get + val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partition, oldLeaderOpt = 
Some(currentLeader)) assertEquals("Preferred replica election failed", preferredReplica, newLeader) servers.foreach(_.shutdown()) } diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index d28ca698f396..f3245c825f97 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -188,8 +188,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // re-create topic on same replicas AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) // wait until leader is elected - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) - assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) // check if all replica logs are created TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") @@ -211,8 +210,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // test the topic path exists assertTrue("Topic test mistakenly deleted", zkUtils.pathExists(getTopicPath(topic))) // topic test should have a leader - val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) - assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000) servers.foreach(_.shutdown()) } diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index b9315684481b..0d5c571ae830 100755 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -86,9 +86,7 @@ class RollingBounceTest extends 
ZooKeeperTestHarness { servers((startIndex + 1) % 4).shutdown() prevLeader = (startIndex + 1) % 4 } - var newleader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) - // Ensure the new leader is different from the old - assertTrue("Leader transition did not happen for " + topic, newleader.getOrElse(-1) != -1 && (newleader.getOrElse(-1) != prevLeader)) + waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(prevLeader)) // Start the server back up again servers(prevLeader).startup() } diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 8e72ad34e6fe..786cd4fcba67 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -170,9 +170,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { def verifyUncleanLeaderElectionEnabled { // wait until leader is elected - val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) - assertTrue("Leader should get elected", leaderIdOpt.isDefined) - val leaderId = leaderIdOpt.get + val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) @@ -205,9 +203,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { def verifyUncleanLeaderElectionDisabled { // wait until leader is elected - val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) - assertTrue("Leader should get elected", leaderIdOpt.isDefined) - val leaderId = leaderIdOpt.get + val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) 
assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 60d25881751f..afd664cca57d 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -30,14 +30,13 @@ import kafka.message._ import kafka.producer.async._ import kafka.serializer._ import kafka.server.KafkaConfig -import kafka.utils.TestUtils._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import kafka.utils._ class AsyncProducerTest { // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks - val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534)) + val props = Seq(TestUtils.createBrokerConfig(1, "127.0.0.1:1", port = 65534)) val configs = props.map(KafkaConfig.fromProps) val brokerList = configs.map(c => org.apache.kafka.common.utils.Utils.formatAddress(c.hostName, c.port)).mkString(",") @@ -79,7 +78,7 @@ class AsyncProducerTest { @Test def testProduceAfterClosed() { val produceData = getProduceData(10) - val producer = createProducer[String, String]( + val producer = TestUtils.createProducer[String, String]( brokerList, encoder = classOf[StringEncoder].getName) @@ -301,7 +300,7 @@ class AsyncProducerTest { val props = new Properties() // no need to retry since the send will always fail props.put("message.send.max.retries", "0") - val producer= createProducer[String, String]( + val producer = TestUtils.createProducer[String, String]( brokerList = brokerList, encoder = classOf[DefaultEncoder].getName, keyEncoder = classOf[DefaultEncoder].getName, diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 704f77622a28..52e7f284d533 
100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -73,22 +73,21 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId) debug("leader Epoc: " + leaderEpoch1) - debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) - assertTrue("Leader should get elected", leader1.isDefined) + debug("Leader is elected to be: %s".format(leader1)) // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1) assertEquals("First epoch value should be 0", 0, leaderEpoch1) // kill the server hosting the preferred replica servers.last.shutdown() // check if leader moves to the other server val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, - oldLeaderOpt = if(leader1.get == 0) None else leader1) + oldLeaderOpt = if (leader1 == 0) None else Some(leader1)) val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId) - debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader is elected to be: %s".format(leader1)) debug("leader Epoc: " + leaderEpoch2) - assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1)) - if(leader1.get == leader2.get) + assertEquals("Leader must move to broker 0", 0, leader2) + if (leader1 == leader2) assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2) else assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2) @@ -97,12 +96,12 @@ class LeaderElectionTest extends ZooKeeperTestHarness { servers.head.shutdown() Thread.sleep(zookeeper.tickTime) val leader3 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, - oldLeaderOpt = if(leader2.get 
== 1) None else leader2) + oldLeaderOpt = if (leader2 == 1) None else Some(leader2)) val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId) debug("leader Epoc: " + leaderEpoch3) - debug("Leader is elected to be: %s".format(leader3.getOrElse(-1))) - assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1)) - if(leader2.get == leader3.get) + debug("Leader is elected to be: %s".format(leader3)) + assertEquals("Leader must return to 1", 1, leader3) + if (leader2 == leader3) assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3) else assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3) @@ -119,10 +118,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId) debug("leader Epoc: " + leaderEpoch1) - debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) - assertTrue("Leader should get elected", leader1.isDefined) + debug("Leader is elected to be: %s".format(leader1)) // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertTrue("Leader could be broker 0 or broker 1", leader1 == 0 || leader1 == 1) assertEquals("First epoch value should be 0", 0, leaderEpoch1) // start another controller diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index d11c40f3443d..5302a2327072 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -129,8 +129,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) // check if leader moves to the other server - leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader) - 
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) + leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader)) + assertEquals("Leader must move to broker 1", 1, leader) // bring the preferred replica back server1.startup() @@ -139,7 +139,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0", - leader.isDefined && (leader.get == 0 || leader.get == 1)) + leader == 0 || leader == 1) assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet @@ -148,9 +148,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.startup() updateProducer() - leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader) + leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader)) assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1", - leader.isDefined && (leader.get == 0 || leader.get == 1)) + leader == 0 || leader == 1) sendMessages(1) hw += 1 @@ -201,8 +201,8 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.startup() updateProducer() // check if leader moves to the other server - leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader) - assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) + leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader)) + assertEquals("Leader must move to broker 1", 1, leader) assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ac053149fd11..e37027756a25 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -236,7 +236,7 @@ object TestUtils extends Logging { numPartitions: Int = 1, replicationFactor: Int = 1, servers: Seq[KafkaServer], - topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = { + topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = { // create topic AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig) // wait until the update metadata request for new topic reaches all servers @@ -252,7 +252,7 @@ object TestUtils extends Logging { * Return the leader for each partition. */ def createTopic(zkUtils: ZkUtils, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], - servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = { // create topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaAssignment) // wait until the update metadata request for new topic reaches all servers @@ -666,50 +666,63 @@ object TestUtils extends Logging { } /** - * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected. - * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader. - * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader. - * @return The new leader or assertion failure if timeout is reached. - */ + * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected. + * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader. 
+ * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader. + * + * @return The new leader (note that negative values are used to indicate conditions like NoLeader and + * LeaderDuringDelete) or assertion failure if timeout is reached. + */ def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int, timeoutMs: Long = 5000L, - oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = { + oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Int = { require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader") val startTime = System.currentTimeMillis() - var isLeaderElectedOrChanged = false trace("Waiting for leader to be elected or changed for partition [%s,%d], older leader is %s, new leader is %s" .format(topic, partition, oldLeaderOpt, newLeaderOpt)) var leader: Option[Int] = None - while (!isLeaderElectedOrChanged && System.currentTimeMillis() < startTime + timeoutMs) { + var electedOrChangedLeader: Option[Int] = None + while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) { // check if leader is elected leader = zkUtils.getLeaderForPartition(topic, partition) leader match { - case Some(l) => - if (newLeaderOpt.isDefined && newLeaderOpt.get == l) { + case Some(l) => (newLeaderOpt, oldLeaderOpt) match { + case (Some(newLeader), _) if newLeader == l => trace("Expected new leader %d is elected for partition [%s,%d]".format(l, topic, partition)) - isLeaderElectedOrChanged = true - } else if (oldLeaderOpt.isDefined && oldLeaderOpt.get != l) { + electedOrChangedLeader = leader + case (_, Some(oldLeader)) if oldLeader != l => trace("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition, oldLeaderOpt.get, l)) - isLeaderElectedOrChanged = true - } else if (!oldLeaderOpt.isDefined) { + electedOrChangedLeader = leader + case (None, None) => trace("Leader %d is 
elected for partition [%s,%d]".format(l, topic, partition)) - isLeaderElectedOrChanged = true - } else { + electedOrChangedLeader = leader + case _ => trace("Current leader for partition [%s,%d] is %d".format(topic, partition, l)) - } + } case None => trace("Leader for partition [%s,%d] is not elected yet".format(topic, partition)) } Thread.sleep(timeoutMs.min(100L)) } - if (!isLeaderElectedOrChanged) - fail("Timing out after %d ms since leader is not elected or changed for partition [%s,%d]" - .format(timeoutMs, topic, partition)) - - leader + electedOrChangedLeader.getOrElse { + val errorMessage = (newLeaderOpt, oldLeaderOpt) match { + case (Some(newLeader), _) => + s"Timing out after $timeoutMs ms since expected new leader $newLeader was not elected for partition [$topic,$partition], leader is ${leader.get}" + case (_, Some(oldLeader)) => + s"Timing out after $timeoutMs ms since leader did not change from $oldLeader for partition [$topic,$partition]" + case _ => + s"Timing out after $timeoutMs ms since leader was not elected for partition [$topic,$partition]" + } + fail(errorMessage) + } } + /** + * Like `Assert.fail`, but returns `Nothing` so that the compiler knows that this method always throws an exception. + */ + def fail(message: String): Nothing = throw new AssertionError(message) + /** * Execute the given block. If it throws an assert error, retry. 
Repeat * until no error is thrown or the time limit elapses From 21568618d8ac82d3f801307b0145baa75240b0a4 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Jan 2016 14:25:40 +0000 Subject: [PATCH 3/5] Various mechanical clean-ups --- .../kafka/controller/KafkaController.scala | 6 +- .../controller/PartitionStateMachine.scala | 17 ++---- .../controller/ReplicaStateMachine.scala | 3 +- .../scala/kafka/server/KafkaHealthcheck.scala | 1 - .../scala/kafka/utils/ReplicationUtils.scala | 3 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 13 +---- .../UncleanLeaderElectionTest.scala | 58 +++++++++---------- 7 files changed, 39 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 3a9baac38da0..ff95b4afaecd 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -137,7 +137,7 @@ object KafkaController extends Logging { Json.parseFull(controllerInfoString) match { case Some(m) => val controllerInfo = m.asInstanceOf[Map[String, Any]] - return controllerInfo.get("brokerid").get.asInstanceOf[Int] + controllerInfo.get("brokerid").get.asInstanceOf[Int] case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString)) } } catch { @@ -146,7 +146,7 @@ object KafkaController extends Logging { warn("Failed to parse the controller info as json. " + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString)) try { - return controllerInfoString.toInt + controllerInfoString.toInt } catch { case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". 
This is neither the new or the old format.", t) } @@ -471,7 +471,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat // trigger OnlinePartition state changes for offline or new partitions partitionStateMachine.triggerOnlinePartitionStateChange() // filter out the replicas that belong to topics that are being deleted - var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet) + val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet) val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic)) // handle dead replicas replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 73b173e57c28..f62c28297e25 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -26,7 +26,6 @@ import kafka.utils.{Logging, ReplicationUtils} import kafka.utils.ZkUtils._ import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener} import org.I0Itec.zkclient.exception.ZkNodeExistsException -import kafka.controller.Callbacks.CallbackBuilder import kafka.utils.CoreUtils._ /** @@ -117,8 +116,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { for((topicAndPartition, partitionState) <- partitionState if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) { if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition)) - handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector, - (new CallbackBuilder).build) + handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, 
controller.offlinePartitionSelector) } brokerRequestBatch.sendRequestsToBrokers(controller.epoch) } catch { @@ -137,16 +135,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * @param targetState The state that the partitions should be moved to */ def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState, - leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector, - callbacks: Callbacks = (new CallbackBuilder).build) { + leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) { info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(","))) try { brokerRequestBatch.newBatch() partitions.foreach { topicAndPartition => - handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks) + handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector) } brokerRequestBatch.sendRequestsToBrokers(controller.epoch) - }catch { + } catch { case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions } @@ -176,8 +173,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * @param targetState The end state that the partition should be moved to */ private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, - leaderSelector: PartitionLeaderSelector, - callbacks: Callbacks) { + leaderSelector: PartitionLeaderSelector) { val topicAndPartition = TopicAndPartition(topic, partition) if (!hasStarted.get) throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " + @@ -187,7 +183,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { try { targetState match { case NewPartition => - // 
pre: partition did not exist before this assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) partitionState.put(topicAndPartition, NewPartition) val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") @@ -213,7 +208,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader)) // post: partition has a leader case OfflinePartition => - // pre: partition should be in New or Online state assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition) // should be called when the leader for a partition is no longer alive stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" @@ -221,7 +215,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitionState.put(topicAndPartition, OfflinePartition) // post: partition has no alive leader case NonExistentPartition => - // pre: partition should be in Offline state assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition) stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 8eba704eb950..924789df14c2 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -149,8 +149,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { * * ReplicaDeletionSuccessful -> NonExistentReplica * -- remove the replica from the in memory partition replica assignment cache - - + * * @param partitionAndReplica The replica for which the state 
transition is invoked * @param targetState The end state that the replica should be moved to */ diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 928ff43e1193..165425929e3e 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -37,7 +37,6 @@ class KafkaHealthcheck(private val brokerId: Int, private val advertisedEndpoints: Map[SecurityProtocol, EndPoint], private val zkUtils: ZkUtils) extends Logging { - val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener def startup() { diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 4074c0f3b183..b64a65b90602 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -36,8 +36,7 @@ object ReplicationUtils extends Logging { val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId) val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch) // use the epoch of the controller that made the leadership decision, instead of the current controller epoch - val updatePersistentPath: (Boolean, Int) = zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) - updatePersistentPath + zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) } def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicAndPartition]): Unit = { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 7061333c422d..cb626bf251a8 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,10 +17,6 @@ package kafka.utils -import java.io.File -import 
java.net.URI -import java.security.URIParameter -import javax.security.auth.login.Configuration import java.util.concurrent.CountDownLatch import kafka.cluster._ @@ -50,7 +46,6 @@ import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback} import org.apache.zookeeper.CreateMode import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code -import org.apache.zookeeper.ZooDefs.Ids import org.apache.zookeeper.ZooKeeper object ZkUtils { @@ -173,7 +168,7 @@ class ZkUtils(val zkClient: ZkClient, def getAllBrokersInCluster(): Seq[Broker] = { val brokerIds = getChildrenParentMayNotExist(BrokerIdsPath).sorted - brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get) + brokerIds.map(_.toInt).flatMap(getBrokerInfo) } def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = { @@ -267,11 +262,6 @@ class ZkUtils(val zkClient: ZkClient, /** * Register brokers with v2 json format (which includes multiple endpoints). * This format also includes default endpoints for compatibility with older clients. 
- * @param zkClient - * @param id - * @param advertisedEndpoints - * @param timeout - * @param jmxPort */ def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) { val brokerIdPath = BrokerIdsPath + "/" + id @@ -755,7 +745,6 @@ class ZkUtils(val zkClient: ZkClient, * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker * or throws an exception if the broker dies before the query to zookeeper finishes * @param brokerId The broker id - * @param zkClient The zookeeper client connection * @return An optional Broker object encapsulating the broker metadata */ def getBrokerInfo(brokerId: Int): Option[Broker] = { diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 786cd4fcba67..cf1f5788a60b 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -44,8 +44,8 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { var configProps1: Properties = null var configProps2: Properties = null - var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig] - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var configs = Seq.empty[KafkaConfig] + var servers = Seq.empty[KafkaServer] val random = new Random() val topic = "topic" + random.nextLong @@ -64,9 +64,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { configProps2 = createBrokerConfig(brokerId2, zkConnect) for (configProps <- List(configProps1, configProps2)) { - configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) - configProps.put("controlled.shutdown.max.retries", String.valueOf(1)) - configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000)) + 
configProps.put("controlled.shutdown.enable", enableControlledShutdown.toString) + configProps.put("controlled.shutdown.max.retries", "1") + configProps.put("controlled.shutdown.retry.backoff.ms", "1000") } // temporarily set loggers to a higher level so that tests run quietly @@ -78,7 +78,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @After override def tearDown() { - servers.foreach(server => shutdownServer(server)) + servers.foreach(shutdownServer) servers.foreach(server => CoreUtils.rm(server.config.logDirs)) // restore log levels @@ -93,9 +93,8 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { private def startBrokers(cluster: Seq[Properties]) { for (props <- cluster) { val config = KafkaConfig.fromProps(props) - val server = createServer(config) configs ++= List(config) - servers ++= List(server) + servers ++= List(createServer(config)) } } @@ -112,10 +111,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @Test def testUncleanLeaderElectionDisabled { - // disable unclean leader election - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) - startBrokers(Seq(configProps1, configProps2)) + val clusterProps = Seq(configProps1, configProps2) + clusterProps.foreach(_.put("unclean.leader.election.enable", "false")) + startBrokers(clusterProps) // create topic with 1 partition, 2 replicas, one on each broker AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) @@ -126,13 +124,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @Test def testUncleanLeaderElectionEnabledByTopicOverride { // disable unclean leader election globally, but enable for our specific test topic - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) - 
startBrokers(Seq(configProps1, configProps2)) + val clusterProps = Seq(configProps1, configProps2) + clusterProps.foreach(_.put("unclean.leader.election.enable", "false")) + startBrokers(clusterProps) // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled val topicProps = new Properties() - topicProps.put("unclean.leader.election.enable", String.valueOf(true)) + topicProps.put("unclean.leader.election.enable", "true") AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)), topicProps) @@ -142,9 +140,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @Test def testCleanLeaderElectionDisabledByTopicOverride { // enable unclean leader election globally, but disable for our specific test topic - configProps1.put("unclean.leader.election.enable", String.valueOf(true)) - configProps2.put("unclean.leader.election.enable", String.valueOf(true)) - startBrokers(Seq(configProps1, configProps2)) + val clusterProps = Seq(configProps1, configProps2) + clusterProps.foreach(_.put("unclean.leader.election.enable", "true")) + startBrokers(clusterProps) // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled val topicProps = new Properties() @@ -183,14 +181,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server - servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) + servers.filter(_.config.brokerId == followerId).map(shutdownServer) sendMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower - servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => 
server.startup()) + servers.filter(_.config.brokerId == leaderId).map(shutdownServer) + servers.filter(_.config.brokerId == followerId).map(_.startup()) // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId)) @@ -204,26 +202,26 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { def verifyUncleanLeaderElectionDisabled { // wait until leader is elected val leaderId = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) - debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) - assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) + debug(s"Leader for $topic is elected to be: $leaderId") + assertTrue(s"Leader id is set to expected value for topic: $topic", leaderId == brokerId1 || leaderId == brokerId2) // the non-leader broker is the follower val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 - debug("Follower for " + topic + " is: %s".format(followerId)) + debug(s"Follower for $topic is: $followerId") sendMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server - servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) + servers.filter(_.config.brokerId == followerId).map(shutdownServer) sendMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower - servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) - servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) + servers.filter(_.config.brokerId == leaderId).map(shutdownServer) + servers.filter(_.config.brokerId == followerId).map(_.startup()) // verify that unclean election to 
non-ISR follower does not occur waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(-1)) @@ -235,12 +233,12 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { assertEquals(List.empty[String], consumeAllMessages(topic)) // restart leader temporarily to send a successfully replicated message - servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) + servers.filter(_.config.brokerId == leaderId).map(_.startup()) waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId)) sendMessage(servers, topic, "third") waitUntilMetadataIsPropagated(servers, topic, partitionId) - servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) + servers.filter(_.config.brokerId == leaderId).map(shutdownServer) // verify clean leader transition to ISR follower waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId)) From 7b608c44f17536274606654f6faa2cc45d890c3c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Jan 2016 15:46:50 +0000 Subject: [PATCH 4/5] Make logging more regular in `PartitionLeaderSelector.scala` and other minor clean-ups --- .../controller/PartitionLeaderSelector.scala | 94 +++++++++---------- 1 file changed, 45 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index a9b13cc9e3a3..441ac7f0d75f 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -44,21 +44,21 @@ trait PartitionLeaderSelector { * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr. * 4. 
If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException * Replicas to receive LeaderAndIsr request = live assigned replicas - * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache */ -class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig) +class OfflinePartitionLeaderSelector(zkUtils: ZkUtils, controllerContext: ControllerContext, config: KafkaConfig) extends PartitionLeaderSelector with Logging { this.logIdent = "[OfflinePartitionLeaderSelector]: " def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { case Some(assignedReplicas) => - val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) - val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) + val liveBrokerIds = controllerContext.liveBrokerIds + val liveAssignedReplicas = assignedReplicas.filter(liveBrokerIds.contains) + val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(liveBrokerIds.contains) val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion - val newLeaderAndIsr = liveBrokersInIsr.isEmpty match { - case true => + val newLeaderAndIsr = + if (liveBrokersInIsr.isEmpty) { // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration // for unclean leader election. if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils, @@ -74,31 +74,31 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi s" Live brokers are: [${liveBrokerIds.mkString(",")}], ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]") } - debug("No broker in ISR is alive for %s. 
Pick the leader from the alive assigned replicas: %s" - .format(topicAndPartition, liveAssignedReplicas.mkString(","))) - liveAssignedReplicas.isEmpty match { - case true => - throw new NoReplicaOnlineException(("No replica for partition " + - "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + - " Assigned replicas are: [%s]".format(assignedReplicas)) - case false => - ControllerStats.uncleanLeaderElectionRate.mark() - val newLeader = liveAssignedReplicas.head - warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." - .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) - new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) + debug(s"No broker in ISR is alive for $topicAndPartition. Pick the leader from the live assigned replicas:" + + s" [${liveAssignedReplicas.mkString(",")}]") + liveAssignedReplicas.headOption.map { newLeader => + ControllerStats.uncleanLeaderElectionRate.mark() + warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live brokers" + + s" [${liveAssignedReplicas.mkString(",")}]. There's potential data loss.") + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) + }.getOrElse { + throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live brokers" + + s" are: [${liveBrokerIds.mkString(",")}], Assigned replicas are: [${assignedReplicas.mkString(",")}]") } - case false => - val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r)) - val newLeader = liveReplicasInIsr.head - debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." 
- .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) + } + else { + val newLeader = liveAssignedReplicas.find(liveBrokersInIsr.contains).getOrElse { + throw new IllegalStateException(s"Could not find any broker in both live assigned replicas" + + s" [${liveAssignedReplicas.mkString}] and ISR [${liveBrokersInIsr.mkString(",")}]") + } + debug(s"Some broker in ISR is alive for $topicAndPartition. Select $newLeader from ISR" + + s" [${liveBrokersInIsr.mkString(",")}] to be the leader.") new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) - } - info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) + } + info(s"Selected new leader and ISR $newLeaderAndIsr for offline partition $topicAndPartition") (newLeaderAndIsr, liveAssignedReplicas) case None => - throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition)) + throw new NoReplicaOnlineException(s"Partition $topicAndPartition doesn't have replicas assigned to it") } } } @@ -120,18 +120,17 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r)) - val newLeaderOpt = aliveReassignedInSyncReplicas.headOption - newLeaderOpt match { + aliveReassignedInSyncReplicas.headOption match { case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas) case None => reassignedInSyncReplicas.size match { case 0 => - throw new NoReplicaOnlineException("List of reassigned replicas for partition " + - " %s is empty. 
Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) + throw new NoReplicaOnlineException("List of reassigned replicas for partition" + + s" $topicAndPartition is empty. Current leader and ISR: [$currentLeaderAndIsr]") case _ => - throw new NoReplicaOnlineException("None of the reassigned replicas for partition " + - "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) + throw new NoReplicaOnlineException("None of the reassigned replicas for partition" + + s" $topicAndPartition are in-sync with the leader. Current leader and ISR: [$currentLeaderAndIsr]") } } } @@ -152,18 +151,18 @@ with Logging { // check if preferred replica is the current leader val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader if (currentLeader == preferredReplica) { - throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s" - .format(preferredReplica, topicAndPartition)) + throw new LeaderElectionNotNeededException(s"Preferred replica $preferredReplica is already the current leader" + + s" for partition $topicAndPartition") } else { - info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + - " Trigerring preferred replica leader election") + info(s"Current leader $currentLeader for partition $topicAndPartition is not the preferred replica." 
+ + " Triggering preferred replica leader election") // check if preferred replica is not the current leader and is alive and in the isr if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) { (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderAndIsr.zkVersion + 1), assignedReplicas) } else { - throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) + - "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) + throw new StateChangeFailedException(s"Preferred replica $preferredReplica for partition $topicAndPartition" + + s" is either not alive or not in the ISR. Current leader and ISR: [$currentLeaderAndIsr]") } } } @@ -188,19 +187,16 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds - val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) + val liveAssignedReplicas = assignedReplicas.filter(liveOrShuttingDownBrokerIds.contains) - val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) - val newLeaderOpt = newIsr.headOption - newLeaderOpt match { + val newIsr = currentLeaderAndIsr.isr.filter(!controllerContext.shuttingDownBrokerIds.contains(_)) + newIsr.headOption match { case Some(newLeader) => - debug("Partition %s : current leader = %d, new leader = %d" - .format(topicAndPartition, currentLeader, newLeader)) - (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), - liveAssignedReplicas) + debug(s"Partition $topicAndPartition : current leader = $currentLeader, new leader = $newLeader") + 
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas) case None => - throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" + - " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(","))) + throw new StateChangeFailedException(s"No other replicas in ISR [${currentLeaderAndIsr.isr.mkString(",")}] for" + + s" $topicAndPartition besides shutting down brokers [${controllerContext.shuttingDownBrokerIds.mkString(",")}]") } } } From c20e2399396e465861e148073f20993d986c5e25 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 13 Jan 2016 17:28:54 +0000 Subject: [PATCH 5/5] Use `NoLeader` instead of `-1` As suggested by Guozhang. --- .../main/scala/kafka/controller/PartitionLeaderSelector.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 441ac7f0d75f..30e2768eb44e 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -66,7 +66,7 @@ class OfflinePartitionLeaderSelector(zkUtils: ZkUtils, controllerContext: Contro var zkUpdateSucceeded = false while (!zkUpdateSucceeded) { val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topicAndPartition.topic, - topicAndPartition.partition, currentLeaderAndIsr.copy(leader = -1), controllerContext.epoch, + topicAndPartition.partition, currentLeaderAndIsr.copy(leader = LeaderAndIsr.NoLeader), controllerContext.epoch, currentLeaderAndIsr.zkVersion) zkUpdateSucceeded = updateSucceeded }