From 180a5b50cd6f50725717a0fdf1c8efc5a261d11d Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Wed, 10 Feb 2016 13:04:28 -0600 Subject: [PATCH] KAFKA-3226: Replicas collections should use List instead of Set in order to maintain order --- .../common/requests/LeaderAndIsrRequest.java | 11 +++++----- .../requests/UpdateMetadataRequest.java | 11 +++++----- .../common/requests/RequestResponseTest.java | 12 +++++------ .../main/scala/kafka/api/LeaderAndIsr.scala | 6 +++--- .../controller/ControllerChannelManager.scala | 4 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 4 ++-- .../RequestResponseSerializationTest.scala | 8 ++++---- .../integration/BaseTopicMetadataTest.scala | 20 ++++++++++--------- .../kafka/server/LeaderElectionTest.scala | 2 +- 9 files changed, 41 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index a77a7cbd6287f..b02a391c43c50 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,15 +41,15 @@ public static class PartitionState { public final int leaderEpoch; public final List isr; public final int zkVersion; - public final Set replicas; + public final List replicas; - public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List isr, int zkVersion, Set replicas) { + public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List isr, int zkVersion, List replicas) { this.controllerEpoch = controllerEpoch; this.leader = leader; this.leaderEpoch = leaderEpoch; - this.isr = isr; + this.isr = new ArrayList<>(new LinkedHashSet<>(isr)); this.zkVersion = zkVersion; - this.replicas = replicas; + this.replicas = new ArrayList<>(new LinkedHashSet<>(replicas)); } } @@ -150,7 +151,7 @@ public LeaderAndIsrRequest(Struct struct) { int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME); Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME); - Set replicas = new HashSet<>(replicasArray.length); + List replicas = new ArrayList<>(replicasArray.length); for (Object r : replicasArray) replicas.add((Integer) r); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 808161c65dc35..4ba940c63cfb3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,15 +39,15 @@ public static final class PartitionState { public final int leaderEpoch; public final List isr; public final int zkVersion; - public final Set replicas; + public final List replicas; - public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List isr, int zkVersion, Set replicas) { + public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List isr, int zkVersion, List replicas) { this.controllerEpoch = controllerEpoch; this.leader = leader; this.leaderEpoch = leaderEpoch; - this.isr = isr; + this.isr = new ArrayList<>(new LinkedHashSet<>(isr)); this.zkVersion = zkVersion; - this.replicas = replicas; + this.replicas = new ArrayList<>(new LinkedHashSet<>(replicas)); } } @@ -217,7 +218,7 @@ public UpdateMetadataRequest(Struct struct) { int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME); Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME); - Set replicas = new HashSet<>(replicasArray.length); + List replicas = new ArrayList<>(replicasArray.length); for (Object r : replicasArray) replicas.add((Integer) r); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 789cca79f5eb2..206de22e9b8a2 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -335,11 +335,11 @@ private AbstractRequest createLeaderAndIsrRequest() { List isr = Arrays.asList(1, 2); List replicas = Arrays.asList(1, 2, 3, 4); partitionStates.put(new TopicPartition("topic5", 105), - new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new ArrayList<>(replicas))); partitionStates.put(new TopicPartition("topic5", 1), - new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new ArrayList<>(replicas))); partitionStates.put(new TopicPartition("topic20", 1), - new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new ArrayList<>(replicas))); Set leaders = new HashSet<>(Arrays.asList( new LeaderAndIsrRequest.EndPoint(0, "test0", 1223), @@ -360,11 +360,11 @@ private AbstractRequest createUpdateMetadataRequest(int version) { List isr = Arrays.asList(1, 2); List replicas = Arrays.asList(1, 2, 3, 4); partitionStates.put(new TopicPartition("topic5", 105), - new UpdateMetadataRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new UpdateMetadataRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new ArrayList<>(replicas))); partitionStates.put(new TopicPartition("topic5", 1), - new UpdateMetadataRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new UpdateMetadataRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new ArrayList<>(replicas))); partitionStates.put(new TopicPartition("topic20", 1), - new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas))); + new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new ArrayList<>(replicas))); if (version == 0) { Set liveBrokers = new HashSet<>(Arrays.asList( diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala index 5de527c50963b..7c7e7ba52f4ce 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala @@ -51,12 +51,12 @@ object PartitionStateInfo { val replicationFactor = buffer.getInt val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch), - replicas.toSet) + replicas.toList) } } case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, - allReplicas: Set[Int]) { + allReplicas: List[Int]) { def replicationFactor = allReplicas.size def writeTo(buffer: ByteBuffer) { @@ -67,7 +67,7 @@ case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControlle leaderIsrAndControllerEpoch.leaderAndIsr.isr.foreach(buffer.putInt(_)) buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion) buffer.putInt(replicationFactor) - allReplicas.foreach(buffer.putInt(_)) + allReplicas.distinct.foreach(buffer.putInt(_)) } def sizeInBytes(): Int = { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index e52a9d30f8e59..565e934ecd39a 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -274,7 +274,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging brokerIds.filter(_ >= 0).foreach { brokerId => val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty) - result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) + result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toList)) } addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, @@ -303,7 +303,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) leaderIsrAndControllerEpochOpt match { case Some(leaderIsrAndControllerEpoch) => - val replicas = controllerContext.partitionReplicaAssignment(partition).toSet + val replicas = controllerContext.partitionReplicaAssignment(partition).toList val partitionStateInfo = if (beingDeleted) { val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr) PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index db2040f544fa8..962ff21f5261d 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -185,7 +185,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { } private def createUpdateMetadataRequest = { - val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava + val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, List(brokerId).asJava)).asJava val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers) @@ -214,7 +214,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { private def createLeaderAndIsrRequest = { new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue, - Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, + Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, List(brokerId).asJava)).asJava, Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava) } diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index cfbca004fc461..c519d6ce33f60 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -104,10 +104,10 @@ object SerializationTestUtils { private val leaderIsrAndControllerEpoch2 = new LeaderIsrAndControllerEpoch(leaderAndIsr2, controllerEpoch = 0) private val leaderIsrAndControllerEpoch3 = new LeaderIsrAndControllerEpoch(leaderAndIsr3, controllerEpoch = 0) - private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id).toSet) - private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id).toSet) - private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id).toSet) - private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id).toSet) + private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id)) + private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id)) + private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id)) + private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id)) private val updateMetadataRequestPartitionStateInfo = collection.immutable.Map( TopicAndPartition(topic1,0) -> partitionStateInfo0, diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala index 1ef26f66fd5eb..2400cfbb672cb 100644 --- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala @@ -253,17 +253,19 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { // Assert that topic metadata at new brokers is updated correctly servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x => - waitUntilTrue(() => - topicMetadata == ClientUtils.fetchTopicMetadata( - Set.empty, - Seq(new Broker(x.config.brokerId, - x.config.hostName, - x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), - "TopicMetadataTest-testBasicTopicMetadata", - 2000, 0), "Topic metadata is not correctly updated")) + waitUntilTrue(() => { + val foundMetadata = ClientUtils.fetchTopicMetadata( + Set.empty, + Seq(new Broker(x.config.brokerId, + x.config.hostName, + x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), + "TopicMetadataTest-testBasicTopicMetadata", 2000, 0) + topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) && + topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic) + }, + s"Topic metadata is not correctly updated")) } - @Test def testAliveBrokerListWithNoTopics { checkMetadata(Seq(server1), 1) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 704f77622a28c..b1bd82f415dbb 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -145,7 +145,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val partitionStates = Map( new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch, Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion, - Set(0, 1).map(Integer.valueOf).asJava) + List(0, 1).map(Integer.valueOf).asJava) ) val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch, partitionStates.asJava, brokerEndPoints.toSet.asJava)