From 404b696bea58aca17fbe528aed03cb3c94516c39 Mon Sep 17 00:00:00 2001
From: Joshi
Date: Sun, 29 May 2016 09:01:20 +0100
Subject: [PATCH] KAFKA-3765; Kafka Code style corrections

Removed explicit returns, not needed parentheses, corrected variables, removed unused imports
Using isEmpty/nonEmpty instead of size check, using head, flatmap instead of map-flatten

Author: Joshi
Author: Rekha Joshi

Reviewers: Ismael Juma

Closes #1442 from rekhajoshm/KAFKA-3765
---
 core/src/main/scala/kafka/admin/AclCommand.scala     |  2 +-
 core/src/main/scala/kafka/admin/AdminClient.scala    | 14 +++++++-------
 core/src/main/scala/kafka/admin/AdminUtils.scala     | 12 ++++++------
 .../kafka/admin/ReassignPartitionsCommand.scala      |  2 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala   |  4 ++--
 .../scala/kafka/admin/ZkSecurityMigrator.scala       | 14 ++++++--------
 .../kafka/api/ControlledShutdownRequest.scala        |  2 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala   |  4 ++--
 core/src/main/scala/kafka/log/Log.scala              |  4 ++--
 core/src/main/scala/kafka/log/LogCleaner.scala       |  4 ++--
 .../main/scala/kafka/log/LogCleanerManager.scala     |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala       |  5 ++---
 core/src/main/scala/kafka/log/OffsetIndex.scala      |  8 ++++++--
 .../main/scala/kafka/tools/ConsoleProducer.scala     |  2 +-
 .../main/scala/kafka/tools/GetOffsetShell.scala      |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala        |  6 +++---
 core/src/main/scala/kafka/tools/MirrorMaker.scala    |  2 +-
 17 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 966c4beee0eb..080f8097c728 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -117,7 +117,7 @@ object AclCommand {
 
       val resourceToAcls: Iterable[(Resource, Set[Acl])] =
         if (resources.isEmpty) authorizer.getAcls()
-        else resources.map(resource => (resource -> authorizer.getAcls(resource)))
+        else resources.map(resource => resource -> authorizer.getAcls(resource))
 
       for ((resource, acls) <- resourceToAcls)
         println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 8572cebe3a25..556a02b9d411 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -49,7 +49,7 @@ class AdminClient(val time: Time,
     client.poll(future)
 
     if (future.succeeded())
-      return future.value().responseBody()
+      future.value().responseBody()
     else
       throw future.exception()
   }
@@ -61,10 +61,10 @@ class AdminClient(val time: Time,
         return send(broker, api, request)
       } catch {
         case e: Exception =>
-          debug(s"Request ${api} failed against node ${broker}", e)
+          debug(s"Request $api failed against node $broker", e)
       }
     }
-    throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
+    throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
   }
 
   private def findCoordinator(groupId: String): Node = {
@@ -88,7 +88,7 @@ class AdminClient(val time: Time,
     val response = new MetadataResponse(responseBody)
     val errors = response.errors()
     if (!errors.isEmpty)
-      debug(s"Metadata request contained errors: ${errors}")
+      debug(s"Metadata request contained errors: $errors")
     response.cluster().nodes().asScala.toList
   }
 
@@ -100,7 +100,7 @@ class AdminClient(val time: Time,
           listGroups(broker)
         } catch {
          case e: Exception =>
- debug(s"Failed to find groups from broker ${broker}", e) + debug(s"Failed to find groups from broker $broker", e) List[GroupOverview]() } } @@ -127,7 +127,7 @@ class AdminClient(val time: Time, val response = new DescribeGroupsResponse(responseBody) val metadata = response.groups().get(groupId) if (metadata == null) - throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}") + throw new KafkaException(s"Response from broker contained no metadata for group $groupId") Errors.forCode(metadata.errorCode()).maybeThrow() val members = metadata.members().map { member => @@ -149,7 +149,7 @@ class AdminClient(val time: Time, return None if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) - throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group") + throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}' is not a valid consumer group") if (group.state == "Stable") { Some(group.members.map { member => diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index a8a282e2c13a..53b6dd72a0b6 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -250,7 +250,7 @@ object AdminUtils extends Logging { checkBrokerAvailable: Boolean = true, rackAwareMode: RackAwareMode = RackAwareMode.Enforced) { val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic)) - if (existingPartitionsReplicaList.size == 0) + if (existingPartitionsReplicaList.isEmpty) throw new AdminOperationException("The topic %s does not exist".format(topic)) val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match { @@ -274,8 +274,8 @@ object AdminUtils extends Logging { existingPartitionsReplicaList.size, checkBrokerAvailable) // check if manual assignment has the right replication factor - val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size)) - if (unmatchedRepFactorList.size != 0) + val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size) + if (unmatchedRepFactorList.nonEmpty) throw new AdminOperationException("The replication factor in manual replication assignment " + " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size) @@ -291,9 +291,9 @@ object AdminUtils extends Logging { val ret = new mutable.HashMap[Int, List[Int]]() var partitionId = startPartitionId partitionList = partitionList.takeRight(partitionList.size - partitionId) - for (i <- 0 until partitionList.size) { + for (i <- partitionList.indices) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) - if (brokerList.size <= 0) + if (brokerList.isEmpty) throw new AdminOperationException("replication factor must be larger than 0") if (brokerList.size != brokerList.toSet.size) throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList) @@ -443,7 +443,7 @@ object AdminUtils extends Logging { private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { try { val zkPath = getTopicPath(topic) - val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2))) + val jsonPartitionData = 
 
       if (!update) {
         info("Topic creation " + jsonPartitionData.toString)
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 1bf351a05e0a..fae0a4045209 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -113,7 +113,7 @@ object ReassignPartitionsCommand extends Logging {
       val (_, replicas) = assignment.head
       val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
       partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
-        (TopicAndPartition(topic, partition) -> replicas)
+        TopicAndPartition(topic, partition) -> replicas
       }
     }
 
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index e6ebb96c872b..c643a9df688d 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -118,7 +118,7 @@ object TopicCommand extends Logging {
   def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topics = getTopics(zkUtils, opts)
     val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
-    if (topics.length == 0 && !ifExists) {
+    if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
         opts.options.valueOf(opts.zkConnectOpt)))
     }
@@ -165,7 +165,7 @@ object TopicCommand extends Logging {
   def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topics = getTopics(zkUtils, opts)
     val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
-    if (topics.length == 0 && !ifExists) {
+    if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
         opts.options.valueOf(opts.zkConnectOpt)))
     }
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 20808796dada..a87e5b7c836b 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -18,20 +18,18 @@
 package kafka.admin
 
 import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.ThreadPoolExecutor
-import java.util.concurrent.TimeUnit
+
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.exception.ZkException
-import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
-import org.apache.log4j.Level
+import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
 import org.apache.zookeeper.data.Stat
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
+
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
-import scala.collection._
 import scala.collection.mutable.Queue
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -83,9 +81,9 @@ object ZkSecurityMigrator extends Logging {
     if (options.has(helpOpt))
       CommandLineUtils.printUsageAndDie(parser, usageMessage)
 
-    if ((jaasFile == null)) {
-     val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set " +
-       "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
+    if (jaasFile == null) {
+      val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " +
+        "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
      System.out.println("ERROR: %s".format(errorMsg))
      throw new IllegalArgumentException("Incorrect configuration")
     }
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index b875e3e0a362..42a17e67fca3 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -19,7 +19,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 
-import kafka.common.{TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index a164b4b96730..a454f2cbe2ae 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -83,7 +83,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
     this(file,
       channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
       start = 0,
-      end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
+      end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue,
       isSlice = false)
 
   /**
@@ -224,7 +224,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
       }
     }
 
-    if (sizeInBytes > 0 && newMessages.size == 0) {
+    if (sizeInBytes > 0 && newMessages.isEmpty) {
       // This indicates that the message is too large. We just return all the bytes in the file message set.
       this
     } else {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a7549dc134a7..62dc7a178840 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -35,7 +35,7 @@ import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.Utils
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -228,7 +228,7 @@ class Log(val dir: File,
       replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
     }
 
-    if(logSegments.size == 0) {
+    if(logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at offset 0
       segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c6636be09428..4c0db0d9d451 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -578,12 +578,12 @@ private[log] class Cleaner(val id: Int,
   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
     var grouped = List[List[LogSegment]]()
     var segs = segments.toList
-    while(!segs.isEmpty) {
+    while(segs.nonEmpty) {
       var group = List(segs.head)
       var logSize = segs.head.size
       var indexSize = segs.head.index.sizeInBytes
       segs = segs.tail
-      while(!segs.isEmpty &&
+      while(segs.nonEmpty &&
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
             segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f92db4ed844f..72757c083d44 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -100,7 +100,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
         LogToClean(topicAndPartition, log, firstDirtyOffset)
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
-      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
+      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
       // and must meet the minimum threshold for dirty byte ratio
       val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
       if(cleanableLogs.isEmpty) {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 749c6229a7d0..4357ef4c5d36 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -132,10 +132,9 @@ class LogManager(val logDirs: Array[File],
       try {
         recoveryPoints = this.recoveryPointCheckpoints(dir).read
       } catch {
-        case e: Exception => {
+        case e: Exception =>
           warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
           warn("Resetting the recovery checkpoint to 0")
-        }
       }
 
       val jobsForDir = for {
@@ -282,7 +281,7 @@ class LogManager(val logDirs: Array[File],
       // If the log does not exist, skip it
       if (log != null) {
         //May need to abort and pause the cleaning of the log, and resume after truncation is done.
-        val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
+        val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
         if (needToStopCleaner && cleaner != null)
           cleaner.abortAndPauseCleaning(topicAndPartition)
         log.truncateTo(truncateOffset)
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index ce35d6874c7c..f4327324e063 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,9 +24,11 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.locks._
+
 import kafka.utils._
 import kafka.utils.CoreUtils.inLock
 import kafka.common.InvalidOffsetException
+import sun.nio.ch.DirectBuffer
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -306,8 +308,10 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
   */
  private def forceUnmap(m: MappedByteBuffer) {
    try {
-     if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
-       (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
+     m match {
+       case buffer: DirectBuffer => buffer.cleaner().clean()
+       case _ =>
+     }
    } catch {
      case t: Throwable => warn("Error when freeing index buffer", t)
    }
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index e6476015f190..4cc7c2000bc1 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -311,7 +311,7 @@ object ConsoleProducer {
         line.indexOf(keySeparator) match {
           case -1 =>
             if (ignoreError) new ProducerRecord(topic, line.getBytes)
-            else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
+            else throw new KafkaException(s"No key found on line $lineNumber: $line")
           case n =>
             val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
             new ProducerRecord(topic, line.substring(0, n).getBytes, value)
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 30c7afe7edca..f7207eca2fc8 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -77,7 +77,7 @@ object GetOffsetShell {
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
        "kafka-list-topic.sh to verify")
      System.exit(1)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index bd7ca0e85365..8112f9ea66ef 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -89,7 +89,7 @@ object JmxTool extends Logging {
     else
       List(null)
 
-    val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
+    val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       attributesWhitelistExists match {
@@ -101,7 +101,7 @@ object JmxTool extends Logging {
 
     // print csv header
     val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
-    if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
+    if(keys.size == numExpectedAttributes.values.sum + 1)
       println(keys.map("\"" + _ + "\"").mkString(","))
 
     while(true) {
@@ -111,7 +111,7 @@ object JmxTool extends Logging {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
+      if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
         println(keys.map(attributes(_)).mkString(","))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 87f3cc53bafb..9d5f7e6040c4 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -494,7 +494,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       // Creating one stream per each connector instance
       val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
       require(streams.size == 1)
-      val stream = streams(0)
+      val stream = streams.head
       iter = stream.iterator()
     }
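
For quick reference, the standalone sketch below (not part of the patch; the object and value names are made up for illustration) contrasts the Scala idioms the commit message describes with the forms they replace.

```scala
object StyleIdiomsSketch extends App {
  val brokers = List(1, 2, 3)

  // isEmpty/nonEmpty instead of comparing size/length to zero
  assert(brokers.nonEmpty)                        // rather than brokers.size != 0
  assert(List.empty[Int].isEmpty)                 // rather than xs.length == 0

  // head instead of indexing with (0)
  val first = brokers.head                        // rather than brokers(0)

  // flatMap instead of map(...).flatten
  val doubled = brokers.flatMap(b => Seq(b, b))   // rather than brokers.map(b => Seq(b, b)).flatten

  // the last expression is the result: no explicit return, no extra parentheses
  def firstOrZero(xs: List[Int]): Int =
    if (xs.isEmpty) 0 else xs.head

  println(s"$first $doubled ${firstOrZero(brokers)}")
}
```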