From 0b11c453717dbd095025f289931ac3352f03bce6 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Wed, 4 May 2016 10:00:13 -0700 Subject: [PATCH] WIP --- core/src/main/scala/kafka/Kafka.scala | 10 +-- .../main/scala/kafka/admin/AclCommand.scala | 2 +- .../main/scala/kafka/admin/AdminClient.scala | 11 ++- .../main/scala/kafka/admin/AdminUtils.scala | 14 ++-- .../scala/kafka/admin/ConfigCommand.scala | 8 +-- .../kafka/admin/ConsumerGroupCommand.scala | 12 ++-- .../main/scala/kafka/admin/TopicCommand.scala | 13 ++-- .../kafka/admin/ZkSecurityMigrator.scala | 68 +++++++++---------- 8 files changed, 65 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 6b551ceb2b027..3129bdb6164c6 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -35,7 +35,7 @@ object Kafka extends Logging { .ofType(classOf[String]) if (args.length == 0) { - CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName())) + CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName)) } val props = Utils.loadProps(args(0)) @@ -58,14 +58,14 @@ object Kafka extends Logging { val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) // attach shutdown handler to catch control-c - Runtime.getRuntime().addShutdownHook(new Thread() { + Runtime.getRuntime.addShutdownHook(new Thread() { override def run() = { - kafkaServerStartable.shutdown + kafkaServerStartable.shutdown() } }) - kafkaServerStartable.startup - kafkaServerStartable.awaitShutdown + kafkaServerStartable.startup() + kafkaServerStartable.awaitShutdown() } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 01e95ca45b351..da94449f6d162 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -117,7 +117,7 @@ object AclCommand { val resourceToAcls: Iterable[(Resource, Set[Acl])] = if (resources.isEmpty) authorizer.getAcls() - else resources.map(resource => (resource -> authorizer.getAcls(resource))) + else resources.map(resource => resource -> authorizer.getAcls(resource)) for ((resource, acls) <- resourceToAcls) println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index ef76ffc40ee1e..ed0f136f5b1b0 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -19,11 +19,10 @@ import java.util.concurrent.atomic.AtomicInteger import kafka.common.KafkaException import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary} import kafka.utils.Logging -import org.apache.kafka.clients._ +import org.apache.kafka.clients.{ClientResponse, CommonClientConfigs, ClientUtils, NetworkClient, Metadata} import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture} import org.apache.kafka.common.config.ConfigDef.{Importance, Type} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} -import org.apache.kafka.common.errors.DisconnectException import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.Selector import org.apache.kafka.common.protocol.types.Struct @@ -49,7 +48,7 @@ class AdminClient(val time: Time, client.poll(future) if (future.succeeded()) - return future.value().responseBody() + future.value().responseBody() else throw future.exception() } @@ -93,7 +92,7 @@ class AdminClient(val time: Time, } def listAllGroups(): Map[Node, List[GroupOverview]] = { - findAllBrokers.map { + findAllBrokers().map { case broker => broker -> { try { @@ -114,11 +113,11 @@ class AdminClient(val time: Time, } def listAllGroupsFlattened(): List[GroupOverview] = { - listAllGroups.values.flatten.toList + listAllGroups().values.flatten.toList } def listAllConsumerGroupsFlattened(): List[GroupOverview] = { - listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE) + listAllGroupsFlattened().filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE) } def describeGroup(groupId: String): GroupSummary = { diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index a8a282e2c13a1..670c63d69bf4a 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -250,7 +250,7 @@ object AdminUtils extends Logging { checkBrokerAvailable: Boolean = true, rackAwareMode: RackAwareMode = RackAwareMode.Enforced) { val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic)) - if (existingPartitionsReplicaList.size == 0) + if (existingPartitionsReplicaList.isEmpty) throw new AdminOperationException("The topic %s does not exist".format(topic)) val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match { @@ -274,8 +274,8 @@ object AdminUtils extends Logging { existingPartitionsReplicaList.size, checkBrokerAvailable) // check if manual assignment has the right replication factor - val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size)) - if (unmatchedRepFactorList.size != 0) + val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size) + if (unmatchedRepFactorList.nonEmpty) throw new AdminOperationException("The replication factor in manual replication assignment " + " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size) @@ -290,12 +290,12 @@ object AdminUtils extends Logging { var partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() var partitionId = startPartitionId - partitionList = partitionList.takeRight(partitionList.size - partitionId) - for (i <- 0 until partitionList.size) { + partitionList = partitionList.takeRight(partitionList.length - partitionId) + for (i <- partitionList.indices) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - if (brokerList.size <= 0) + if (brokerList.length <= 0) throw new AdminOperationException("replication factor must be larger than 0") - if (brokerList.size != brokerList.toSet.size) + if (brokerList.length != brokerList.toSet.size) throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList) if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList)) throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString + diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index eaddd84e538d9..aad927768f6b1 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -48,7 +48,7 @@ object ConfigCommand { val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, - JaasUtils.isZkSecurityEnabled()) + JaasUtils.isZkSecurityEnabled) try { if (opts.options.has(opts.alterOpt)) @@ -57,7 +57,7 @@ object ConfigCommand { describeConfig(zkUtils, opts) } catch { case e: Throwable => - println("Error while executing topic command " + e.getMessage) + println(s"Error while executing topic command ${e.getMessage}") println(Utils.stackTrace(e)) } finally { zkUtils.close() @@ -93,7 +93,7 @@ object ConfigCommand { if (maxMessageBytes > Defaults.MaxMessageSize){ error(TopicCommand.longMessageSizeWarning(maxMessageBytes)) if (!force) - TopicCommand.askToProceed + TopicCommand.askToProceed() } } @@ -172,7 +172,7 @@ object ConfigCommand { def checkArgs() { // should have exactly one action - val actions = Seq(alterOpt, describeOpt).count(options.has _) + val actions = Seq(alterOpt, describeOpt).count(options.has) if(actions != 1) CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter") diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 414e7baee449d..a826c3e0947ea 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -47,7 +47,7 @@ object ConsumerGroupCommand { CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.") // should have exactly one action - val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) + val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has) if (actions != 1) CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete") @@ -305,7 +305,7 @@ object ConsumerGroupCommand { private val adminClient = createAdminClient() // `consumer` is only needed for `describe`, so we instantiate it lazily - private var consumer: KafkaConsumer[String, String] = null + private lazy val consumer: KafkaConsumer[String, String] = createNewConsumer() def list() { adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId)) @@ -316,7 +316,7 @@ object ConsumerGroupCommand { if (consumerSummaries.isEmpty) println(s"Consumer group `${group}` does not exist or is rebalancing.") else { - val consumer = getConsumer() + val consumer = getConsumer printDescribeHeader() consumerSummaries.foreach { consumerSummary => val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition)) @@ -332,7 +332,7 @@ object ConsumerGroupCommand { } protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { - val consumer = getConsumer() + val consumer = getConsumer val topicPartition = new TopicPartition(topic, partition) consumer.assign(List(topicPartition).asJava) consumer.seekToEnd(List(topicPartition).asJava) @@ -351,9 +351,7 @@ object ConsumerGroupCommand { AdminClient.create(props) } - private def getConsumer() = { - if (consumer == null) - consumer = createNewConsumer() + private def getConsumer = { consumer } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 029adea1b58c8..9c86cc323eaa6 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -18,20 +18,21 @@ package kafka.admin import java.util.Properties + import joptsimple._ import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException} import kafka.consumer.{ConsumerConfig, Whitelist} -import kafka.coordinator.GroupCoordinator import kafka.log.{Defaults, LogConfig} import kafka.server.ConfigType import kafka.utils.ZkUtils._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException +import org.apache.kafka.common.internals.TopicConstants import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils + import scala.collection.JavaConversions._ import scala.collection._ -import org.apache.kafka.common.internals.TopicConstants object TopicCommand extends Logging { @@ -44,7 +45,7 @@ object TopicCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.") // should have exactly one action - val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) + val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has) if(actions != 1) CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") @@ -53,7 +54,7 @@ object TopicCommand extends Logging { val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, - JaasUtils.isZkSecurityEnabled()) + JaasUtils.isZkSecurityEnabled) var exitCode = 0 try { if(opts.options.has(opts.createOpt)) @@ -366,13 +367,13 @@ object TopicCommand extends Logging { if (replicas > 1) { error(longMessageSizeWarning(maxMessageBytes)) if (!force) - askToProceed + askToProceed() } else warn(shortMessageSizeWarning(maxMessageBytes)) } - def askToProceed: Unit = { + def askToProceed(): Unit = { println("Are you sure you want to continue? [y/n]") if (!Console.readLine().equalsIgnoreCase("y")) { println("Ending your session") diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 20808796dada9..d65080157a825 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -17,41 +17,37 @@ package kafka.admin -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadPoolExecutor -import java.util.concurrent.TimeUnit import joptsimple.OptionParser +import kafka.utils.{CommandLineUtils, Logging, ZkUtils} import org.I0Itec.zkclient.exception.ZkException -import kafka.utils.{Logging, ZkUtils, CommandLineUtils} -import org.apache.log4j.Level import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback} -import org.apache.zookeeper.data.Stat import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.data.Stat + import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.collection._ import scala.collection.mutable.Queue -import scala.concurrent._ +import scala.concurrent.{Future, Promise, Await} import scala.concurrent.duration._ /** - * This tool is to be used when making access to ZooKeeper authenticated or + * This tool is to be used when making access to ZooKeeper authenticated or * the other way around, when removing authenticated access. The exact steps * to migrate a Kafka cluster from unsecure to secure with respect to ZooKeeper * access are the following: - * + * * 1- Perform a rolling upgrade of Kafka servers, setting zookeeper.set.acl to false - * and passing a valid JAAS login file via the system property + * and passing a valid JAAS login file via the system property * java.security.auth.login.config * 2- Perform a second rolling upgrade keeping the system property for the login file * and now setting zookeeper.set.acl to true - * 3- Finally run this tool. There is a script under ./bin. Run + * 3- Finally run this tool. There is a script under ./bin. Run * ./bin/zookeeper-security-migration --help * to see the configuration parameters. An example of running it is the following: * ./bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connection=localhost:2181 - * + * * To convert a cluster from secure to unsecure, we need to perform the following * steps: * 1- Perform a rolling upgrade setting zookeeper.set.acl to false for each server @@ -62,13 +58,13 @@ import scala.concurrent.duration._ object ZkSecurityMigrator extends Logging { val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of " - + "znodes as part of the process of setting up ZooKeeper " + + "nodes as part of the process of setting up ZooKeeper " + "authentication.") def run(args: Array[String]) { - var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) val parser = new OptionParser() - val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure." + val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka Znodes in ZooKeeper secure or unsecure." + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String]) val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " + "takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181"). @@ -83,17 +79,17 @@ object ZkSecurityMigrator extends Logging { if (options.has(helpOpt)) CommandLineUtils.printUsageAndDie(parser, usageMessage) - if ((jaasFile == null)) { - val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set " + - "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)) - System.out.println("ERROR: %s".format(errorMsg)) + if (jaasFile == null) { + val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " + + "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + println(s"ERROR: $errorMsg") throw new IllegalArgumentException("Incorrect configuration") } - if (!JaasUtils.isZkSecurityEnabled()) { + if (!JaasUtils.isZkSecurityEnabled) { val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile) - System.out.println("ERROR: %s".format(errorMsg)) - throw new IllegalArgumentException("Incorrect configuration") + println(s"ERROR: $errorMsg") + throw new IllegalArgumentException("Incorrect configuration") } val zkAcl: Boolean = options.valueOf(zkAclOpt) match { @@ -118,14 +114,12 @@ object ZkSecurityMigrator extends Logging { try { run(args) } catch { - case e: Exception => - e.printStackTrace() + case e: Exception => e.printStackTrace() } } } class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { - private val workQueue = new LinkedBlockingQueue[Runnable] private val futures = new Queue[Future[String]] private def setAcl(path: String, setPromise: Promise[String]) = { @@ -177,15 +171,15 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { case Code.CONNECTIONLOSS => zkHandle.getChildren(path, false, GetChildrenCallback, ctx) case Code.NONODE => - warn("Node is gone, it could be have been legitimately deleted: %s".format(path)) + warn(s"Node is gone, it could be have been legitimately deleted: $path") promise success "done" case Code.SESSIONEXPIRED => // Starting a new session isn't really a problem, but it'd complicate // the logic of the tool, so we quit and let the user re-run it. - System.out.println("ZooKeeper session expired while changing ACLs") + println("ZooKeeper session expired while changing ACLs") promise failure ZkException.create(KeeperException.create(Code.get(rc))) case _ => - System.out.println("Unexpected return code: %d".format(rc)) + println(s"Unexpected return code: $rc") promise failure ZkException.create(KeeperException.create(Code.get(rc))) } } @@ -211,10 +205,10 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { case Code.SESSIONEXPIRED => // Starting a new session isn't really a problem, but it'd complicate // the logic of the tool, so we quit and let the user re-run it. - System.out.println("ZooKeeper session expired while changing ACLs") + println("ZooKeeper session expired while changing ACLs") promise failure ZkException.create(KeeperException.create(Code.get(rc))) case _ => - System.out.println("Unexpected return code: %d".format(rc)) + println("Unexpected return code: %d".format(rc)) promise failure ZkException.create(KeeperException.create(Code.get(rc))) } } @@ -228,24 +222,24 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { zkUtils.makeSurePersistentPathExists(path) setAclsRecursively(path) } - + @tailrec def recurse(): Unit = { - val future = futures.synchronized { + val future = futures.synchronized { futures.headOption } future match { case Some(a) => - Await.result(a, 6000 millis) - futures.synchronized { futures.dequeue } - recurse + Await.result(a, 6000.millis) + futures.synchronized {futures.dequeue } + recurse() case None => } } recurse() } finally { - zkUtils.close + zkUtils.close() } } }