Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Add explicit result type in public defs/vals #7993

Merged
merged 1 commit into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/message/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object CompressionCodec {
object BrokerCompressionCodec {

val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
val brokerCompressionOptions: List[String] = brokerCompressionCodecs.map(codec => codec.name)

def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))

Expand All @@ -70,8 +70,8 @@ sealed trait CompressionCodec { def codec: Int; def name: String }
sealed trait BrokerCompressionCodec { def name: String }

case object DefaultCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = GZIPCompressionCodec.codec
val name = GZIPCompressionCodec.name
val codec: Int = GZIPCompressionCodec.codec
val name: String = GZIPCompressionCodec.name
}

case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@ package kafka.metrics

import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{CoreUtils, VerifiableProperties}
import scala.collection.Seq

class KafkaMetricsConfig(props: VerifiableProperties) {

/**
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
val reporters = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
val reporters: Seq[String] = CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Scala 2.13 build failed due to this change. To make it work, you need to add import scala.collection.Seq.

Defaults.KafkaMetricReporterClasses))

/**
* The metrics polling interval (in seconds).
*/
val pollingIntervalSecs = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
val pollingIntervalSecs: Int = props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
Defaults.KafkaMetricsPollingIntervalSeconds)
}
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.metrics
import java.util.concurrent.TimeUnit

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Gauge, MetricName}
import com.yammer.metrics.core.{Gauge, MetricName, Meter, Histogram, Timer}
import kafka.utils.Logging
import org.apache.kafka.common.utils.Sanitizer

Expand Down Expand Up @@ -58,26 +58,26 @@ trait KafkaMetricsGroup extends Logging {
nameBuilder.append(name)
}

val scope: String = toScope(tags).getOrElse(null)
val scope: String = toScope(tags).orNull
val tagsName = toMBeanName(tags)
tagsName.foreach(nameBuilder.append(",").append(_))

new MetricName(group, typeName, name, scope, nameBuilder.toString)
}

def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty): Gauge[T] =
Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)

def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty): Meter =
Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)

def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty): Histogram =
Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)

def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty): Timer =
Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)

def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty): Unit =
Metrics.defaultRegistry().removeMetric(metricName(name, tags))

private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object RequestChannel extends Logging {
case object ShutdownRequest extends BaseRequest

case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
val sanitizedUser = Sanitizer.sanitize(principal.getName)
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}

class Metrics {
Expand Down Expand Up @@ -110,7 +110,7 @@ object RequestChannel extends Logging {

trace(s"Processor $processor received request: ${requestDesc(true)}")

def requestThreadTimeNanos = {
def requestThreadTimeNanos: Long = {
if (apiLocalCompleteTimeNanos == -1L) apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
}
Expand Down