KAFKA-3765; Kafka Code style corrections
Removed explicit returns and unneeded parentheses, corrected variables, and removed unused imports.
Using isEmpty/nonEmpty instead of size checks, using head, and flatMap instead of map-flatten.

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1442 from rekhajoshm/KAFKA-3765
rekhajoshm authored and ijuma committed May 29, 2016
1 parent 0aff450 commit 404b696
Showing 17 changed files with 45 additions and 44 deletions.
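
A quick reference for the patterns named in the commit message, as a standalone Scala sketch (hypothetical code, not taken from the Kafka tree): prefer isEmpty/nonEmpty to comparing size with 0, take the first element with head, and let the final expression of a method serve as its result instead of an explicit return.

    object StyleSketch {
      // summarize is a made-up helper used only to illustrate the idioms.
      def summarize(topics: Seq[String]): String =
        if (topics.isEmpty)                    // before: topics.size == 0 (or topics.length == 0)
          "no topics"
        else
          s"first topic: ${topics.head}"       // before: topics(0); no `return` needed, the if/else is the result
    }
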
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AclCommand.scala
@@ -117,7 +117,7 @@ object AclCommand {

val resourceToAcls: Iterable[(Resource, Set[Acl])] =
if (resources.isEmpty) authorizer.getAcls()
else resources.map(resource => (resource -> authorizer.getAcls(resource)))
else resources.map(resource => resource -> authorizer.getAcls(resource))

for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/admin/AdminClient.scala
@@ -49,7 +49,7 @@ class AdminClient(val time: Time,
client.poll(future)

if (future.succeeded())
return future.value().responseBody()
future.value().responseBody()
else
throw future.exception()
}
@@ -61,10 +61,10 @@ class AdminClient(val time: Time,
return send(broker, api, request)
} catch {
case e: Exception =>
debug(s"Request ${api} failed against node ${broker}", e)
debug(s"Request $api failed against node $broker", e)
}
}
throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
}

private def findCoordinator(groupId: String): Node = {
@@ -88,7 +88,7 @@ class AdminClient(val time: Time,
val response = new MetadataResponse(responseBody)
val errors = response.errors()
if (!errors.isEmpty)
debug(s"Metadata request contained errors: ${errors}")
debug(s"Metadata request contained errors: $errors")
response.cluster().nodes().asScala.toList
}

@@ -100,7 +100,7 @@ class AdminClient(val time: Time,
listGroups(broker)
} catch {
case e: Exception =>
debug(s"Failed to find groups from broker ${broker}", e)
debug(s"Failed to find groups from broker $broker", e)
List[GroupOverview]()
}
}
@@ -127,7 +127,7 @@ class AdminClient(val time: Time,
val response = new DescribeGroupsResponse(responseBody)
val metadata = response.groups().get(groupId)
if (metadata == null)
throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
throw new KafkaException(s"Response from broker contained no metadata for group $groupId")

Errors.forCode(metadata.errorCode()).maybeThrow()
val members = metadata.members().map { member =>
@@ -149,7 +149,7 @@ class AdminClient(val time: Time,
return None

if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}' is not a valid consumer group")

if (group.state == "Stable") {
Some(group.members.map { member =>
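
A side note on the first AdminClient hunk above: in Scala the last expression of each branch is already the method's value, so the explicit return before future.value().responseBody() was redundant. A minimal sketch of the same shape, using made-up names rather than AdminClient's request plumbing:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    final case class Reply(body: String)       // hypothetical stand-in for a response body

    def awaitReply(pending: Future[Reply]): String = {
      val result = Await.ready(pending, 10.seconds).value.get
      if (result.isSuccess)
        result.get.body                        // before: return result.get.body
      else
        throw result.failed.get
    }

The remaining AdminClient changes are interpolation cleanups: for a bare identifier, ${api} and $api render the same, so the braces can go.
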
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -250,7 +250,7 @@ object AdminUtils extends Logging {
checkBrokerAvailable: Boolean = true,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
if (existingPartitionsReplicaList.size == 0)
if (existingPartitionsReplicaList.isEmpty)
throw new AdminOperationException("The topic %s does not exist".format(topic))

val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
@@ -274,8 +274,8 @@ object AdminUtils extends Logging {
existingPartitionsReplicaList.size, checkBrokerAvailable)

// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
if (unmatchedRepFactorList.size != 0)
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingReplicaListForPartitionZero.size)
if (unmatchedRepFactorList.nonEmpty)
throw new AdminOperationException("The replication factor in manual replication assignment " +
" is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)

@@ -291,9 +291,9 @@ object AdminUtils extends Logging {
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
partitionList = partitionList.takeRight(partitionList.size - partitionId)
for (i <- 0 until partitionList.size) {
for (i <- partitionList.indices) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
if (brokerList.isEmpty)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
@@ -443,7 +443,7 @@ object AdminUtils extends Logging {
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = getTopicPath(topic)
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))

if (!update) {
info("Topic creation " + jsonPartitionData.toString)
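
Two smaller idioms from the AdminUtils hunks, sketched with made-up input (this is not the AdminUtils code itself): iterate a list with .indices instead of 0 until list.size, and write a pair as a -> b without wrapping parentheses.

    val replicaSpecs = List("1:2:3", "4:5:6")                   // hypothetical manual assignment input

    val assignment: Map[Int, Seq[Int]] =
      (for (i <- replicaSpecs.indices) yield {                  // before: for (i <- 0 until replicaSpecs.size)
        val brokers = replicaSpecs(i).split(":").map(_.trim.toInt).toSeq
        require(brokers.nonEmpty, "replication factor must be larger than 0")   // before: brokers.size <= 0
        i -> brokers                                            // before: (i -> brokers)
      }).toMap
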
@@ -113,7 +113,7 @@ object ReassignPartitionsCommand extends Logging {
val (_, replicas) = assignment.head
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
(TopicAndPartition(topic, partition) -> replicas)
TopicAndPartition(topic, partition) -> replicas
}
}

4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -118,7 +118,7 @@ object TopicCommand extends Logging {
def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
if (topics.length == 0 && !ifExists) {
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
@@ -165,7 +165,7 @@ object TopicCommand extends Logging {
def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
if (topics.length == 0 && !ifExists) {
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
14 changes: 6 additions & 8 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -18,20 +18,18 @@
package kafka.admin

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import joptsimple.OptionParser
import org.I0Itec.zkclient.exception.ZkException
import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
import org.apache.log4j.Level
import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection._
import scala.collection.mutable.Queue
import scala.concurrent._
import scala.concurrent.duration._
@@ -83,9 +81,9 @@ object ZkSecurityMigrator extends Logging {
if (options.has(helpOpt))
CommandLineUtils.printUsageAndDie(parser, usageMessage)

if ((jaasFile == null)) {
val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set " +
"the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
if (jaasFile == null) {
val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " +
"the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
@@ -19,7 +19,7 @@ package kafka.api

import java.nio.ByteBuffer

import kafka.common.{TopicAndPartition}
import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
import kafka.network.{RequestOrResponseSend, RequestChannel}
import kafka.network.RequestChannel.Response
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -83,7 +83,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
this(file,
channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
start = 0,
end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue,
isSlice = false)

/**
@@ -224,7 +224,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
}

if (sizeInBytes > 0 && newMessages.size == 0) {
if (sizeInBytes > 0 && newMessages.isEmpty) {
// This indicates that the message is too large. We just return all the bytes in the file message set.
this
} else {
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
@@ -35,7 +35,7 @@ import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.Utils

object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}

/**
@@ -228,7 +228,7 @@ class Log(val dir: File,
replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
}

if(logSegments.size == 0) {
if(logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at offset 0
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
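
The LogAppendInfo change above names the trailing boolean argument so the call site documents itself. The same idea with a made-up case class (not Kafka's LogAppendInfo):

    final case class AppendInfo(firstOffset: Long, lastOffset: Long, validBytes: Int, offsetsMonotonic: Boolean)

    // before: AppendInfo(-1L, -1L, -1, false); a bare `false` is easy to misread at the call site
    val unknownAppendInfo = AppendInfo(-1L, -1L, -1, offsetsMonotonic = false)
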
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogCleaner.scala
@@ -578,12 +578,12 @@ private[log] class Cleaner(val id: Int,
private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
var grouped = List[List[LogSegment]]()
var segs = segments.toList
while(!segs.isEmpty) {
while(segs.nonEmpty) {
var group = List(segs.head)
var logSize = segs.head.size
var indexSize = segs.head.index.sizeInBytes
segs = segs.tail
while(!segs.isEmpty &&
while(segs.nonEmpty &&
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -100,7 +100,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
LogToClean(topicAndPartition, log, firstDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs

this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
// and must meet the minimum threshold for dirty byte ratio
val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
if(cleanableLogs.isEmpty) {
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -132,10 +132,9 @@ class LogManager(val logDirs: Array[File],
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception => {
case e: Exception =>
warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}
}

val jobsForDir = for {
@@ -282,7 +281,7 @@ class LogManager(val logDirs: Array[File],
// If the log does not exist, skip it
if (log != null) {
//May need to abort and pause the cleaning of the log, and resume after truncation is done.
val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
if (needToStopCleaner && cleaner != null)
cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateTo(truncateOffset)
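
The LogManager hunks drop a brace pair around a single catch case and the parentheses around a boolean expression; neither changes behavior. A compact sketch with hypothetical helpers (readCheckpoint and the offsets are invented for illustration):

    import java.io.IOException

    def recoveryPointFor(readCheckpoint: () => Long, truncateOffset: Long, activeSegmentBase: Long): Long = {
      val recoveryPoint =
        try readCheckpoint()
        catch {
          case e: IOException =>                                 // before: case e: IOException => { ... }
            println(s"Error reading checkpoint, resetting to 0: ${e.getMessage}")
            0L
        }
      val needToTruncate = truncateOffset < activeSegmentBase    // before: (truncateOffset < activeSegmentBase)
      if (needToTruncate) truncateOffset else recoveryPoint
    }
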
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,9 +24,11 @@ import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.locks._

import kafka.utils._
import kafka.utils.CoreUtils.inLock
import kafka.common.InvalidOffsetException
import sun.nio.ch.DirectBuffer

/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -306,8 +308,10 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
*/
private def forceUnmap(m: MappedByteBuffer) {
try {
if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
(m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
m match {
case buffer: DirectBuffer => buffer.cleaner().clean()
case _ =>
}
} catch {
case t: Throwable => warn("Error when freeing index buffer", t)
}
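
The forceUnmap rewrite above replaces an isInstanceOf/asInstanceOf pair with a pattern match, which tests and casts in one step and makes the fall-through case explicit. A generic sketch of the same refactor, using a plain trait instead of the JDK-internal sun.nio.ch.DirectBuffer:

    trait Cleanable { def clean(): Unit }      // hypothetical stand-in for DirectBuffer's cleaner

    def forceRelease(buffer: Any): Unit =
      buffer match {                           // before: if (buffer.isInstanceOf[Cleanable]) buffer.asInstanceOf[Cleanable].clean()
        case c: Cleanable => c.clean()
        case _            =>                   // nothing to release
      }
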
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -311,7 +311,7 @@ object ConsoleProducer {
line.indexOf(keySeparator) match {
case -1 =>
if (ignoreError) new ProducerRecord(topic, line.getBytes)
else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n =>
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
new ProducerRecord(topic, line.substring(0, n).getBytes, value)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -77,7 +77,7 @@ object GetOffsetShell {
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
System.exit(1)
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/tools/JmxTool.scala
@@ -89,7 +89,7 @@ object JmxTool extends Logging {
else
List(null)

val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])

val numExpectedAttributes: Map[ObjectName, Int] =
attributesWhitelistExists match {
@@ -101,7 +101,7 @@

// print csv header
val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
if(keys.size == numExpectedAttributes.values.sum + 1)
println(keys.map("\"" + _ + "\"").mkString(","))

while(true) {
@@ -111,7 +111,7 @@
case Some(dFormat) => dFormat.format(new Date)
case None => System.currentTimeMillis().toString
}
if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
println(keys.map(attributes(_)).mkString(","))
val sleep = max(0, interval - (System.currentTimeMillis - start))
Thread.sleep(sleep)
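
Two of the JmxTool changes in one standalone sketch (made-up data, not the live MBean query): sum a map's values directly instead of going through map(_._2), and use flatMap in place of map(...).flatten.

    val expectedAttributes: Map[String, Int] =
      Map("kafka.server:type=BrokerTopicMetrics" -> 3, "kafka.network:type=RequestMetrics" -> 2)

    val total = expectedAttributes.values.sum            // before: expectedAttributes.map(_._2).sum

    def matchingNames(pattern: String): Set[String] =
      expectedAttributes.keySet.filter(_.startsWith(pattern.stripSuffix("*")))

    val queries = List("kafka.server:*", "kafka.network:*")
    val names = queries.flatMap(matchingNames)           // before: queries.map(matchingNames).flatten
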
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -494,7 +494,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// Creating one stream per each connector instance
val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
require(streams.size == 1)
val stream = streams(0)
val stream = streams.head
iter = stream.iterator()
}

