Skip to content

Commit

Permalink
MINOR: scala explicit type
Browse files Browse the repository at this point in the history
  • Loading branch information
high.lee committed Jan 29, 2020
1 parent 0ecb5bb commit 72711ed
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/message/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object CompressionCodec {
object BrokerCompressionCodec {

val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
val brokerCompressionOptions: List[String] = brokerCompressionCodecs.map(codec => codec.name)

def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))

Expand All @@ -70,8 +70,8 @@ sealed trait CompressionCodec { def codec: Int; def name: String }
sealed trait BrokerCompressionCodec { def name: String }

case object DefaultCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = GZIPCompressionCodec.codec
val name = GZIPCompressionCodec.name
val codec: Int = GZIPCompressionCodec.codec
val name: String = GZIPCompressionCodec.name
}

case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@ package kafka.metrics

import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{CoreUtils, VerifiableProperties}
import scala.collection.Seq

class KafkaMetricsConfig(props: VerifiableProperties) {

/**
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
val reporters = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
val reporters: Seq[String] = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
Defaults.KafkaMetricReporterClasses))

/**
* The metrics polling interval (in seconds).
*/
val pollingIntervalSecs = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
val pollingIntervalSecs: Int = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
Defaults.KafkaMetricsPollingIntervalSeconds)
}
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.metrics
import java.util.concurrent.TimeUnit

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Gauge, MetricName}
import com.yammer.metrics.core.{Gauge, MetricName, Meter, Histogram, Timer}
import kafka.utils.Logging
import org.apache.kafka.common.utils.Sanitizer

Expand Down Expand Up @@ -58,26 +58,26 @@ trait KafkaMetricsGroup extends Logging {
nameBuilder.append(name)
}

val scope: String = toScope(tags).getOrElse(null)
val scope: String = toScope(tags).orNull
val tagsName = toMBeanName(tags)
tagsName.foreach(nameBuilder.append(",").append(_))

new MetricName(group, typeName, name, scope, nameBuilder.toString)
}

def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty): Gauge[T] =
Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)

def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty): Meter =
Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)

def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty): Histogram =
Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)

def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty): Timer =
Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)

def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty): Unit =
Metrics.defaultRegistry().removeMetric(metricName(name, tags))

private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object RequestChannel extends Logging {
case object ShutdownRequest extends BaseRequest

case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
val sanitizedUser = Sanitizer.sanitize(principal.getName)
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}

class Metrics {
Expand Down Expand Up @@ -110,7 +110,7 @@ object RequestChannel extends Logging {

trace(s"Processor $processor received request: ${requestDesc(true)}")

def requestThreadTimeNanos = {
def requestThreadTimeNanos: Long = {
if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
}
Expand Down

0 comments on commit 72711ed

Please sign in to comment.