From 72711edfcdde158eeaa45ae5e833b5643bb71026 Mon Sep 17 00:00:00 2001 From: "high.lee" Date: Wed, 22 Jan 2020 00:27:32 +0900 Subject: [PATCH] MINOR: scala explicit type --- .../scala/kafka/message/CompressionCodec.scala | 6 +++--- .../scala/kafka/metrics/KafkaMetricsConfig.scala | 5 +++-- .../scala/kafka/metrics/KafkaMetricsGroup.scala | 14 +++++++------- .../main/scala/kafka/network/RequestChannel.scala | 4 ++-- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index abe3694b0359..b174feaea42d 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -47,7 +47,7 @@ object CompressionCodec { object BrokerCompressionCodec { val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec) - val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name) + val brokerCompressionOptions: List[String] = brokerCompressionCodecs.map(codec => codec.name) def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT)) @@ -70,8 +70,8 @@ sealed trait CompressionCodec { def codec: Int; def name: String } sealed trait BrokerCompressionCodec { def name: String } case object DefaultCompressionCodec extends CompressionCodec with BrokerCompressionCodec { - val codec = GZIPCompressionCodec.codec - val name = GZIPCompressionCodec.name + val codec: Int = GZIPCompressionCodec.codec + val name: String = GZIPCompressionCodec.name } case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec { diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala index a3a21bdbca0d..b13a1b9350fb 100755 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala @@ -22,6 +22,7 @@ package kafka.metrics import kafka.server.{Defaults, KafkaConfig} import kafka.utils.{CoreUtils, VerifiableProperties} +import scala.collection.Seq class KafkaMetricsConfig(props: VerifiableProperties) { @@ -29,12 +30,12 @@ class KafkaMetricsConfig(props: VerifiableProperties) { * Comma-separated list of reporter types. These classes should be on the * classpath and will be instantiated at run-time. */ - val reporters = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp, + val reporters: Seq[String] = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp, Defaults.KafkaMetricReporterClasses)) /** * The metrics polling interval (in seconds). */ - val pollingIntervalSecs = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp, + val pollingIntervalSecs: Int = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp, Defaults.KafkaMetricsPollingIntervalSeconds) } diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index 03a4f7ccd4c0..53088b347d9b 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -20,7 +20,7 @@ package kafka.metrics import java.util.concurrent.TimeUnit import com.yammer.metrics.Metrics -import com.yammer.metrics.core.{Gauge, MetricName} +import com.yammer.metrics.core.{Gauge, MetricName, Meter, Histogram, Timer} import kafka.utils.Logging import org.apache.kafka.common.utils.Sanitizer @@ -58,26 +58,26 @@ trait KafkaMetricsGroup extends Logging { nameBuilder.append(name) } - val scope: String = toScope(tags).getOrElse(null) + val scope: String = toScope(tags).orNull val tagsName = toMBeanName(tags) tagsName.foreach(nameBuilder.append(",").append(_)) new MetricName(group, typeName, name, scope, nameBuilder.toString) } - def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) = + def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty): Gauge[T] = Metrics.defaultRegistry().newGauge(metricName(name, tags), metric) - def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) = + def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty): Meter = Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit) - def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) = + def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty): Histogram = Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased) - def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) = + def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty): Timer = Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit) - def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) = + def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty): Unit = Metrics.defaultRegistry().removeMetric(metricName(name, tags)) private def toMBeanName(tags: collection.Map[String, String]): Option[String] = { diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index d9f2281a8fde..531cac134500 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -49,7 +49,7 @@ object RequestChannel extends Logging { case object ShutdownRequest extends BaseRequest case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) { - val sanitizedUser = Sanitizer.sanitize(principal.getName) + val sanitizedUser: String = Sanitizer.sanitize(principal.getName) } class Metrics { @@ -110,7 +110,7 @@ object RequestChannel extends Logging { trace(s"Processor $processor received request: ${requestDesc(true)}") - def requestThreadTimeNanos = { + def requestThreadTimeNanos: Long = { if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L) }