Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Move processor response queue into Processor #4542

Merged
merged 2 commits into from Feb 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 14 additions & 41 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Expand Up @@ -242,36 +242,26 @@ object RequestChannel extends Logging {

class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
val metrics = new RequestChannel.Metrics
private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val responseQueues = new ConcurrentHashMap[Int, BlockingQueue[RequestChannel.Response]]()
private val processors = new ConcurrentHashMap[Int, Processor]()

newGauge(
"RequestQueueSize",
new Gauge[Int] {
newGauge("RequestQueueSize", new Gauge[Int] {
def value = requestQueue.size
}
)
})

newGauge("ResponseQueueSize", new Gauge[Int]{
def value = responseQueues.values.asScala.foldLeft(0) {(total, q) => total + q.size()}
def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize
}
})

def addProcessor(processorId: Int): Unit = {
val responseQueue = new LinkedBlockingQueue[RequestChannel.Response]()
if (responseQueues.putIfAbsent(processorId, responseQueue) != null)
warn(s"Unexpected processor with processorId $processorId")
newGauge("ResponseQueueSize",
new Gauge[Int] {
def value = responseQueue.size()
},
Map("processor" -> processorId.toString)
)
def addProcessor(processor: Processor): Unit = {
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}")
}

def removeProcessor(processorId: Int): Unit = {
removeMetric("ResponseQueueSize", Map("processor" -> processorId.toString))
responseQueues.remove(processorId)
processors.remove(processorId)
}

/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
Expand All @@ -294,13 +284,11 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
trace(message)
}

val responseQueue = responseQueues.get(response.processor)
// `responseQueue` may be null if the processor was shutdown. In this case, the connections
val processor = processors.get(response.processor)
// The processor may be null if it was shutdown. In this case, the connections
// are closed, so the response is dropped.
if (responseQueue != null) {
responseQueue.put(response)
for (onResponse <- responseListeners)
onResponse(response.processor)
if (processor != null) {
processor.enqueueResponse(response)
}
}

Expand All @@ -312,21 +300,6 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
def receiveRequest(): RequestChannel.BaseRequest =
requestQueue.take()

/** Get a response for the given processor if there is one */
def receiveResponse(processor: Int): RequestChannel.Response = {
val responseQueue = responseQueues.get(processor)
if (responseQueue == null)
throw new IllegalStateException(s"receiveResponse with invalid processor $processor: processors=${responseQueues.keySet}")
val response = responseQueue.poll()
if (response != null)
response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
response
}

def addResponseListener(onResponse: Int => Unit) {
responseListeners ::= onResponse
}

def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
errors.foreach { case (error, count) =>
metrics(apiKey.name).markErrorMeter(error, count)
Expand Down
53 changes: 37 additions & 16 deletions core/src/main/scala/kafka/network/SocketServer.scala
Expand Up @@ -119,15 +119,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId

val numProcessorThreads = config.numNetworkThreads
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()

for (i <- 0 until newProcessorsPerListener) {
listenerProcessors += newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
requestChannel.addProcessor(nextProcessorId)
val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
listenerProcessors += processor
requestChannel.addProcessor(processor)
nextProcessorId += 1
}
listenerProcessors.foreach(p => processors.put(p.id, p))
Expand All @@ -140,21 +140,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}
}

// register the processor threads for notification of responses
requestChannel.addResponseListener(id => {
val processor = processors.get(id)
if (processor != null)
processor.wakeup()
})

/**
* Stop processing requests and new connections.
*/
def stopProcessingRequests() = {
info("Stopping socket server request processors")
this.synchronized {
acceptors.asScala.values.foreach(_.shutdown)
processors.asScala.values.foreach(_.shutdown)
acceptors.asScala.values.foreach(_.shutdown())
processors.asScala.values.foreach(_.shutdown())
requestChannel.clear()
stoppedProcessingRequests = true
}
Expand Down Expand Up @@ -475,11 +468,20 @@ private[kafka] class Processor(val id: Int,

private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

private[kafka] val metricTags = mutable.LinkedHashMap(
"listener" -> listenerName.value,
"networkProcessor" -> id.toString
).asJava

newGauge("ResponseQueueSize",
Copy link
Contributor

@ijuma ijuma Feb 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this change the full metric name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right. I forgot about the type attribute. @rajinisivaram Maybe you can fix it in #4539?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji I had forgotten too. Yes, I will update in #4539

new Gauge[Int] {
def value = responseQueue.size()
},
Map("processor" -> id.toString)
)

newGauge("IdlePercent",
new Gauge[Double] {
def value = {
Expand Down Expand Up @@ -562,15 +564,15 @@ private[kafka] class Processor(val id: Int,

private def processChannelException(channelId: String, errorMessage: String, throwable: Throwable) {
if (openOrClosingChannel(channelId).isDefined) {
error(s"Closing socket for ${channelId} because of error", throwable)
error(s"Closing socket for $channelId because of error", throwable)
close(channelId)
}
processException(errorMessage, throwable)
}

private def processNewResponses() {
var curr: RequestChannel.Response = null
while ({curr = requestChannel.receiveResponse(id); curr != null}) {
while ({curr = dequeueResponse(); curr != null}) {
val channelId = curr.request.context.connectionId
try {
curr.responseAction match {
Expand Down Expand Up @@ -753,6 +755,20 @@ private[kafka] class Processor(val id: Int,
connId
}

private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
responseQueue.put(response)
wakeup()
}

private def dequeueResponse(): RequestChannel.Response = {
val response = responseQueue.poll()
if (response != null)
response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
response
}

private[network] def responseQueueSize = responseQueue.size

// Only for testing
private[network] def inflightResponseCount: Int = inflightResponses.size

Expand All @@ -772,8 +788,13 @@ private[kafka] class Processor(val id: Int,
/**
* Wakeup the thread for selection.
*/
@Override
def wakeup = selector.wakeup()
override def wakeup() = selector.wakeup()

override def shutdown(): Unit = {
super.shutdown()
removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
}

}

Expand Down