diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index a77bf8cbb862..e32d0b6cd435 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -697,8 +697,26 @@ public class Protocol { public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0; - public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1}; - public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1}; + public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V2 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V1; + + public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V2 = UPDATE_METADATA_REQUEST_END_POINT_V1; + + public static final Schema UPDATE_METADATA_REQUEST_BROKER_V2 = + new Schema(new Field("id", INT32, "The broker id."), + new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V2)), + new Field("rack", NULLABLE_STRING, "The rack")); + + public static final Schema UPDATE_METADATA_REQUEST_V2 = + new Schema(new Field("controller_id", INT32, "The controller id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V2)), + new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V2))); + + public static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1; + + + public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2}; + public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2}; /* an array of all requests and responses with all schema versions; a null value in the inner array means that the * particular version is not supported */ diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 4902f25d1757..79f0638c11e2 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -311,7 +311,10 @@ public int hashCode() { for (Object arrayItem: arrayObject) result = prime * result + arrayItem.hashCode(); } else { - result = prime * result + this.get(f).hashCode(); + Object field = this.get(f); + if (field != null) { + result = prime * result + field.hashCode(); + } } } return result; @@ -330,11 +333,13 @@ public boolean equals(Object obj) { return false; for (int i = 0; i < this.values.length; i++) { Field f = this.schema.get(i); - Boolean result; + boolean result; if (f.type() instanceof ArrayOf) { - result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f)); + result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f)); } else { - result = this.get(f).equals(other.get(f)); + Object thisField = this.get(f); + Object otherField = other.get(f); + result = (thisField == null && otherField == null) || thisField.equals(otherField); } if (!result) return false; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index d8d801387112..4c3d0a74740d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -49,16 +49,22 @@ public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List endPoints; + public final String rack; - public Broker(int id, Map endPoints) { + public Broker(int id, Map endPoints, String rack) { this.id = id; this.endPoints = endPoints; + this.rack = rack; + } + + @Deprecated + public Broker(int id, Map endPoints) { + this(id, endPoints, null); } } @@ -91,6 +97,7 @@ public EndPoint(String host, int port) { // Broker key names private static final String BROKER_ID_KEY_NAME = "id"; private static final String ENDPOINTS_KEY_NAME = "end_points"; + private static final String RACK_KEY_NAME = "rack"; // EndPoint key names private static final String HOST_KEY_NAME = "host"; @@ -117,20 +124,20 @@ private static Set brokerEndPointsToBrokers(Set brokerEn for (BrokerEndPoint brokerEndPoint : brokerEndPoints) { Map endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT, new EndPoint(brokerEndPoint.host(), brokerEndPoint.port())); - brokers.add(new Broker(brokerEndPoint.id(), endPoints)); + brokers.add(new Broker(brokerEndPoint.id(), endPoints, null)); } return brokers; } /** - * Constructor for version 1. + * Constructor for version 2. */ public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map partitionStates, Set liveBrokers) { - this(1, controllerId, controllerEpoch, partitionStates, liveBrokers); + this(2, controllerId, controllerEpoch, partitionStates, liveBrokers); } - private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map partitionStates, Set liveBrokers) { super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version))); struct.set(CONTROLLER_ID_KEY_NAME, controllerId); @@ -173,6 +180,9 @@ private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch } brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray()); + if (version >= 2) { + brokerData.set(RACK_KEY_NAME, broker.rack); + } } brokersData.add(brokerData); @@ -226,8 +236,8 @@ public UpdateMetadataRequest(Struct struct) { int port = brokerData.getInt(PORT_KEY_NAME); Map endPoints = new HashMap<>(1); endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port)); - liveBrokers.add(new Broker(brokerId, endPoints)); - } else { // V1 + liveBrokers.add(new Broker(brokerId, endPoints, null)); + } else { // V1 or V2 Map endPoints = new HashMap<>(); for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) { Struct endPointData = (Struct) endPointDataObj; @@ -236,11 +246,13 @@ public UpdateMetadataRequest(Struct struct) { short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME); endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port)); } - liveBrokers.add(new Broker(brokerId, endPoints)); + String rack = null; + if (brokerData.hasField(RACK_KEY_NAME)) { // V2 + rack = brokerData.getString(RACK_KEY_NAME); + } + liveBrokers.add(new Broker(brokerId, endPoints, rack)); } - } - controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME); controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME); this.partitionStates = partitionStates; @@ -249,14 +261,11 @@ public UpdateMetadataRequest(Struct struct) { @Override public AbstractRequestResponse getErrorResponse(int versionId, 
Throwable e) { - switch (versionId) { - case 0: - case 1: - return new UpdateMetadataResponse(Errors.forException(e).code()); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id))); - } + if (versionId <= 2) + return new UpdateMetadataResponse(Errors.forException(e).code()); + else + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id))); } public int controllerId() { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 7ccf07980ae2..b556b4647468 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -86,8 +86,9 @@ public void testSerialization() throws Exception { createStopReplicaRequest(), createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()), createStopReplicaResponse(), - createUpdateMetadataRequest(1), - createUpdateMetadataRequest(1).getErrorResponse(1, new UnknownServerException()), + createUpdateMetadataRequest(2, "rack1"), + createUpdateMetadataRequest(2, null), + createUpdateMetadataRequest(2, "rack1").getErrorResponse(2, new UnknownServerException()), createUpdateMetadataResponse(), createLeaderAndIsrRequest(), createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()), @@ -97,8 +98,11 @@ public void testSerialization() throws Exception { for (AbstractRequestResponse req : requestResponseList) checkSerialization(req, null); - checkSerialization(createUpdateMetadataRequest(0), 0); - checkSerialization(createUpdateMetadataRequest(0).getErrorResponse(0, new UnknownServerException()), 0); + checkSerialization(createUpdateMetadataRequest(0, null), 0); + checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()), 0); + checkSerialization(createUpdateMetadataRequest(1, null), 1); + checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1); + checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1); } private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception { @@ -120,7 +124,7 @@ private void checkSerialization(AbstractRequestResponse req, Integer version) th @Test public void produceResponseVersionTest() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP)); ProduceResponse v0Response = new ProduceResponse(responseData); ProduceResponse v1Response = new ProduceResponse(responseData, 10, 1); @@ -138,7 +142,7 @@ public void produceResponseVersionTest() { @Test public void fetchResponseVersionTest() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); FetchResponse v0Response = new FetchResponse(responseData); @@ -192,14 +196,14 @@ private AbstractRequestResponse createGroupCoordinatorResponse() { } private 
AbstractRequest createFetchRequest() { - Map fetchData = new HashMap(); + Map fetchData = new HashMap<>(); fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000)); fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000)); return new FetchRequest(-1, 100, 100000, fetchData); } private AbstractRequestResponse createFetchResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10))); return new FetchResponse(responseData, 0); } @@ -259,13 +263,13 @@ private AbstractRequestResponse createLeaveGroupResponse() { } private AbstractRequest createListOffsetRequest() { - Map offsetData = new HashMap(); + Map offsetData = new HashMap<>(); offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); return new ListOffsetRequest(-1, offsetData); } private AbstractRequestResponse createListOffsetResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L))); return new ListOffsetResponse(responseData); } @@ -289,13 +293,13 @@ private AbstractRequestResponse createMetadataResponse() { } private AbstractRequest createOffsetCommitRequest() { - Map commitData = new HashMap(); + Map commitData = new HashMap<>(); commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, "")); return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData); } private AbstractRequestResponse createOffsetCommitResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), Errors.NONE.code()); return new OffsetCommitResponse(responseData); } @@ -305,19 +309,19 @@ private AbstractRequest createOffsetFetchRequest() { } private AbstractRequestResponse createOffsetFetchResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code())); return new OffsetFetchResponse(responseData); } private AbstractRequest createProduceRequest() { - Map produceData = new HashMap(); + Map produceData = new HashMap<>(); produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10)); return new ProduceRequest((short) 1, 5000, produceData); } private AbstractRequestResponse createProduceResponse() { - Map responseData = new HashMap(); + Map responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP)); return new ProduceResponse(responseData, 0); } @@ -371,7 +375,7 @@ private AbstractRequestResponse createLeaderAndIsrResponse() { } @SuppressWarnings("deprecation") - private AbstractRequest createUpdateMetadataRequest(int version) { + private AbstractRequest createUpdateMetadataRequest(int version, String rack) { Map partitionStates = new HashMap<>(); List isr = Arrays.asList(1, 2); List replicas = Arrays.asList(1, 2, 3, 4); @@ -397,11 +401,10 @@ private AbstractRequest createUpdateMetadataRequest(int version) { endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244)); endPoints2.put(SecurityProtocol.SSL, new 
UpdateMetadataRequest.EndPoint("host2", 1234)); - Set liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1), - new UpdateMetadataRequest.Broker(1, endPoints2) + Set liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack), + new UpdateMetadataRequest.Broker(1, endPoints2, rack) )); - - return new UpdateMetadataRequest(1, 10, partitionStates, liveBrokers); + return new UpdateMetadataRequest(version, 1, 10, partitionStates, liveBrokers); } } diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 3fb44d320afa..24174bea9546 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -19,7 +19,6 @@ package kafka.admin import kafka.common._ import kafka.cluster.Broker - import kafka.log.LogConfig import kafka.server.ConfigType import kafka.utils._ @@ -32,14 +31,12 @@ import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopi import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.requests.MetadataResponse -import scala.Predef._ import scala.collection._ -import scala.collection.JavaConverters._ -import scala.collection.mutable +import JavaConverters._ import mutable.ListBuffer +import scala.collection.mutable import collection.Map import collection.Set - import org.I0Itec.zkclient.exception.ZkNodeExistsException object AdminUtils extends Logging { @@ -48,11 +45,13 @@ object AdminUtils extends Logging { val EntityConfigChangeZnodePrefix = "config_change_" /** - * There are 2 goals of replica assignment: + * There are 3 goals of replica assignment: + * * 1. Spread the replicas evenly among brokers. * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers. + * 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible * - * To achieve this goal, we: + * To achieve this goal for replica assignment without considering racks, we: * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list. * 2. Assign the remaining replicas of each partition with an increasing shift. * @@ -64,39 +63,177 @@ object AdminUtils extends Logging { * p8 p9 p5 p6 p7 (2nd replica) * p3 p4 p0 p1 p2 (3nd replica) * p7 p8 p9 p5 p6 (3nd replica) + * + * To create rack aware assignment, this API will first create a rack alternated broker list. For example, + * from this brokerID -> rack mapping: + * + * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1" + * + * The rack alternated list will be: + * + * 0, 3, 1, 5, 4, 2 + * + * Then an easy round-robin assignment can be applied. Assume 6 partitions with replication factor of 3, the assignment + * will be: + * + * 0 -> 0,3,1 + * 1 -> 3,1,5 + * 2 -> 1,5,4 + * 3 -> 5,4,2 + * 4 -> 4,2,0 + * 5 -> 2,0,3 + * + * Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start + * shifting the followers. This is to ensure we will not always get the same set of sequences. + * In this case, if there is another partition to assign (partition #6), the assignment will be: + * + * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0) + * + * The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated + * broker list. 
For rest of the replicas, it will be biased towards brokers on racks that do not have + * any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on + * the broker list. + * + * As the result, if the number of replicas is equal to or greater than the number of racks, it will ensure that + * each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect + * situation where the number of replicas is the same as the number of racks and each rack has the same number of + * brokers, it guarantees that the replica distribution is even across brokers and racks. + * + * @return a Map from partition id to replica ids + * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to + * assign each replica to a unique rack. + * */ - def assignReplicasToBrokers(brokerList: Seq[Int], + def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata], nPartitions: Int, replicationFactor: Int, fixedStartIndex: Int = -1, - startPartitionId: Int = -1) - : Map[Int, Seq[Int]] = { + startPartitionId: Int = -1): Map[Int, Seq[Int]] = { if (nPartitions <= 0) throw new AdminOperationException("number of partitions must be larger than 0") if (replicationFactor <= 0) throw new AdminOperationException("replication factor must be larger than 0") - if (replicationFactor > brokerList.size) - throw new AdminOperationException("replication factor: " + replicationFactor + - " larger than available brokers: " + brokerList.size) - val ret = new mutable.HashMap[Int, List[Int]]() - val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) - var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0 + if (replicationFactor > brokerMetadatas.size) + throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}") + if (brokerMetadatas.forall(_.rack.isEmpty)) + assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex, + startPartitionId) + else { + if (brokerMetadatas.exists(_.rack.isEmpty)) + throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment") + assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, + startPartitionId) + } + } - var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) - for (i <- 0 until nPartitions) { - if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) + private def assignReplicasToBrokersRackUnaware(nPartitions: Int, + replicationFactor: Int, + brokerList: Seq[Int], + fixedStartIndex: Int, + startPartitionId: Int): Map[Int, Seq[Int]] = { + val ret = mutable.Map[Int, Seq[Int]]() + val brokerArray = brokerList.toArray + val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) + var currentPartitionId = math.max(0, startPartitionId) + var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) + for (_ <- 0 until nPartitions) { + if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)) nextReplicaShift += 1 - val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size - var replicaList = List(brokerList(firstReplicaIndex)) + val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length + val 
replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) - replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size)) - ret.put(currentPartitionId, replicaList.reverse) - currentPartitionId = currentPartitionId + 1 + replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length)) + ret.put(currentPartitionId, replicaBuffer) + currentPartitionId += 1 } - ret.toMap + ret + } + + private def assignReplicasToBrokersRackAware(nPartitions: Int, + replicationFactor: Int, + brokerMetadatas: Seq[BrokerMetadata], + fixedStartIndex: Int, + startPartitionId: Int): Map[Int, Seq[Int]] = { + val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) => + id -> rack + }.toMap + val numRacks = brokerRackMap.values.toSet.size + val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap) + val numBrokers = arrangedBrokerList.size + val ret = mutable.Map[Int, Seq[Int]]() + val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size) + var currentPartitionId = math.max(0, startPartitionId) + var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size) + for (_ <- 0 until nPartitions) { + if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0)) + nextReplicaShift += 1 + val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size + val leader = arrangedBrokerList(firstReplicaIndex) + val replicaBuffer = mutable.ArrayBuffer(leader) + val racksWithReplicas = mutable.Set(brokerRackMap(leader)) + val brokersWithReplicas = mutable.Set(leader) + var k = 0 + for (_ <- 0 until replicationFactor - 1) { + var done = false + while (!done) { + val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size)) + val rack = brokerRackMap(broker) + // Skip this broker if + // 1. there is already a broker in the same rack that has assigned a replica AND there is one or more racks + // that do not have any replica, or + // 2. the broker has already assigned a replica AND there is one or more brokers that do not have replica assigned + if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks) + && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) { + replicaBuffer += broker + racksWithReplicas += rack + brokersWithReplicas += broker + done = true + } + k += 1 + } + } + ret.put(currentPartitionId, replicaBuffer) + currentPartitionId += 1 + } + ret } + /** + * Given broker and rack information, returns a list of brokers alternated by the rack. Assume + * this is the rack and its brokers: + * + * rack1: 0, 1, 2 + * rack2: 3, 4, 5 + * rack3: 6, 7, 8 + * + * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8 + * + * This is essential to make sure that the assignReplicasToBrokers API can use such list and + * assign replicas to brokers in a simple round-robin fashion, while ensuring an even + * distribution of leader and replica counts on each broker and that replicas are + * distributed to all racks. 
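
A small sketch of the distribution guarantee stated above: when the replication factor is at least the number of racks, every rack receives at least one replica of each partition; otherwise all replicas of a partition land on distinct racks. The helper name `verifyRackSpread` is hypothetical and not part of the patch:

```scala
// Illustrative check only; `assignment` would come from assignReplicasToBrokers and
// `brokerRackMap` from the cluster's BrokerMetadata. Assumes each partition has
// replicationFactor replicas.
def verifyRackSpread(assignment: Map[Int, Seq[Int]], brokerRackMap: Map[Int, String]): Unit = {
  val numRacks = brokerRackMap.values.toSet.size
  assignment.foreach { case (partition, replicas) =>
    val racksUsed = replicas.map(brokerRackMap).toSet.size
    // Replicas should span min(replicationFactor, numRacks) distinct racks.
    require(racksUsed == math.min(replicas.size, numRacks),
      s"partition $partition has replicas $replicas on only $racksUsed rack(s)")
  }
}
```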
+ */ + private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = { + val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) => + (rack, brokers.toIterator) + } + val racks = brokersIteratorByRack.keys.toArray.sorted + val result = new mutable.ArrayBuffer[Int] + var rackIndex = 0 + while (result.size < brokerRackMap.size) { + val rackIterator = brokersIteratorByRack(racks(rackIndex)) + if (rackIterator.hasNext) + result += rackIterator.next() + rackIndex = (rackIndex + 1) % racks.length + } + result + } + private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, Seq[Int]] = { + brokerRackMap.toSeq.map { case (id, rack) => (rack, id) } + .groupBy { case (rack, _) => rack } + .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) } + } /** * Add partitions to existing topic with optional replica assignment * @@ -110,7 +247,8 @@ object AdminUtils extends Logging { topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", - checkBrokerAvailable: Boolean = true) { + checkBrokerAvailable: Boolean = true, + rackAwareMode: RackAwareMode = RackAwareMode.Enforced) { val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic)) if (existingPartitionsReplicaList.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) @@ -124,16 +262,16 @@ object AdminUtils extends Logging { throw new AdminOperationException("The number of partitions for a topic can only be increased") // create the new partition replication list - val brokerList = zkUtils.getSortedBrokerList() - val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") { - var startIndex = brokerList.indexWhere(_ >= existingReplicaListForPartitionZero.head) - if(startIndex < 0) { - startIndex = 0 + val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode) + val newPartitionReplicaList = + if (replicaAssignmentStr == null || replicaAssignmentStr == "") { + val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head)) + AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size, + startIndex, existingPartitionsReplicaList.size) } - AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaListForPartitionZero.size, startIndex, existingPartitionsReplicaList.size) - } - else - getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable) + else + getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet, + existingPartitionsReplicaList.size, checkBrokerAvailable) // check if manual assignment has the right replication factor val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size)) @@ -237,13 +375,32 @@ object AdminUtils extends Logging { def topicExists(zkUtils: ZkUtils, topic: String): Boolean = zkUtils.zkClient.exists(getTopicPath(topic)) + def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced, + brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = { + val allBrokers = zkUtils.getAllBrokersInCluster() + val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers) + val brokersWithRack = brokers.filter(_.rack.nonEmpty) + if (rackAwareMode == 
RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) { + throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" + + " to make replica assignment without rack information.") + } + val brokerMetadatas = rackAwareMode match { + case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None)) + case RackAwareMode.Safe if brokersWithRack.size < brokers.size => + brokers.map(broker => BrokerMetadata(broker.id, None)) + case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack)) + } + brokerMetadatas.sortBy(_.id) + } + def createTopic(zkUtils: ZkUtils, topic: String, partitions: Int, replicationFactor: Int, - topicConfig: Properties = new Properties) { - val brokerList = zkUtils.getSortedBrokerList() - val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) + topicConfig: Properties = new Properties, + rackAwareMode: RackAwareMode = RackAwareMode.Enforced) { + val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode) + val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig) } @@ -304,6 +461,7 @@ object AdminUtils extends Logging { /** * Update the config for a client and create a change notification so the change will propagate to other brokers + * * @param zkUtils Zookeeper utilities used to write the config to ZK * @param clientId: The clientId for which configs are being changed * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or @@ -316,6 +474,7 @@ object AdminUtils extends Logging { /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers + * * @param zkUtils Zookeeper utilities used to write the config to ZK * @param topic: The topic for which configs are being changed * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or diff --git a/core/src/main/scala/kafka/admin/BrokerMetadata.scala b/core/src/main/scala/kafka/admin/BrokerMetadata.scala new file mode 100644 index 000000000000..86831e376e5b --- /dev/null +++ b/core/src/main/scala/kafka/admin/BrokerMetadata.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package kafka.admin + +/** + * Broker metadata used by admin tools. + * + * @param id an integer that uniquely identifies this broker + * @param rack the rack of the broker, which is used to in rack aware partition assignment for fault tolerance. 
+ * Examples: "RACK1", "us-east-1d" + */ +case class BrokerMetadata(id: Int, rack: Option[String]) diff --git a/core/src/main/scala/kafka/admin/RackAwareMode.scala b/core/src/main/scala/kafka/admin/RackAwareMode.scala new file mode 100644 index 000000000000..45555b60bfce --- /dev/null +++ b/core/src/main/scala/kafka/admin/RackAwareMode.scala @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +/** + * Mode to control how rack aware replica assignment will be executed + */ +object RackAwareMode { + + /** + * Ignore all rack information in replica assignment. This is an optional mode used in command line. + */ + case object Disabled extends RackAwareMode + + /** + * Assume every broker has rack, or none of the brokers has rack. If only partial brokers have rack, fail fast + * in replica assignment. This is the default mode in command line tools (TopicCommand and ReassignPartitionsCommand). + */ + case object Enforced extends RackAwareMode + + /** + * Use rack information if every broker has a rack. Otherwise, fallback to Disabled mode. This is used in auto topic + * creation. 
+ */ + case object Safe extends RackAwareMode +} + +sealed trait RackAwareMode diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 13e423d4c23d..446ab9f5374c 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -91,23 +91,33 @@ object ReassignPartitionsCommand extends Logging { if (duplicateReassignments.nonEmpty) throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(","))) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) + val disableRackAware = opts.options.has(opts.disableRackAware) + val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware) + println("Current partition replica assignment\n\n%s".format(zkUtils.getPartitionReassignmentZkData(currentAssignments))) + println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(proposedAssignments))) + } + + def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = { val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString) val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign) if (duplicateTopicsToReassign.nonEmpty) throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(","))) - val topicPartitionsToReassign = zkUtils.getReplicaAssignmentForTopics(topicsToReassign) - - var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() - val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) - groupedByTopic.foreach { topicInfo => - val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, - topicInfo._2.head._2.size) - partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) + val currentAssignment = zkUtils.getReplicaAssignmentForTopics(topicsToReassign) + + val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic } + val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced + val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, rackAwareMode, Some(brokerListToReassign)) + + val partitionsToBeReassigned = mutable.Map[TopicAndPartition, Seq[Int]]() + groupedByTopic.foreach { case (topic, assignment) => + val (_, replicas) = assignment.head + val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size) + partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) => + (TopicAndPartition(topic, partition) -> replicas) + } } - val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic).toSeq) - println("Current partition replica assignment\n\n%s" - .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) - println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) + + (partitionsToBeReassigned, currentAssignment) 
} def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) { @@ -200,7 +210,8 @@ object ReassignPartitionsCommand extends Logging { .withRequiredArg .describedAs("brokerlist") .ofType(classOf[String]) - + val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment") + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.") diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index d4212c551f54..e89e09d2a3bf 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -105,7 +105,9 @@ object TopicCommand extends Logging { val partitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue warnOnMaxMessagesChange(configs, replicas) - AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs) + val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled + else RackAwareMode.Enforced + AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode) } println("Created topic \"%s\".".format(topic)) } catch { @@ -324,6 +326,7 @@ object TopicCommand extends Logging { val ifNotExistsOpt = parser.accepts("if-not-exists", "if set when creating topics, the action will only execute if the topic does not already exist") + val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment") val options = parser.parse(args : _*) val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt) diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 7340f148af1e..77b85e01b402 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -19,6 +19,7 @@ package kafka.cluster import java.nio.ByteBuffer +import kafka.api.ApiUtils._ import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException} import kafka.utils.Json import org.apache.kafka.common.Node @@ -32,26 +33,41 @@ import org.apache.kafka.common.protocol.SecurityProtocol object Broker { /** - * Create a broker object from id and JSON string. - * @param id - * @param brokerInfoString - * - * Version 1 JSON schema for a broker is: - * {"version":1, - * "host":"localhost", - * "port":9092 - * "jmx_port":9999, - * "timestamp":"2233345666" } - * - * The current JSON schema for a broker is: - * {"version":2, - * "host","localhost", - * "port",9092 - * "jmx_port":9999, - * "timestamp":"2233345666", - * "endpoints": ["PLAINTEXT://host1:9092", - * "SSL://host1:9093"] - */ + * Create a broker object from id and JSON string. 
+ * + * @param id + * @param brokerInfoString + * + * Version 1 JSON schema for a broker is: + * { + * "version":1, + * "host":"localhost", + * "port":9092 + * "jmx_port":9999, + * "timestamp":"2233345666" + * } + * + * Version 2 JSON schema for a broker is: + * { + * "version":2, + * "host":"localhost", + * "port":9092 + * "jmx_port":9999, + * "timestamp":"2233345666", + * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"] + * } + * + * Version 3 (current) JSON schema for a broker is: + * { + * "version":3, + * "host":"localhost", + * "port":9092 + * "jmx_port":9999, + * "timestamp":"2233345666", + * "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"], + * "rack":"dc1" + * } + */ def createBroker(id: Int, brokerInfoString: String): Broker = { if (brokerInfoString == null) throw new BrokerNotAvailableException(s"Broker id $id does not exist") @@ -75,9 +91,8 @@ object Broker { (ep.protocolType, ep) }.toMap } - - - new Broker(id, endpoints) + val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String]) + new Broker(id, endpoints, rack) case None => throw new BrokerNotAvailableException(s"Broker id $id does not exist") } @@ -86,61 +101,34 @@ object Broker { throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t) } } - - /** - * - * @param buffer Containing serialized broker. - * Current serialization is: - * id (int), number of endpoints (int), serialized endpoints - * @return broker object - */ - def readFrom(buffer: ByteBuffer): Broker = { - val id = buffer.getInt - val numEndpoints = buffer.getInt - - val endpoints = List.range(0, numEndpoints).map(i => EndPoint.readFrom(buffer)) - .map(ep => ep.protocolType -> ep).toMap - new Broker(id, endpoints) - } } -case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]) { +case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint], rack: Option[String]) { - override def toString: String = id + " : " + endPoints.values.mkString("(",",",")") + override def toString: String = + s"$id : ${endPoints.values.mkString("(",",",")")} : ${rack.orNull}" + + def this(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) = { + this(id, endPoints, None) + } def this(id: Int, host: String, port: Int, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { - this(id, Map(protocol -> EndPoint(host, port, protocol))) + this(id, Map(protocol -> EndPoint(host, port, protocol)), None) } def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = { this(bep.id, bep.host, bep.port, protocol) } - - def writeTo(buffer: ByteBuffer) { - buffer.putInt(id) - buffer.putInt(endPoints.size) - for(endpoint <- endPoints.values) { - endpoint.writeTo(buffer) - } - } - - def sizeInBytes: Int = - 4 + /* broker id*/ - 4 + /* number of endPoints */ - endPoints.values.map(_.sizeInBytes).sum /* end points */ - - def supportsChannel(protocolType: SecurityProtocol): Unit = { - endPoints.contains(protocolType) - } - def getNode(protocolType: SecurityProtocol): Node = { - val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id))) + val endpoint = endPoints.getOrElse(protocolType, + throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id")) new Node(id, endpoint.host, endpoint.port) } def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = { - val endpoint = endPoints.getOrElse(protocolType, 
throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id))) + val endpoint = endPoints.getOrElse(protocolType, + throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id")) new BrokerEndPoint(id, endpoint.host, endpoint.port) } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 3b1a458e6be1..ea156fa66fd2 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -16,24 +16,25 @@ */ package kafka.controller -import kafka.api.{LeaderAndIsr, KAFKA_0_9_0, PartitionStateInfo} +import java.net.SocketTimeoutException +import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} + +import kafka.api._ +import kafka.cluster.Broker +import kafka.common.{KafkaException, TopicAndPartition} +import kafka.server.KafkaConfig import kafka.utils._ -import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient} -import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node} +import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode} -import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys} -import org.apache.kafka.common.requests._ +import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} +import org.apache.kafka.common.requests.{UpdateMetadataRequest, _} import org.apache.kafka.common.utils.Time -import collection.mutable.HashMap -import kafka.cluster.Broker -import java.net.{SocketTimeoutException} -import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} -import kafka.server.KafkaConfig -import collection.mutable -import kafka.common.{KafkaException, TopicAndPartition} -import collection.Set -import collection.JavaConverters._ +import org.apache.kafka.common.{BrokerEndPoint, Node, TopicPartition} + +import scala.collection.JavaConverters._ +import scala.collection.{Set, mutable} +import scala.collection.mutable.HashMap class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging { protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] @@ -380,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging topicPartition -> partitionState } - val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) (1: Short) else (0: Short) + val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2: Short + else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short + else 0: Short val updateMetadataRequest = if (version == 0) { @@ -395,9 +398,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) => securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port) } - new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava) + new 
UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) } - new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava) + new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava) } controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5f9ec8ba4cee..452f721c0a58 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.lang.{Long => JLong, Short => JShort} import java.util.Properties -import kafka.admin.AdminUtils +import kafka.admin.{RackAwareMode, AdminUtils} import kafka.api._ import kafka.cluster.Partition import kafka.common._ @@ -624,7 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel, replicationFactor: Int, properties: Properties = new Properties()): MetadataResponse.TopicMetadata = { try { - AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties) + AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) info("Auto creation of topic %s with %d partitions and replication factor %d is successful" .format(topic, numPartitions, replicationFactor)) new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList()) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 8d14edd17f84..9c2487673e26 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -222,6 +222,8 @@ object KafkaConfig { val MaxConnectionsPerIpProp = "max.connections.per.ip" val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides" val ConnectionsMaxIdleMsProp = "connections.max.idle.ms" + /***************** rack configuration *************/ + val RackProp = "broker.rack" /** ********* Log Configuration ***********/ val NumPartitionsProp = "num.partitions" val LogDirsProp = "log.dirs" @@ -388,6 +390,8 @@ object KafkaConfig { val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address" val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections" val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this" + /************* Rack Configuration **************/ + val RackDoc = "Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. 
Examples: `RACK1`, `us-east-1d`" /** ********* Log Configuration ***********/ val NumPartitionsDoc = "The default number of log partitions per topic" val LogDirDoc = "The directory in which the log data is kept (supplemental for " + LogDirsProp + " property)" @@ -571,6 +575,9 @@ object KafkaConfig { .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc) .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc) + /************ Rack Configuration ******************/ + .define(RackProp, STRING, null, MEDIUM, RackDoc) + /** ********* Log Configuration ***********/ .define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc) .define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc) @@ -771,6 +778,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)} val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp) + /***************** rack configuration **************/ + val rack = Option(getString(KafkaConfig.RackProp)) + /** ********* Log Configuration ***********/ val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp) val numPartitions = getInt(KafkaConfig.NumPartitionsProp) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 928ff43e1193..2598e6dbee60 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -17,12 +17,14 @@ package kafka.server +import java.net.InetAddress + +import kafka.api.ApiVersion import kafka.cluster.EndPoint import kafka.utils._ +import org.I0Itec.zkclient.IZkStateListener import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkStateListener, ZkClient, ZkConnection} -import java.net.InetAddress /** @@ -35,7 +37,9 @@ import java.net.InetAddress */ class KafkaHealthcheck(private val brokerId: Int, private val advertisedEndpoints: Map[SecurityProtocol, EndPoint], - private val zkUtils: ZkUtils) extends Logging { + private val zkUtils: ZkUtils, + private val rack: Option[String], + private val interBrokerProtocolVersion: ApiVersion) extends Logging { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener @@ -61,7 +65,8 @@ class KafkaHealthcheck(private val brokerId: Int, // only PLAINTEXT is supported as default // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) - zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort) + zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack, + interBrokerProtocolVersion) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2f5441ac1211..e29494baa1d9 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -239,7 +239,7 @@ class 
KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr else (protocol, endpoint) } - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion) kafkaHealthcheck.startup() // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 1fdd717044c2..6df261c7482c 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -159,7 +159,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol)) nodes.put(protocol, new Node(broker.id, ep.host, ep.port)) } - aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala) + aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack)) aliveNodes(broker.id) = nodes.asScala } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index f39ed014cec9..99c8196a0389 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -18,31 +18,26 @@ package kafka.utils import java.util.concurrent.CountDownLatch + +import kafka.admin._ +import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr} import kafka.cluster._ +import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition} import kafka.consumer.{ConsumerThreadId, TopicCount} +import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext} import kafka.server.ConfigType -import org.I0Itec.zkclient.{ZkClient,ZkConnection} -import org.I0Itec.zkclient.exception.{ZkException, ZkNodeExistsException, ZkNoNodeException, - ZkMarshallingError, ZkBadVersionException} +import kafka.utils.ZkUtils._ +import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException} import org.I0Itec.zkclient.serialize.ZkSerializer +import org.I0Itec.zkclient.{ZkClient, ZkConnection} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.zookeeper.ZooDefs -import scala.collection._ -import kafka.api.LeaderAndIsr -import org.apache.zookeeper.data.{ACL, Stat} -import kafka.admin._ -import kafka.common.{KafkaException, NoEpochForPartitionException} -import kafka.controller.ReassignedPartitionsContext -import kafka.controller.KafkaController -import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.common.TopicAndPartition -import kafka.utils.ZkUtils._ -import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback} -import org.apache.zookeeper.CreateMode -import org.apache.zookeeper.KeeperException +import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} import org.apache.zookeeper.KeeperException.Code -import org.apache.zookeeper.ZooKeeper +import org.apache.zookeeper.data.{ACL, Stat} +import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper} + +import scala.collection._ object ZkUtils { val ConsumersPath = "/consumers" @@ -256,19 +251,43 @@ class ZkUtils(val zkClient: ZkClient, } /** - * Register brokers with v2 json format (which includes multiple endpoints). 
+ * Register brokers with v3 json format (which includes multiple endpoints and rack) if + * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. + * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2. + * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X without having to upgrade + * to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case). + * * This format also includes default endpoints for compatibility with older clients. - * @param id - * @param host - * @param port - * @param advertisedEndpoints - * @param jmxPort + * + * @param id broker ID + * @param host broker host name + * @param port broker port + * @param advertisedEndpoints broker end points + * @param jmxPort jmx port + * @param rack broker rack + * @param apiVersion Kafka version the broker is running as */ - def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int) { + def registerBrokerInZk(id: Int, + host: String, + port: Int, + advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], + jmxPort: Int, + rack: Option[String], + apiVersion: ApiVersion) { val brokerIdPath = BrokerIdsPath + "/" + id val timestamp = SystemTime.milliseconds.toString - val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) + val version = if (apiVersion >= KAFKA_0_10_0_IV0) 3 else 2 + var jsonMap = Map("version" -> version, + "host" -> host, + "port" -> port, + "endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray, + "jmx_port" -> jmxPort, + "timestamp" -> timestamp + ) + rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack)) + + val brokerInfo = Json.encode(jsonMap) registerBrokerInZk(brokerIdPath, brokerInfo) info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) @@ -745,6 +764,7 @@ class ZkUtils(val zkClient: ZkClient, /** * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker * or throws an exception if the broker dies before the query to zookeeper finishes + * * @param brokerId The broker id * @return An optional Broker object encapsulating the broker metadata */ @@ -768,7 +788,6 @@ class ZkUtils(val zkClient: ZkClient, case e: ZkNoNodeException => { createParentPath(BrokerSequenceIdPath, acls) try { - import scala.collection.JavaConversions._ zkClient.createPersistent(BrokerSequenceIdPath, "", acls) 0 } catch { @@ -880,7 +899,6 @@ class ZKConfig(props: VerifiableProperties) { object ZkPath { @volatile private var isNamespacePresent: Boolean = false - import scala.collection.JavaConversions._ def checkNamespace(client: ZkClient) { if(isNamespacePresent) diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala new file mode 100644 index 000000000000..a2f2041e7aa8 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.api + +import java.util.Properties + +import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest} +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord +import org.junit.Assert._ +import org.junit.Test +import scala.collection.Map + +class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest { + val numServers = 4 + val numPartitions = 8 + val replicationFactor = 2 + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString) + overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString) + + def generateConfigs() = + (0 until numServers) map { node => + TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) + } map (KafkaConfig.fromProps(_, overridingProps)) + + private val topic = "topic" + + @Test + def testAutoCreateTopic() { + val producer = TestUtils.createNewProducer(brokerList, retries = 5) + try { + // Send a message to auto-create the topic + val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) + assertEquals("Should have offset 0", 0L, producer.send(record).get.offset) + + // double check that the topic is created with leader elected + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0) + val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) => + topicPartition.partition -> replicas + } + val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced) + val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1") + assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap) + checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor) + } finally producer.close() + } +} + diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala new file mode 100644 index 000000000000..27ff4d4e34ad --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.utils.{Logging, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.{Map, Seq} + +class AdminRackAwareTest extends RackAwareTest with Logging { + + @Test + def testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() { + val rackMap = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1") + val newList = AdminUtils.getRackAlternatedBrokerList(rackMap) + assertEquals(List(0, 3, 1, 5, 4, 2), newList) + val anotherList = AdminUtils.getRackAlternatedBrokerList(rackMap - 5) + assertEquals(List(0, 3, 1, 4, 2), anotherList) + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0) + val expected = Map(0 -> List(0, 3, 1), + 1 -> List(3, 1, 5), + 2 -> List(1, 5, 4), + 3 -> List(5, 4, 2), + 4 -> List(4, 2, 0), + 5 -> List(2, 0, 3), + 6 -> List(0, 4, 2)) + assertEquals(expected, assignment) + } + + @Test + def testAssignmentWithRackAware() { + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1") + val numPartitions = 6 + val replicationFactor = 3 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor, 2, 0) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor) + } + + @Test + def testAssignmentWithRackAwareWithRandomStartIndex() { + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1") + val numPartitions = 6 + val replicationFactor = 3 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor) + } + + @Test + def testAssignmentWithRackAwareWithUnevenReplicas() { + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1") + val numPartitions = 13 + val replicationFactor = 3 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor, 0, 0) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor, verifyLeaderDistribution = false, verifyReplicasDistribution = false) + } + + @Test + def testAssignmentWithRackAwareWithUnevenRacks() { + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1") + val numPartitions = 12 + val replicationFactor = 3 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor, verifyReplicasDistribution = false) + } + + @Test + def testAssignmentWith2ReplicasRackAware() { + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> 
"rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1") + val numPartitions = 12 + val replicationFactor = 2 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor) + } + + @Test + def testRackAwareExpansion() { + val brokerRackMapping = Map(6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack3", 11 -> "rack1") + val numPartitions = 12 + val replicationFactor = 2 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor, startPartitionId = 12) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor) + } + + @Test + def testAssignmentWith2ReplicasRackAwareWith6Partitions() { + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1") + val numPartitions = 6 + val replicationFactor = 2 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor) + } + + @Test + def testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() { + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 4 -> "rack3") + val numPartitions = 3 + val replicationFactor = 2 + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, replicationFactor) + } + + @Test + def testLargeNumberPartitionsAssignment() { + val numPartitions = 96 + val replicationFactor = 3 + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1", + 6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack1", 11 -> "rack3") + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor) + checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, + replicationFactor) + } + + @Test + def testMoreReplicasThanRacks() { + val numPartitions = 6 + val replicationFactor = 5 + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2") + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor) + assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size)) + val distribution = getReplicaDistribution(assignment, brokerRackMapping) + for (partition <- 0 until numPartitions) + assertEquals(3, distribution.partitionRacks(partition).toSet.size) + } + + @Test + def testLessReplicasThanRacks() { + val numPartitions = 6 + val replicationFactor = 2 + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2") + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, + replicationFactor) + assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size)) + val distribution = getReplicaDistribution(assignment, brokerRackMapping) + for (partition <- 0 to 5) + assertEquals(2, 
distribution.partitionRacks(partition).toSet.size) + } + + @Test + def testSingleRack() { + val numPartitions = 6 + val replicationFactor = 3 + val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack1", 4 -> "rack1", 5 -> "rack1") + val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor) + assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size)) + val distribution = getReplicaDistribution(assignment, brokerRackMapping) + for (partition <- 0 until numPartitions) + assertEquals(1, distribution.partitionRacks(partition).toSet.size) + for (broker <- brokerRackMapping.keys) + assertEquals(1, distribution.brokerLeaderCount(broker)) + } + + @Test + def testSkipBrokerWithReplicaAlreadyAssigned() { + val rackInfo = Map(0 -> "a", 1 -> "b", 2 -> "c", 3 -> "a", 4 -> "a") + val brokerList = 0 to 4 + val numPartitions = 6 + val replicationFactor = 4 + val brokerMetadatas = toBrokerMetadata(rackInfo) + assertEquals(brokerList, brokerMetadatas.map(_.id)) + val assignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, numPartitions, replicationFactor, + fixedStartIndex = 2) + checkReplicaDistribution(assignment, rackInfo, 5, 6, 4, + verifyRackAware = false, verifyLeaderDistribution = false, verifyReplicasDistribution = false) + } +} diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 7c2577c992aa..8910e096d849 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -33,20 +33,20 @@ import TestUtils._ import scala.collection.{Map, immutable} -class AdminTest extends ZooKeeperTestHarness with Logging { +class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { @Test def testReplicaAssignment() { - val brokerList = List(0, 1, 2, 3, 4) + val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None)) // test 0 replication factor intercept[AdminOperationException] { - AdminUtils.assignReplicasToBrokers(brokerList, 10, 0) + AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0) } // test wrong replication factor intercept[AdminOperationException] { - AdminUtils.assignReplicasToBrokers(brokerList, 10, 6) + AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6) } // correct assignment @@ -62,9 +62,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging { 8 -> List(3, 0, 1), 9 -> List(4, 1, 2)) - val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0) - val e = (expectedAssignment.toList == actualAssignment.toList) - assertTrue(expectedAssignment.toList == actualAssignment.toList) + val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0) + assertEquals(expectedAssignment, actualAssignment) } @Test @@ -314,7 +313,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging { val partition = 1 val preferredReplica = 0 // create brokers - val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) + val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2") + val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment) val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) @@ -452,4 +452,35 @@ class AdminTest extends 
ZooKeeperTestHarness with Logging { server.config.logDirs.foreach(CoreUtils.rm(_)) } } + + @Test + def testGetBrokerMetadatas() { + // broker 4 has no rack information + val brokerList = 0 to 5 + val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3") + val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet)) + TestUtils.createBrokersInZk(brokerMetadatas, zkUtils) + + val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled) + assertEquals(brokerList, processedMetadatas1.map(_.id)) + assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack)) + + val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe) + assertEquals(brokerList, processedMetadatas2.map(_.id)) + assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack)) + + intercept[AdminOperationException] { + AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced) + } + + val partialList = List(0, 1, 2, 3, 5) + val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, Some(partialList)) + assertEquals(partialList, processedMetadatas3.map(_.id)) + assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack)) + + val numPartitions = 3 + AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe) + val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")) + assertEquals(numPartitions, assignment.size) + } } diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala new file mode 100644 index 000000000000..facc7458333d --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.admin + +import scala.collection.{Map, Seq, mutable} +import org.junit.Assert._ + +trait RackAwareTest { + + def checkReplicaDistribution(assignment: Map[Int, Seq[Int]], + brokerRackMapping: Map[Int, String], + numBrokers: Int, + numPartitions: Int, + replicationFactor: Int, + verifyRackAware: Boolean = true, + verifyLeaderDistribution: Boolean = true, + verifyReplicasDistribution: Boolean = true) { + // always verify that no broker will be assigned for more than one replica + for ((_, brokerList) <- assignment) { + assertEquals("More than one replica is assigned to same broker for the same partition", brokerList.toSet.size, brokerList.size) + } + val distribution = getReplicaDistribution(assignment, brokerRackMapping) + + if (verifyRackAware) { + val partitionRackMap = distribution.partitionRacks + assertEquals("More than one replica of the same partition is assigned to the same rack", + List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size)) + } + + if (verifyLeaderDistribution) { + val leaderCount = distribution.brokerLeaderCount + val leaderCountPerBroker = numPartitions / numBrokers + assertEquals("Preferred leader count is not even for brokers", List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList) + } + + if (verifyReplicasDistribution) { + val replicasCount = distribution.brokerReplicasCount + val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers + assertEquals("Replica count is not even for broker", List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList) + } + } + + def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = { + val leaderCount = mutable.Map[Int, Int]() + val partitionCount = mutable.Map[Int, Int]() + val partitionRackMap = mutable.Map[Int, List[String]]() + assignment.foreach { case (partitionId, replicaList) => + val leader = replicaList.head + leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1 + for (brokerId <- replicaList) { + partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1 + val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`")) + partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List()) + } + } + ReplicaDistributions(partitionRackMap, leaderCount, partitionCount) + } + + def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): Seq[BrokerMetadata] = + rackMap.toSeq.map { case (brokerId, rack) => + BrokerMetadata(brokerId, Some(rack)) + } ++ brokersWithoutRack.map { brokerId => + BrokerMetadata(brokerId, None) + }.sortBy(_.id) + +} + +case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int]) diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala new file mode 100644 index 000000000000..0f71a19fcd5d --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.utils.{Logging, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import org.junit.Test + +class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest { + + @Test + def testRackAwareReassign() { + val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3") + TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils) + + val numPartitions = 18 + val replicationFactor = 3 + + // create a non rack aware assignment topic first + val createOpts = new kafka.admin.TopicCommand.TopicCommandOptions(Array( + "--partitions", numPartitions.toString, + "--replication-factor", replicationFactor.toString, + "--disable-rack-aware", + "--topic", "foo")) + kafka.admin.TopicCommand.createTopic(zkUtils, createOpts) + + val topicJson = """{"topics": [{"topic": "foo"}], "version":1}""" + val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils, + rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false) + + val assignment = proposedAssignment map { case (topicPartition, replicas) => + (topicPartition.partition, replicas) + } + checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor) + } + +} diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index d554b02809d8..b42aaf4d2ef9 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -27,7 +27,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils._ import kafka.coordinator.GroupCoordinator -class TopicCommandTest extends ZooKeeperTestHarness with Logging { +class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest { @Test def testConfigPreservationAcrossPartitionAlteration() { @@ -157,4 +157,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging { Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists")) TopicCommand.createTopic(zkUtils, createNotExistsOpts) } + + @Test + def testCreateAlterTopicWithRackAware() { + val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3") + TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils) + + val numPartitions = 18 + val replicationFactor = 3 + val createOpts = new TopicCommandOptions(Array( + "--partitions", numPartitions.toString, + "--replication-factor", replicationFactor.toString, + "--topic", "foo")) + TopicCommand.createTopic(zkUtils, createOpts) + + var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) => + tp.partition -> replicas + } + checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor) + + val 
alteredNumPartitions = 36 + // verify that adding partitions will also be rack aware + val alterOpts = new TopicCommandOptions(Array( + "--partitions", alteredNumPartitions.toString, + "--topic", "foo")) + TopicCommand.alterTopic(zkUtils, alterOpts) + assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) => + tp.partition -> replicas + } + checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor) + } } diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala index 905612cb801a..400d6d6d67a2 100644 --- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -27,20 +27,6 @@ import scala.collection.mutable class BrokerEndPointTest extends Logging { - @Test - def testSerDe() { - - val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) - val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint) - val origBroker = new Broker(1, listEndPoints) - val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes) - - origBroker.writeTo(brokerBytes) - - val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer]) - assert(origBroker == newBroker) - } - @Test def testHashAndEquals() { val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 7524e6a6f872..fa240d2efd1b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -530,7 +530,7 @@ class KafkaConfigTest { case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") case KafkaConfig.MetricReporterClassesProp => // ignore string - + case KafkaConfig.RackProp => // ignore string //SSL Configs case KafkaConfig.PrincipalBuilderClassProp => case KafkaConfig.SslProtocolProp => // ignore string diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 252308304d44..49fb85fb11c3 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -27,29 +27,27 @@ import java.security.cert.X509Certificate import javax.net.ssl.X509TrustManager import charset.Charset -import kafka.security.auth.{Resource, Authorizer, Acl} +import kafka.security.auth.{Acl, Authorizer, Resource} import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.test.TestSslUtils import scala.collection.mutable.{ArrayBuffer, ListBuffer} - import kafka.server._ import kafka.producer._ import kafka.message._ import kafka.api._ -import kafka.cluster.Broker -import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig} -import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} +import kafka.cluster.{Broker, EndPoint} +import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream} +import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils import kafka.producer.ProducerConfig import kafka.log._ import 
kafka.utils.ZkUtils._ - import org.junit.Assert._ import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer} +import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.network.Mode @@ -154,11 +152,12 @@ object TestUtils extends Logging { enablePlaintext: Boolean = true, enableSsl: Boolean = false, enableSaslPlaintext: Boolean = false, - enableSaslSsl: Boolean = false): Seq[Properties] = { + enableSaslSsl: Boolean = false, + rackInfo: Map[Int, String] = Map()): Seq[Properties] = { (0 until numConfigs).map { node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort, interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl, - enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl) + enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node)) } } @@ -180,7 +179,7 @@ object TestUtils extends Logging { enablePlaintext: Boolean = true, enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort, enableSsl: Boolean = false, sslPort: Int = RandomPort, - enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort) + enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None) : Properties = { def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol) @@ -210,6 +209,7 @@ object TestUtils extends Logging { props.put("delete.topic.enable", enableDeleteTopic.toString) props.put("controlled.shutdown.retry.backoff.ms", "100") props.put("log.cleaner.dedupe.buffer.size", "2097152") + rack.foreach(props.put("broker.rack", _)) if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")) @@ -591,9 +591,16 @@ object TestUtils extends Logging { } } - def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT)) - brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1)) + def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = + createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkUtils) + + def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], zkUtils: ZkUtils): Seq[Broker] = { + val brokers = brokerMetadatas.map { b => + val protocol = SecurityProtocol.PLAINTEXT + Broker(b.id, Map(protocol -> EndPoint("localhost", 6667, protocol)).toMap, b.rack) + } + brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1, + rack = b.rack, ApiVersion.latestVersion)) brokers } diff --git a/docs/upgrade.html b/docs/upgrade.html index 15ea3ae045cf..ba3d0248718a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -21,6 +21,11 @@

Upgrading from 0.8.x or 0.9.x to 0.10.0.0
0.10.0.0 has potential breaking changes (please review before upgrading) and there may be a performance impact during the upgrade. Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
+

+Notes to clients with version 0.9.0.0: Due to a bug introduced in 0.9.0.0,
+clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not
+work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 before brokers are upgraded to
+0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
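To make the JSON version issue concrete: as shown in the registerBrokerInZk change earlier in this patch, the broker picks the registration format from inter.broker.protocol.version, and only the v3 format carries the rack field. The sketch below is illustrative only and not part of the patch; the host, port, rack, and timestamp values in the comments are hypothetical.

// Illustrative sketch (not part of this patch): how the broker registration JSON version
// is chosen, mirroring ZkUtils.registerBrokerInZk. Concrete values below are hypothetical.
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}

object BrokerRegistrationVersionSketch extends App {
  def registrationJsonVersion(interBrokerProtocolVersion: ApiVersion): Int =
    if (interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 3 else 2

  // inter.broker.protocol.version=0.9.0 keeps the v2 znode that 0.9.0.0 ZooKeeper-based clients can parse:
  //   {"version":2,"host":"broker1","port":9092,"endpoints":["PLAINTEXT://broker1:9092"],
  //    "jmx_port":-1,"timestamp":"1456000000000"}
  println(registrationJsonVersion(ApiVersion("0.9.0")))   // 2

  // inter.broker.protocol.version=0.10.0 switches to the v3 znode, which adds "rack" when broker.rack is set:
  //   {"version":3,"host":"broker1","port":9092,"endpoints":["PLAINTEXT://broker1:9092"],
  //    "jmx_port":-1,"timestamp":"1456000000000","rack":"us-east-1a"}
  println(registrationJsonVersion(ApiVersion("0.10.0")))  // 3
}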

For a rolling upgrade: