Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6501: Dynamic broker config tests updates and metrics fix #4539

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 15 additions & 3 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent._
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.{Gauge, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.network.Send
Expand All @@ -40,6 +39,10 @@ import scala.reflect.ClassTag
object RequestChannel extends Logging {
private val requestLogger = Logger("kafka.request.logger")

val RequestQueueSizeMetric = "RequestQueueSize"
val ResponseQueueSizeMetric = "ResponseQueueSize"
val ProcessorMetricTag = "processor"

def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled

sealed trait BaseRequest
Expand Down Expand Up @@ -241,15 +244,16 @@ object RequestChannel extends Logging {
}

class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()

newGauge("RequestQueueSize", new Gauge[Int] {
newGauge(RequestQueueSizeMetric, new Gauge[Int] {
def value = requestQueue.size
})

newGauge("ResponseQueueSize", new Gauge[Int]{
newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize
}
Expand All @@ -258,10 +262,18 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
/**
 * Registers `processor` with this channel and publishes a per-processor
 * response-queue-size gauge tagged with the processor's id.
 *
 * If a processor with the same id is already registered, the existing entry is
 * kept and a warning is logged; the gauge is still (re-)registered for the
 * supplied processor.
 *
 * @param processor the network processor to register
 */
def addProcessor(processor: Processor): Unit = {
  // putIfAbsent returns the previously mapped processor, or null when the id was free.
  val alreadyRegistered = Option(processors.putIfAbsent(processor.id, processor))
  alreadyRegistered.foreach { _ =>
    warn(s"Unexpected processor with processorId ${processor.id}")
  }

  val queueSizeGauge: Gauge[Int] = new Gauge[Int] {
    def value = processor.responseQueueSize
  }
  val processorTags = Map(ProcessorMetricTag -> processor.id.toString)
  newGauge(ResponseQueueSizeMetric, queueSizeGauge, processorTags)
}

/**
 * Unregisters the processor with the given id and removes its per-processor
 * response-queue-size metric.
 *
 * @param processorId id of the processor to remove
 */
def removeProcessor(processorId: Int): Unit = {
  processors.remove(processorId)
  // Must use the same tag key/value pair that addProcessor used when registering the gauge.
  val processorTags = Map(ProcessorMetricTag -> processorId.toString)
  removeMetric(ResponseQueueSizeMetric, processorTags)
}

/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/kafka/network/SocketServer.scala
Expand Up @@ -433,6 +433,12 @@ private[kafka] class Acceptor(val endPoint: EndPoint,

}

/**
 * Metric-name and metric-tag constants shared by the `Processor` class, so the
 * same strings are used when metrics are registered and when they are removed.
 */
private[kafka] object Processor {
  // Tag key carrying the listener name in the processor's metric tags.
  val ListenerMetricTag: String = "listener"
  // Tag key carrying the processor id in the processor's metric tags.
  val NetworkProcessorMetricTag: String = "networkProcessor"
  // Name of the gauge that exposes the selector's "io-wait-ratio" for a processor.
  val IdlePercentMetricName: String = "IdlePercent"
}

/**
* Thread that processes all requests from a single connection. There are N of these running in parallel,
* each of which has its own selector
Expand All @@ -451,6 +457,7 @@ private[kafka] class Processor(val id: Int,
memoryPool: MemoryPool,
logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

import Processor._
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
Expand All @@ -471,26 +478,19 @@ private[kafka] class Processor(val id: Int,
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

private[kafka] val metricTags = mutable.LinkedHashMap(
"listener" -> listenerName.value,
"networkProcessor" -> id.toString
ListenerMetricTag -> listenerName.value,
NetworkProcessorMetricTag -> id.toString
).asJava

newGauge("ResponseQueueSize",
new Gauge[Int] {
def value = responseQueue.size()
},
Map("processor" -> id.toString)
)

newGauge("IdlePercent",
newGauge(IdlePercentMetricName,
new Gauge[Double] {
def value = {
Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
}
},
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
Map("networkProcessor" -> id.toString)
Map(NetworkProcessorMetricTag -> id.toString)
)

private val selector = createSelector(
Expand Down Expand Up @@ -742,6 +742,7 @@ private[kafka] class Processor(val id: Int,
close(channel.id)
}
selector.close()
removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString))
}

// `protected` to allow override for testing
Expand Down Expand Up @@ -792,7 +793,6 @@ private[kafka] class Processor(val id: Int,

override def shutdown(): Unit = {
super.shutdown()
removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
}

Expand Down
Expand Up @@ -91,7 +91,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
}
}

private def getFetcherId(topic: String, partitionId: Int) : Int = {
// Visibility for testing
private[server] def getFetcherId(topic: String, partitionId: Int) : Int = {
lock synchronized {
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchersPerBroker
}
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/scala/kafka/server/MetadataCache.scala
Expand Up @@ -103,12 +103,11 @@ class MetadataCache(brokerId: Int) extends Logging {
}
}

def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
inReadLock(partitionMetadataLock) {
aliveNodes.get(brokerId).map { nodeMap =>
nodeMap.getOrElse(listenerName,
throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
}
// Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
// Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
aliveNodes.get(brokerId).flatMap(_.get(listenerName))
}

// errorUnavailableEndpoints exists to support v0 MetadataResponses
Expand Down Expand Up @@ -203,6 +202,11 @@ class MetadataCache(brokerId: Int) extends Logging {
aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
aliveNodes(broker.id) = nodes.asScala
}
aliveNodes.get(brokerId).foreach { listenerMap =>
val listeners = listenerMap.keySet
if (!aliveNodes.values.forall(_.keySet == listeners))
error(s"Listeners are not identical across brokers: $aliveNodes")
}

val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
Expand Down