Skip to content

Commit

Permalink
KAFKA-6501; Dynamic broker config tests updates and metrics fix (#4539)
Browse files Browse the repository at this point in the history
1. Handle listener-not-found in MetadataCache since this can occur when listeners are being updated. To avoid breaking clients, this is handled in the same way as broker-not-available so that clients may retry.
2. Set retries=1000 for listener reconfiguration tests to avoid transient failures when metadata cache has not been updated 
3. Remove IdlePercent metric when Processor is deleted, add test
4. Reduce log segment size used during reconfiguration to avoid timeout while waiting for log rolling
5.Test markPartitionsForTruncation after fetcher thread resize
6. Move per-processor ResponseQueueSize metric back to RequestChannel.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
rajinisivaram authored and hachikuji committed Feb 9, 2018
1 parent 77a6104 commit 15bc405
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 55 deletions.
18 changes: 15 additions & 3 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent._
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.{Gauge, Meter}
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.network.Send
Expand All @@ -40,6 +39,10 @@ import scala.reflect.ClassTag
object RequestChannel extends Logging {
private val requestLogger = Logger("kafka.request.logger")

val RequestQueueSizeMetric = "RequestQueueSize"
val ResponseQueueSizeMetric = "ResponseQueueSize"
val ProcessorMetricTag = "processor"

def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled

sealed trait BaseRequest
Expand Down Expand Up @@ -241,15 +244,16 @@ object RequestChannel extends Logging {
}

class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()

newGauge("RequestQueueSize", new Gauge[Int] {
newGauge(RequestQueueSizeMetric, new Gauge[Int] {
def value = requestQueue.size
})

newGauge("ResponseQueueSize", new Gauge[Int]{
newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize
}
Expand All @@ -258,10 +262,18 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
def addProcessor(processor: Processor): Unit = {
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}")

newGauge(ResponseQueueSizeMetric,
new Gauge[Int] {
def value = processor.responseQueueSize
},
Map(ProcessorMetricTag -> processor.id.toString)
)
}

def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
}

/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@ private[kafka] class Acceptor(val endPoint: EndPoint,

}

private[kafka] object Processor {
val IdlePercentMetricName = "IdlePercent"
val NetworkProcessorMetricTag = "networkProcessor"
val ListenerMetricTag = "listener"
}

/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selector
Expand All @@ -451,6 +457,7 @@ private[kafka] class Processor(val id: Int,
memoryPool: MemoryPool,
logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

import Processor._
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
Expand All @@ -471,26 +478,19 @@ private[kafka] class Processor(val id: Int,
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

private[kafka] val metricTags = mutable.LinkedHashMap(
"listener" -> listenerName.value,
"networkProcessor" -> id.toString
ListenerMetricTag -> listenerName.value,
NetworkProcessorMetricTag -> id.toString
).asJava

newGauge("ResponseQueueSize",
new Gauge[Int] {
def value = responseQueue.size()
},
Map("processor" -> id.toString)
)

newGauge("IdlePercent",
newGauge(IdlePercentMetricName,
new Gauge[Double] {
def value = {
Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
}
},
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
Map("networkProcessor" -> id.toString)
Map(NetworkProcessorMetricTag -> id.toString)
)

private val selector = createSelector(
Expand Down Expand Up @@ -742,6 +742,7 @@ private[kafka] class Processor(val id: Int,
close(channel.id)
}
selector.close()
removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString))
}

// 'protected` to allow override for testing
Expand Down Expand Up @@ -792,7 +793,6 @@ private[kafka] class Processor(val id: Int,

override def shutdown(): Unit = {
super.shutdown()
removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
}
}

private def getFetcherId(topic: String, partitionId: Int) : Int = {
// Visibility for testing
private[server] def getFetcherId(topic: String, partitionId: Int) : Int = {
lock synchronized {
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchersPerBroker
}
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/scala/kafka/server/MetadataCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,11 @@ class MetadataCache(brokerId: Int) extends Logging {
}
}

def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
inReadLock(partitionMetadataLock) {
aliveNodes.get(brokerId).map { nodeMap =>
nodeMap.getOrElse(listenerName,
throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
}
// Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
// Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
aliveNodes.get(brokerId).flatMap(_.get(listenerName))
}

// errorUnavailableEndpoints exists to support v0 MetadataResponses
Expand Down Expand Up @@ -203,6 +202,11 @@ class MetadataCache(brokerId: Int) extends Logging {
aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
aliveNodes(broker.id) = nodes.asScala
}
aliveNodes.get(brokerId).foreach { listenerMap =>
val listeners = listenerMap.keySet
if (!aliveNodes.values.forall(_.keySet == listeners))
error(s"Listeners are not identical across brokers: $aliveNodes")
}

val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
Expand Down
Loading

0 comments on commit 15bc405

Please sign in to comment.