From a731d18f404af815e2c7ee20585ef15ff3b9af22 Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Tue, 17 Jan 2017 21:25:24 +0100 Subject: [PATCH 01/10] KAFKA-4596: add support for moving a subset of partitions with throttle engaged --- .../admin/ReassignPartitionsCommand.scala | 5 +++- .../scala/unit/kafka/admin/AdminTest.scala | 4 +-- .../admin/ReassignPartitionsCommandTest.scala | 25 +++++++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index dc707e56d23d3..f7bfd51f724a6 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -361,7 +361,9 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA private def postRebalanceReplicasThatMoved(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { //For each partition in the proposed list, filter out any replicas that exist now (i.e. are in the proposed list and hence are not moving) - existing.map { case (tp, current) => + existing + .filter{ case (tp, current) => proposed.contains(tp)} + .map { case (tp, current) => tp -> (proposed(tp).toSet -- current).toSeq } } @@ -369,6 +371,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { //Throttle all existing replicas (as any one might be a leader). So just filter out those which aren't moving existing.filter { case (tp, current) => + proposed.contains(tp) && (proposed(tp).toSet -- current).nonEmpty } } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index f9eb61c00aad7..1d7a6f7a22fbd 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -167,7 +167,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { @Test def testPartitionReassignmentWithLeaderInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) @@ -178,7 +178,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) - assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions()) + assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions(1000L)) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala index 924daf8676749..914fbd41be45b 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -57,6 +57,31 @@ class ReassignPartitionsCommandTest extends Logging { assertEquals(1, calls) } + @Test + def shouldSupportProposedAsSubsetOfExisting() { + val assigner = new ReassignPartitionsCommand(null, null) + + //Given we have more existing partitions than we are proposing + val existingSuperset = Map( + TopicAndPartition("topic1", 0) -> Seq(100, 101), + TopicAndPartition("topic1", 1) -> Seq(100, 102), + TopicAndPartition("foo", 0) -> Seq(100, 102) + ) + val proposedSubset = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102)) + + + val mock = new TestAdminUtils { + override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = { + assertEquals("0:102", configChange.get(FollowerReplicationThrottledReplicasProp)) + assertEquals("0:100,0:101", configChange.get(LeaderReplicationThrottledReplicasProp)) + calls += 1 + } + } + + assigner.assignThrottledReplicas(existingSuperset, proposedSubset, mock) + assertEquals(1, calls) + } + @Test def shouldFindMovingReplicasMultiplePartitions() { val control = TopicAndPartition("topic1", 2) -> Seq(100, 102) From 3e523a544c9d32637672507c8533864d2c8046fe Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 18 Jan 2017 11:21:28 +0000 Subject: [PATCH 02/10] KAFKA-4596: comment --- .../scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala index 914fbd41be45b..83cf4a9308bc0 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -69,7 +69,6 @@ class ReassignPartitionsCommandTest extends Logging { ) val proposedSubset = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102)) - val mock = new TestAdminUtils { override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = { assertEquals("0:102", configChange.get(FollowerReplicationThrottledReplicasProp)) @@ -78,6 +77,7 @@ class ReassignPartitionsCommandTest extends Logging { } } + //Then replicas should assign correctly (based on the proposed map) assigner.assignThrottledReplicas(existingSuperset, proposedSubset, mock) assertEquals(1, calls) } From a7018d9a26d665c83a7585a0cb2eaa4dc9426739 Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 18 Jan 2017 14:08:56 +0000 Subject: [PATCH 03/10] KAFKA-4596: Added richer functinoal test for reassigment. --- .../scala/unit/kafka/admin/AdminTest.scala | 55 ++++++++++++++++++- .../admin/ReassignPartitionsCommandTest.scala | 4 +- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 1d7a6f7a22fbd..6dbcf6fdc5cdd 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -36,7 +36,7 @@ import java.util import kafka.utils.TestUtils._ import kafka.admin.AdminUtils._ -import scala.collection.{Map, immutable} +import scala.collection.{Map, Seq, immutable} import kafka.utils.CoreUtils._ import org.apache.kafka.common.TopicPartition @@ -167,7 +167,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { @Test def testPartitionReassignmentWithLeaderInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2)) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) @@ -178,7 +178,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { val partitionToBeReassigned = 0 val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, Map(topicAndPartition -> newReplicas)) - assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions(1000L)) + assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions()) // wait until reassignment is completed TestUtils.waitUntilTrue(() => { val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) @@ -257,6 +257,55 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { servers.foreach(_.shutdown()) } + @Test + def shouldPerformThrottledReassignmentOverVariousTopics() { + val throttle = 1000L + + //Given four brokers + TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf))) + + //With up several small topics + createTopic("orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))) + createTopic("payments", Map(0 -> List(0, 1), 1 -> List(0, 1))) + createTopic("deliveries", Map(0 -> List(0))) + createTopic("customers", Map(0 -> List(0), 1 -> List(1), 2 -> List(2), 3 -> List(3))) + + //Define a move for some of them + val move = Map( + TopicAndPartition("orders", 0) -> Seq(0, 2, 3),//moves + TopicAndPartition("orders", 1) -> Seq(0, 1, 2),//stays + TopicAndPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving + TopicAndPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor + ) + + //When we run a throttled reassignment + new ReassignPartitionsCommand(zkUtils, move).reassignPartitions(throttle) + + TestUtils.waitUntilTrue(() => { + val current = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) + move.forall { case (tp, reps) => + ReassignmentCompleted == ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, tp, move, current) + } + }, "Partition reassignment didn't complete within timeout") + + //Check moved replicas did move + assertEquals(zkUtils.getReplicasForPartition("orders", 0), Seq(0, 2, 3)) + assertEquals(zkUtils.getReplicasForPartition("orders", 1), Seq(0, 1, 2)) + assertEquals(zkUtils.getReplicasForPartition("payments", 1), Seq(1, 2)) + assertEquals(zkUtils.getReplicasForPartition("deliveries", 0), Seq(1, 2)) + + //Check untouched replicas are still there + assertEquals(zkUtils.getReplicasForPartition("payments", 0), Seq(0, 1)) + assertEquals(zkUtils.getReplicasForPartition("customers", 0), Seq(0)) + assertEquals(zkUtils.getReplicasForPartition("customers", 1), Seq(1)) + assertEquals(zkUtils.getReplicasForPartition("customers", 2), Seq(2)) + assertEquals(zkUtils.getReplicasForPartition("customers", 3), Seq(3)) + } + + def createTopic(topic: String, replicaAssignment: Map[Int, Seq[Int]]) = { + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment) + } + @Test def testReassigningNonExistingPartition() { val topic = "test" diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala index 83cf4a9308bc0..6c19a5f21d49e 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -58,14 +58,14 @@ class ReassignPartitionsCommandTest extends Logging { } @Test - def shouldSupportProposedAsSubsetOfExisting() { + def shouldFindMovingReplicasWhenProposedIsSubsetOfExisting() { val assigner = new ReassignPartitionsCommand(null, null) //Given we have more existing partitions than we are proposing val existingSuperset = Map( TopicAndPartition("topic1", 0) -> Seq(100, 101), TopicAndPartition("topic1", 1) -> Seq(100, 102), - TopicAndPartition("foo", 0) -> Seq(100, 102) + TopicAndPartition("topic2", 0) -> Seq(100, 101, 102) ) val proposedSubset = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102)) From 863c94fe12f086d0df925127bfe26f7693391212 Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 18 Jan 2017 17:03:36 +0000 Subject: [PATCH 04/10] KAFKA-4596: Cleaned up logic --- .../admin/ReassignPartitionsCommand.scala | 18 ++++--- .../scala/unit/kafka/admin/AdminTest.scala | 49 ------------------- .../admin/ReassignPartitionsClusterTest.scala | 49 ++++++++++++++++++- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index f7bfd51f724a6..66a6db27c38cc 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -359,20 +359,22 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA } } + private def postRebalanceReplicasThatMoved(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { //For each partition in the proposed list, filter out any replicas that exist now (i.e. are in the proposed list and hence are not moving) - existing - .filter{ case (tp, current) => proposed.contains(tp)} - .map { case (tp, current) => - tp -> (proposed(tp).toSet -- current).toSeq + proposed.map { case (tp, proposedReplicas) => + tp -> (proposedReplicas.toSet -- existing(tp)).toSeq } } private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { - //Throttle all existing replicas (as any one might be a leader). So just filter out those which aren't moving - existing.filter { case (tp, current) => - proposed.contains(tp) && - (proposed(tp).toSet -- current).nonEmpty + //For any moving partition, throttle all the original (pre move) replicas (as any one might be a leader) + + def moving(postMoveReplicas: Seq[Int], preMoveReplicas: Seq[Int]) = + (postMoveReplicas.toSet -- preMoveReplicas.toSet).nonEmpty + + existing.filter { case (tp, preMoveReplicas) => + proposed.contains(tp) && moving(preMoveReplicas, proposed(tp)) } } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 6dbcf6fdc5cdd..f5b6214e8bdb3 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -257,55 +257,6 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { servers.foreach(_.shutdown()) } - @Test - def shouldPerformThrottledReassignmentOverVariousTopics() { - val throttle = 1000L - - //Given four brokers - TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf))) - - //With up several small topics - createTopic("orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))) - createTopic("payments", Map(0 -> List(0, 1), 1 -> List(0, 1))) - createTopic("deliveries", Map(0 -> List(0))) - createTopic("customers", Map(0 -> List(0), 1 -> List(1), 2 -> List(2), 3 -> List(3))) - - //Define a move for some of them - val move = Map( - TopicAndPartition("orders", 0) -> Seq(0, 2, 3),//moves - TopicAndPartition("orders", 1) -> Seq(0, 1, 2),//stays - TopicAndPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving - TopicAndPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor - ) - - //When we run a throttled reassignment - new ReassignPartitionsCommand(zkUtils, move).reassignPartitions(throttle) - - TestUtils.waitUntilTrue(() => { - val current = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas) - move.forall { case (tp, reps) => - ReassignmentCompleted == ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkUtils, tp, move, current) - } - }, "Partition reassignment didn't complete within timeout") - - //Check moved replicas did move - assertEquals(zkUtils.getReplicasForPartition("orders", 0), Seq(0, 2, 3)) - assertEquals(zkUtils.getReplicasForPartition("orders", 1), Seq(0, 1, 2)) - assertEquals(zkUtils.getReplicasForPartition("payments", 1), Seq(1, 2)) - assertEquals(zkUtils.getReplicasForPartition("deliveries", 0), Seq(1, 2)) - - //Check untouched replicas are still there - assertEquals(zkUtils.getReplicasForPartition("payments", 0), Seq(0, 1)) - assertEquals(zkUtils.getReplicasForPartition("customers", 0), Seq(0)) - assertEquals(zkUtils.getReplicasForPartition("customers", 1), Seq(1)) - assertEquals(zkUtils.getReplicasForPartition("customers", 2), Seq(2)) - assertEquals(zkUtils.getReplicasForPartition("customers", 3), Seq(3)) - } - - def createTopic(topic: String, replicaAssignment: Map[Int, Seq[Int]]) = { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment) - } - @Test def testReassigningNonExistingPartition() { val topic = "test" diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 95ee5d386ccae..abab1f7092c4f 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -16,12 +16,13 @@ import kafka.common.{AdminCommandFailedException, TopicAndPartition} import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.TestUtils._ import kafka.utils.ZkUtils._ -import kafka.utils.{CoreUtils, Logging, ZkUtils} +import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.junit.Assert.{assertEquals, assertTrue} import org.junit.{After, Before, Test} import kafka.admin.ReplicationQuotaUtils._ -import scala.collection.Seq + +import scala.collection.{Map, Seq} class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { @@ -246,6 +247,50 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}""") } + @Test + def shouldPerformThrottledReassignmentOverVariousTopics() { + val throttle = 1000L + + //Given four brokers + servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf))) + + //With up several small topics + createTopicWith("orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))) + createTopicWith("payments", Map(0 -> List(0, 1), 1 -> List(0, 1))) + createTopicWith("deliveries", Map(0 -> List(0))) + createTopicWith("customers", Map(0 -> List(0), 1 -> List(1), 2 -> List(2), 3 -> List(3))) + + //Define a move for some of them + val move = Map( + TopicAndPartition("orders", 0) -> Seq(0, 2, 3),//moves + TopicAndPartition("orders", 1) -> Seq(0, 1, 2),//stays + TopicAndPartition("payments", 1) -> Seq(1, 2), //only define one partition as moving + TopicAndPartition("deliveries", 0) -> Seq(1, 2) //increase replication factor + ) + + //When we run a throttled reassignment + new ReassignPartitionsCommand(zkUtils, move).reassignPartitions(throttle) + + waitForReassignmentToComplete() + + //Check moved replicas did move + assertEquals(zkUtils.getReplicasForPartition("orders", 0), Seq(0, 2, 3)) + assertEquals(zkUtils.getReplicasForPartition("orders", 1), Seq(0, 1, 2)) + assertEquals(zkUtils.getReplicasForPartition("payments", 1), Seq(1, 2)) + assertEquals(zkUtils.getReplicasForPartition("deliveries", 0), Seq(1, 2)) + + //Check untouched replicas are still there + assertEquals(zkUtils.getReplicasForPartition("payments", 0), Seq(0, 1)) + assertEquals(zkUtils.getReplicasForPartition("customers", 0), Seq(0)) + assertEquals(zkUtils.getReplicasForPartition("customers", 1), Seq(1)) + assertEquals(zkUtils.getReplicasForPartition("customers", 2), Seq(2)) + assertEquals(zkUtils.getReplicasForPartition("customers", 3), Seq(3)) + } + + def createTopicWith(topic: String, replicaAssignment: Map[Int, Seq[Int]]) = { + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment) + } + def waitForReassignmentToComplete() { waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted") } From 37d2da4e578ae2852cae8037a8ab7e2da607e30f Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 18 Jan 2017 17:06:38 +0000 Subject: [PATCH 05/10] KAFKA-4596: tidy --- .../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 66a6db27c38cc..b92bb4da9c125 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -368,11 +368,9 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA } private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { - //For any moving partition, throttle all the original (pre move) replicas (as any one might be a leader) - - def moving(postMoveReplicas: Seq[Int], preMoveReplicas: Seq[Int]) = - (postMoveReplicas.toSet -- preMoveReplicas.toSet).nonEmpty + def moving(before: Seq[Int], after: Seq[Int]) = (before.toSet -- after.toSet).nonEmpty + //For any moving partition, throttle all the original (pre move) replicas (as any one might be a leader) existing.filter { case (tp, preMoveReplicas) => proposed.contains(tp) && moving(preMoveReplicas, proposed(tp)) } From bba5492cf5ce05ae1d71290e9c1a255ba0813cfd Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 18 Jan 2017 17:09:54 +0000 Subject: [PATCH 06/10] KAFKA-4596: revert imports --- core/src/test/scala/unit/kafka/admin/AdminTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index f5b6214e8bdb3..f9eb61c00aad7 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -36,7 +36,7 @@ import java.util import kafka.utils.TestUtils._ import kafka.admin.AdminUtils._ -import scala.collection.{Map, Seq, immutable} +import scala.collection.{Map, immutable} import kafka.utils.CoreUtils._ import org.apache.kafka.common.TopicPartition From 3c136fc70a582748cd6e5deabfcba237de3f1adc Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 18 Jan 2017 17:13:47 +0000 Subject: [PATCH 07/10] KAFKA-4596: whitespace --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index b92bb4da9c125..357daf44d9382 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -359,7 +359,6 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA } } - private def postRebalanceReplicasThatMoved(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { //For each partition in the proposed list, filter out any replicas that exist now (i.e. are in the proposed list and hence are not moving) proposed.map { case (tp, proposedReplicas) => @@ -369,7 +368,6 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { def moving(before: Seq[Int], after: Seq[Int]) = (before.toSet -- after.toSet).nonEmpty - //For any moving partition, throttle all the original (pre move) replicas (as any one might be a leader) existing.filter { case (tp, preMoveReplicas) => proposed.contains(tp) && moving(preMoveReplicas, proposed(tp)) From e626187988830d11ff6c4c0750e8bfa2f5bd608a Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 18 Jan 2017 19:12:43 +0000 Subject: [PATCH 08/10] KAFKA-4596: one more test --- .../admin/ReassignPartitionsClusterTest.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index abab1f7092c4f..b0c78ca7bb14b 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -104,6 +104,42 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101)) } + @Test + def shouldMoveSubsetOfPartitions() { + //Given partitions on 3 of 3 brokers + val brokers = Array(100, 101, 102) + startBrokers(brokers) + createTopic(zkUtils, "topic1", Map( + 0 -> Seq(100, 101), + 1 -> Seq(101, 102), + 2 -> Seq(102, 100) + ), servers = servers) + createTopic(zkUtils, "topic2", Map( + 0 -> Seq(100, 101), + 1 -> Seq(101, 102), + 2 -> Seq(102, 100) + ), servers = servers) + + val proposed: Map[TopicAndPartition, Seq[Int]] = Map( + TopicAndPartition("topic1", 0) -> Seq(100, 102), + TopicAndPartition("topic1", 2) -> Seq(100, 102), + TopicAndPartition("topic2", 2) -> Seq(100, 102) + ) + + //When rebalancing + ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(proposed)) + waitForReassignmentToComplete() + + //Then the proposed changes should have been made + val actual = zkUtils.getPartitionAssignmentForTopics(Seq("topic1", "topic2")) + assertEquals(actual("topic1")(0), Seq(100, 102))//changed + assertEquals(actual("topic1")(1), Seq(101, 102)) + assertEquals(actual("topic1")(2), Seq(100, 102))//changed + assertEquals(actual("topic2")(0), Seq(100, 101)) + assertEquals(actual("topic2")(1), Seq(101, 102)) + assertEquals(actual("topic2")(2), Seq(100, 102))//changed + } + @Test def shouldExecuteThrottledReassignment() { //Given partitions on 3 of 3 brokers From dfb9d08e740b824018318e7d76df5b3a8c695474 Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 25 Jan 2017 11:08:03 +0000 Subject: [PATCH 09/10] KAFKA-4596: ismeal's review feedback --- .../admin/ReassignPartitionsCommand.scala | 4 +- .../admin/ReassignPartitionsClusterTest.scala | 52 +++++++++---------- .../admin/ReassignPartitionsCommandTest.scala | 1 + 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 357daf44d9382..647bb90867828 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -360,14 +360,14 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA } private def postRebalanceReplicasThatMoved(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { - //For each partition in the proposed list, filter out any replicas that exist now (i.e. are in the proposed list and hence are not moving) + //For each partition in the proposed list, filter out any replicas that exist now, and hence aren't being moved. proposed.map { case (tp, proposedReplicas) => tp -> (proposedReplicas.toSet -- existing(tp)).toSeq } } private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { - def moving(before: Seq[Int], after: Seq[Int]) = (before.toSet -- after.toSet).nonEmpty + def moving(before: Seq[Int], after: Seq[Int]) = before.toSet != after.toSet //For any moving partition, throttle all the original (pre move) replicas (as any one might be a leader) existing.filter { case (tp, preMoveReplicas) => proposed.contains(tp) && moving(preMoveReplicas, proposed(tp)) diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index b0c78ca7bb14b..488c9ba794130 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -59,7 +59,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { waitForReassignmentToComplete() //Then the replica should be on 101 - assertEquals(zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition), Seq(101)) + assertEquals(Seq(101), zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition)) } @Test @@ -80,7 +80,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { //Then the replicas should span all three brokers val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName) - assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101, 102)) + assertEquals(Seq(100, 101, 102), actual.values.flatten.toSeq.distinct.sorted) } @Test @@ -101,7 +101,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { //Then replicas should only span the first two brokers val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName) - assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101)) + assertEquals( Seq(100, 101), actual.values.flatten.toSeq.distinct.sorted) } @Test @@ -132,12 +132,12 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { //Then the proposed changes should have been made val actual = zkUtils.getPartitionAssignmentForTopics(Seq("topic1", "topic2")) - assertEquals(actual("topic1")(0), Seq(100, 102))//changed - assertEquals(actual("topic1")(1), Seq(101, 102)) - assertEquals(actual("topic1")(2), Seq(100, 102))//changed - assertEquals(actual("topic2")(0), Seq(100, 101)) - assertEquals(actual("topic2")(1), Seq(101, 102)) - assertEquals(actual("topic2")(2), Seq(100, 102))//changed + assertEquals(Seq(100, 102), actual("topic1")(0))//changed + assertEquals(Seq(101, 102), actual("topic1")(1)) + assertEquals(Seq(100, 102), actual("topic1")(2))//changed + assertEquals(Seq(100, 101), actual("topic2")(0)) + assertEquals(Seq(101, 102), actual("topic2")(1)) + assertEquals(Seq(100, 102), actual("topic2")(2))//changed } @Test @@ -172,7 +172,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { //Check move occurred val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName) - assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(101, 102)) + assertEquals(Seq(101, 102), actual.values.flatten.toSeq.distinct.sorted) //Then command should have taken longer than the throttle rate assertTrue(s"Expected replication to be > ${expectedDurationSecs * 0.9 * 1000} but was $took", took > expectedDurationSecs * 0.9 * 1000) @@ -270,7 +270,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { //Check move occurred val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName) - assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(101, 102)) + assertEquals(Seq(101, 102), actual.values.flatten.toSeq.distinct.sorted) } @Test(expected = classOf[AdminCommandFailedException]) @@ -291,10 +291,10 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(conf => TestUtils.createServer(KafkaConfig.fromProps(conf))) //With up several small topics - createTopicWith("orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))) - createTopicWith("payments", Map(0 -> List(0, 1), 1 -> List(0, 1))) - createTopicWith("deliveries", Map(0 -> List(0))) - createTopicWith("customers", Map(0 -> List(0), 1 -> List(1), 2 -> List(2), 3 -> List(3))) + createTopic(zkUtils, "orders", Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2)), servers) + createTopic(zkUtils, "payments", Map(0 -> List(0, 1), 1 -> List(0, 1)), servers) + createTopic(zkUtils, "deliveries", Map(0 -> List(0)), servers) + createTopic(zkUtils, "customers", Map(0 -> List(0), 1 -> List(1), 2 -> List(2), 3 -> List(3)), servers) //Define a move for some of them val move = Map( @@ -310,21 +310,17 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { waitForReassignmentToComplete() //Check moved replicas did move - assertEquals(zkUtils.getReplicasForPartition("orders", 0), Seq(0, 2, 3)) - assertEquals(zkUtils.getReplicasForPartition("orders", 1), Seq(0, 1, 2)) - assertEquals(zkUtils.getReplicasForPartition("payments", 1), Seq(1, 2)) - assertEquals(zkUtils.getReplicasForPartition("deliveries", 0), Seq(1, 2)) + assertEquals(Seq(0, 2, 3), zkUtils.getReplicasForPartition("orders", 0)) + assertEquals(Seq(0, 1, 2), zkUtils.getReplicasForPartition("orders", 1)) + assertEquals(Seq(1, 2), zkUtils.getReplicasForPartition("payments", 1)) + assertEquals(Seq(1, 2), zkUtils.getReplicasForPartition("deliveries", 0)) //Check untouched replicas are still there - assertEquals(zkUtils.getReplicasForPartition("payments", 0), Seq(0, 1)) - assertEquals(zkUtils.getReplicasForPartition("customers", 0), Seq(0)) - assertEquals(zkUtils.getReplicasForPartition("customers", 1), Seq(1)) - assertEquals(zkUtils.getReplicasForPartition("customers", 2), Seq(2)) - assertEquals(zkUtils.getReplicasForPartition("customers", 3), Seq(3)) - } - - def createTopicWith(topic: String, replicaAssignment: Map[Int, Seq[Int]]) = { - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment) + assertEquals(Seq(0, 1), zkUtils.getReplicasForPartition("payments", 0)) + assertEquals(Seq(0), zkUtils.getReplicasForPartition("customers", 0)) + assertEquals(Seq(1), zkUtils.getReplicasForPartition("customers", 1)) + assertEquals(Seq(2), zkUtils.getReplicasForPartition("customers", 2)) + assertEquals(Seq(3), zkUtils.getReplicasForPartition("customers", 3)) } def waitForReassignmentToComplete() { diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala index 6c19a5f21d49e..ce96a0631378a 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -73,6 +73,7 @@ class ReassignPartitionsCommandTest extends Logging { override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = { assertEquals("0:102", configChange.get(FollowerReplicationThrottledReplicasProp)) assertEquals("0:100,0:101", configChange.get(LeaderReplicationThrottledReplicasProp)) + assertEquals("topic1", topic) calls += 1 } } From 0e2ac93f7dba6c0e2e257d6b3bd7e2839a32c4b2 Mon Sep 17 00:00:00 2001 From: Ben Stopford Date: Wed, 25 Jan 2017 11:32:10 +0000 Subject: [PATCH 10/10] KAFKA-4596: ismeal's review feedback --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 647bb90867828..4e7b4e0c3937c 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -367,7 +367,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA } private def preRebalanceReplicaForMovingPartitions(existing: Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { - def moving(before: Seq[Int], after: Seq[Int]) = before.toSet != after.toSet + def moving(before: Seq[Int], after: Seq[Int]) = (after.toSet -- before.toSet).nonEmpty //For any moving partition, throttle all the original (pre move) replicas (as any one might be a leader) existing.filter { case (tp, preMoveReplicas) => proposed.contains(tp) && moving(preMoveReplicas, proposed(tp))