Skip to content

Commit

Permalink
Standardize Zk data structures for Re-assign partitions and Preferred…
Browse files Browse the repository at this point in the history
… replication election; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-779
  • Loading branch information
Swapnil Ghike authored and junrao committed Mar 6, 2013
1 parent 2457bc4 commit eae1bd5
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.common.{TopicAndPartition, AdminCommandFailedException}
import collection._
import mutable.ListBuffer

object PreferredReplicaLeaderElectionCommand extends Logging {

def main(args: Array[String]): Unit = {
val parser = new OptionParser
val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " +
"for which preferred replica leader election should be done, in the following format - \n" +
"[{\"topic\": \"foo\", \"partition\": \"1\"}, {\"topic\": \"foobar\", \"partition\": \"2\"}]. \n" +
"{\"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" +
"Defaults to all existing partitions")
.withRequiredArg
.describedAs("list of partitions for which preferred replica leader election needs to be triggered")
Expand Down Expand Up @@ -67,14 +69,18 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}
}

def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = {
def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
Json.parseFull(jsonString) match {
case Some(partitionList) =>
val partitions = (partitionList.asInstanceOf[List[Any]])
Set.empty[TopicAndPartition] ++ partitions.map { m =>
val topic = m.asInstanceOf[Map[String, String]].get("topic").get
val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
TopicAndPartition(topic, partition)
case Some(m) =>
m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(partitionsList) =>
val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
partitions.map { p =>
val topic = p.get("topic").get.asInstanceOf[String]
val partition = p.get("partition").get.asInstanceOf[Int]
TopicAndPartition(topic, partition)
}.toSet
case None => throw new AdministrationException("Preferred replica election data is empty")
}
case None => throw new AdministrationException("Preferred replica election data is empty")
}
Expand All @@ -83,9 +89,13 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
def writePreferredReplicaElectionData(zkClient: ZkClient,
partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
val jsonData = Utils.seqToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
Utils.mapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)), valueInQuotes = true)
}.toSeq.sorted, valueInQuotes = false)
var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
for (p <- partitionsUndergoingPreferredReplicaElection) {
partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++
Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false))
}
val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false)
val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false)
try {
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
info("Created preferred replica election path with %s".format(jsonData))
Expand Down
23 changes: 7 additions & 16 deletions core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object ReassignPartitionsCommand extends Logging {
val parser = new OptionParser
val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
"new replicas they should be reassigned to in the following format - \n" +
"[{\"topic\": \"foo\", \"partition\": \"1\", \"replicas\": \"1,2,3\" }]")
"{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }]\n}")
.withRequiredArg
.describedAs("partition reassignment json file path")
.ofType(classOf[String])
Expand All @@ -55,18 +55,9 @@ object ReassignPartitionsCommand extends Logging {

try {
// read the json file into a string
val partitionsToBeReassigned = Json.parseFull(jsonString) match {
case Some(reassignedPartitions) =>
val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
partitions.map { m =>
val topic = m.asInstanceOf[Map[String, String]].get("topic").get
val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
val newReplicas = replicasList.split(",").map(_.toInt)
(TopicAndPartition(topic, partition), newReplicas.toSeq)
}.toMap
case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
}
val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
if (partitionsToBeReassigned.isEmpty)
throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))

zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
Expand All @@ -86,15 +77,15 @@ object ReassignPartitionsCommand extends Logging {
}
}

class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[TopicAndPartition, Seq[Int]])
class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
extends Logging {
def reassignPartitions(): Boolean = {
try {
val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
val jsonReassignmentData = Utils.mapWithSeqValuesToJson(validPartitions.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
true
}catch {
} catch {
case ze: ZkNodeExistsException =>
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
throw new AdminCommandFailedException("Partition reassignment currently in " +
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/api/ProducerRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.api

import java.nio._
import kafka.message._
import scala.collection.Map
import kafka.api.ApiUtils._
import kafka.common._
import kafka.network.RequestChannel.Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
info("begin registering consumer " + consumerIdString + " in ZK")
val consumerRegistrationInfo =
Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false),
Utils.mapToJson(Map("pattern" -> topicCount.pattern), valueInQuotes = true)))
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
info("end registering consumer " + consumerIdString + " in ZK")
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/tools/KafkaMigrationTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;


Expand Down
71 changes: 26 additions & 45 deletions core/src/main/scala/kafka/utils/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -465,27 +465,42 @@ object Utils extends Logging {
def nullOrEmpty(s: String): Boolean = s == null || s.equals("")

/**
* Format a Map[String, String] as JSON object.
* Merge JSON fields of the format "key" : value/object/array.
*/
def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
def mergeJsonFields(objects: Seq[String]): String = {
val builder = new StringBuilder
builder.append("{ ")
var numElements = 0
for ( (key, value) <- jsonDataMap.toList.sorted) {
if (numElements > 0)
builder.append(", ")
builder.append(objects.sorted.map(_.trim).mkString(", "))
builder.append(" }")
builder.toString
}

/**
* Format a Map[String, String] as JSON object.
*/
def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = {
val jsonFields: mutable.ListBuffer[String] = ListBuffer()
val builder = new StringBuilder
for ((key, value) <- jsonDataMap.toList.sorted) {
builder.append("\"" + key + "\":")
if (valueInQuotes)
builder.append("\"" + value + "\"")
else
builder.append(value)
numElements += 1
jsonFields += builder.toString
builder.clear()
}
builder.append(" }")
builder.toString
jsonFields
}

/**
* Format a Map[String, String] as JSON object.
*/
def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes))
}

/**
* Format a Seq[String] as JSON array.
*/
def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = {
Expand All @@ -504,44 +519,10 @@ object Utils extends Logging {
*/

def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = {
val builder = new StringBuilder
builder.append("{ ")
var numElements = 0
for ((key, value) <- jsonDataMap.toList.sortBy(_._1)) {
if (numElements > 0)
builder.append(", ")
builder.append("\"" + key + "\": ")
builder.append(Utils.seqToJson(value.map(_.toString), valueInQuotes = false))
numElements += 1
}
builder.append(" }")
builder.toString
}


/**
* Merge arbitrary JSON objects.
*/
def mergeJsonObjects(objects: Seq[String]): String = {
val builder = new StringBuilder
builder.append("{ ")
var obs = List[String]()
objects.foreach(ob => obs = obs ::: getJsonContents(ob).split(',').toList)
obs = obs.sorted.map(_.trim)
builder.append(obs.mkString(", "))
builder.append(" }")
builder.toString
mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))),
valueInQuotes = false))
}

/**
* Get the contents of a JSON object or array.
*/
def getJsonContents(str: String): String = {
str.trim().substring(1, str.length - 1)
}



/**
* Create a circular (looping) iterator over a collection.
* @param coll An iterable over the underlying collection.
Expand Down
51 changes: 34 additions & 17 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
import org.I0Itec.zkclient.serialize.ZkSerializer
import collection._
import kafka.api.LeaderAndIsr
import mutable.ListBuffer
import org.apache.zookeeper.data.Stat
import java.util.concurrent.locks.{ReentrantLock, Condition}
import kafka.admin._
Expand Down Expand Up @@ -183,8 +184,9 @@ object ZkUtils extends Logging {
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val brokerInfo =
Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("host" -> host), valueInQuotes = true),
Utils.mapToJson(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString), valueInQuotes = false)))
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString),
valueInQuotes = false))
try {
createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
} catch {
Expand All @@ -209,7 +211,7 @@ object ZkUtils extends Logging {
* Get JSON partition to replica map from zookeeper.
*/
def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = {
val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map.map(e => (e._1.toString -> e._2)))
val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map)
Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false)
}

Expand Down Expand Up @@ -559,26 +561,41 @@ object ZkUtils extends Logging {
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
reassignedPartitions.map { p =>
val newReplicas = p._2
(p._1 -> new ReassignedPartitionsContext(newReplicas))
}
reassignedPartitions.map(p => (p._1 -> new ReassignedPartitionsContext(p._2)))
case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
}
}

def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = {
def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map()
Json.parseFull(jsonData) match {
case Some(m) =>
val replicaMap = m.asInstanceOf[Map[String, Seq[Int]]]
replicaMap.map { reassignedPartitions =>
val topic = reassignedPartitions._1.split(",").head.trim
val partition = reassignedPartitions._1.split(",").last.trim.toInt
val newReplicas = reassignedPartitions._2
TopicAndPartition(topic, partition) -> newReplicas
m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(partitionsSeq) =>
partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => {
val topic = p.get("topic").get.asInstanceOf[String]
val partition = p.get("partition").get.asInstanceOf[Int]
val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas
})
case None =>
}
case None => Map.empty[TopicAndPartition, Seq[Int]]
case None =>
}
reassignedPartitions
}

def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]()
for (p <- partitionsToBeReassigned) {
val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false)
val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true)
val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData),
valueInQuotes = false)
jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData)
}
Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)),
valueInQuotes = false)
}

def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
Expand All @@ -588,11 +605,11 @@ object ZkUtils extends Logging {
deletePath(zkClient, zkPath)
info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
case _ =>
val jsonData = Utils.mapWithSeqValuesToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned)
try {
updatePersistentPath(zkClient, zkPath, jsonData)
info("Updated partition reassignment path with %s".format(jsonData))
}catch {
} catch {
case nne: ZkNoNodeException =>
ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/unit/kafka/utils/UtilsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Assert._
import kafka.common.KafkaException
import org.junit.{Test}
import kafka.tools.KafkaMigrationTool
import org.junit.Test


class UtilsTest extends JUnitSuite {
Expand Down

0 comments on commit eae1bd5

Please sign in to comment.