KAFKA-4453: Separating controller connections and requests from the data plane (KIP-291) #5783
retest this please
@junrao @jjkoshy @MayureshGharat Can you please take a look at this PR? Thanks.
@gitlw Thanks for the patch. On it now.
val requestChannel = new RequestChannel(maxQueuedRequests)
private val processors = new ConcurrentHashMap[Int, Processor]()
val dataRequestChannel = new RequestChannel(maxQueuedRequests, RequestChannel.RequestQueueSizeMetric)
var controlPlaneRequestChannel: RequestChannel = null
var controlPlaneRequestChannel: RequestChannel = null
val controlPlaneRequestChannel: RequestChannel = if (config.controlPlaneListenerName.isDefined) {
  new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric)
} else {
  null
}
Actually, how about:
var controlPlaneRequestChannel: RequestChannel = null
private var controlPlaneRequestChannelOpt: Option[RequestChannel] = None
We can then create the controlPlaneRequestChannel lazily when we call startup().
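A minimal sketch of the Option-based, created-at-startup idea, assuming simplified stand-ins: SketchRequestChannel and SocketServerSketch are hypothetical and are not the real kafka.network classes, and the metric name is passed as a plain string.

```scala
// Hypothetical stand-in for kafka.network.RequestChannel.
final class SketchRequestChannel(val queueSize: Int, val metricName: String)

// Hypothetical stand-in for SocketServer, reduced to the one field under discussion.
final class SocketServerSketch(controlPlaneListenerName: Option[String]) {
  // None until startup() runs, and still None if no control plane listener is configured.
  private var controlPlaneRequestChannelOpt: Option[SketchRequestChannel] = None

  def startup(): Unit = synchronized {
    controlPlaneRequestChannelOpt =
      controlPlaneListenerName.map(_ => new SketchRequestChannel(20, "ControlPlaneRequestQueueSize"))
  }

  // Exposed only so the sketch can be inspected.
  def controlPlaneQueueSize: Option[Int] = synchronized {
    controlPlaneRequestChannelOpt.map(_.queueSize)
  }
}
```

With this shape, callers never see a null channel; they have to handle the None case explicitly.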
}
private val dataProcessors = new ConcurrentHashMap[Int, Processor]()
// there should be only one controller processor, however we use a map to store it so that we can reuse the logic for data processors
private[network] val controlPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
Instead of a map, which might suggest that the control plane can have multiple processors, do you think we could do:
private[network] val controlPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] var controlPlaneProcessorOpt: Option[Processor] = None
We can lazily create the controlPlaneProcessor when we call startup().
@@ -88,25 +94,35 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def startup(startupProcessors: Boolean = true) {
this.synchronized {
connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
createAcceptorAndProcessors(config.numNetworkThreads, config.dataListeners, dataRequestChannel, dataProcessors, false)
I was wondering whether we should have two explicit functions:
- createDataPlaneAcceptorsAndProcessors(processorsPerListener: Int, endpoints: Seq[EndPoint])
- createControlPlaneAcceptorAndProcessor(controlEndPointOpt: Option[EndPoint])
We can probably do something like:
createAcceptorAndProcessors(config.numNetworkThreads, config.dataListeners, dataRequestChannel, dataProcessors, false)
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataListeners)
createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
The two functions can look something like this:
private def createDataPlaneAcceptorsAndProcessors(processorsPerListener: Int,
                                                  endpoints: Seq[EndPoint]): Unit = synchronized {
  endpoints.foreach { endpoint =>
    val acceptor = createAcceptor(endpoint)
    addDataProcessors(acceptor, endpoint, processorsPerListener)
    KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", acceptor).start()
    acceptor.awaitStartup()
    acceptors.put(endpoint, acceptor)
  }
}
private def createControlPlaneAcceptorAndProcessor(controlEndPointOpt: Option[EndPoint]): Unit = synchronized {
  if (controlEndPointOpt.isDefined) {
    val controlPlaneEndPoint = controlEndPointOpt.get
    controlPlaneRequestChannelOpt = Some(new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric))
    val controlPlaneAcceptor = createAcceptor(controlPlaneEndPoint)
    val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, controlPlaneEndPoint.listenerName, controlPlaneEndPoint.securityProtocol, memoryPool, isControlPlane = true)
    // keep a handle on the processor so that shutdown can reach it, and do not reuse its id
    controlPlaneProcessorOpt = Some(controlPlaneProcessor)
    nextProcessorId += 1
    controlPlaneRequestChannelOpt.get.addProcessor(controlPlaneProcessor)
    val listenerProcessors = new ArrayBuffer[Processor]()
    listenerProcessors += controlPlaneProcessor
    controlPlaneAcceptor.addProcessors(listenerProcessors)
    KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${controlPlaneEndPoint.listenerName}-${controlPlaneEndPoint.securityProtocol}-${controlPlaneEndPoint.port}", controlPlaneAcceptor).start()
    controlPlaneAcceptor.awaitStartup()
    acceptors.put(controlPlaneEndPoint, controlPlaneAcceptor)
  }
}
private def createAcceptor(endpoint: EndPoint): Acceptor = synchronized {
  val sendBufferSize = config.socketSendBufferBytes
  val recvBufferSize = config.socketReceiveBufferBytes
  val brokerId = config.brokerId
  new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
}
private def addDataProcessors(acceptor: Acceptor, endpoint: EndPoint,
                              newProcessorsPerListener: Int): Unit = synchronized {
  val listenerName = endpoint.listenerName
  val securityProtocol = endpoint.securityProtocol
  val listenerProcessors = new ArrayBuffer[Processor]()
  for (_ <- 0 until newProcessorsPerListener) {
    val processor = newProcessor(nextProcessorId, dataRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool, isControlPlane = false)
    listenerProcessors += processor
    dataRequestChannel.addProcessor(processor)
    nextProcessorId += 1
  }
  listenerProcessors.foreach(p => dataProcessors.put(p.id, p))
  acceptor.addProcessors(listenerProcessors)
}
processors.asScala.values.foreach(_.shutdown())
requestChannel.clear()
dataProcessors.asScala.values.foreach(_.shutdown())
controlPlaneProcessors.asScala.values.foreach(_.shutdown())
With the above suggestion, this would be:
controlPlaneProcessors.asScala.values.foreach(_.shutdown())
controlPlaneProcessorOpt.map(_.shutdown())
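As a side note on the Option-based shutdown above: map works, but it builds an Option[Unit] that is then discarded, and foreach is the more common Scala idiom for a side effect. A small sketch, assuming a hypothetical SketchProcessor in place of the real Processor:

```scala
object ShutdownSketch {
  // Hypothetical stand-in for kafka.network.Processor.
  final class SketchProcessor {
    var isShutdown: Boolean = false
    def shutdown(): Unit = { isShutdown = true }
  }

  // foreach runs the side effect only when a processor is present and returns Unit.
  def shutdownControlPlane(controlPlaneProcessorOpt: Option[SketchProcessor]): Unit =
    controlPlaneProcessorOpt.foreach(_.shutdown())
}
```

The same pattern would apply to the clear() and shutdown() calls on the optional control plane request channel.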
acceptors.asScala.foreach { case (endpoint, acceptor) =>
addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
dataAcceptors.foreach { case (endpoint, acceptor) =>
addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads, dataRequestChannel, dataProcessors, false)
With the above suggestion, this would be:
addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads, dataRequestChannel, dataProcessors, false)
addDataProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
controlPlaneProcessors.asScala.values.foreach(_.shutdown())
dataRequestChannel.clear()
if (controlPlaneRequestChannel != null)
controlPlaneRequestChannel.clear()
This would become:
controlPlaneRequestChannel.clear()
controlPlaneRequestChannelOpt.map(_.clear())
requestChannel.shutdown()
dataRequestChannel.shutdown()
if (controlPlaneRequestChannel != null)
controlPlaneRequestChannel.shutdown()
With the above suggestion:
controlPlaneRequestChannel.shutdown()
controlPlaneRequestChannelOpt.map(_.shutdown())
if (dataPlaneListenersAdded.nonEmpty)
createAcceptorAndProcessors(config.numNetworkThreads, dataPlaneListenersAdded, dataRequestChannel, dataProcessors, false)
if (controlPlaneListenersAdded.nonEmpty)
createAcceptorAndProcessors(1, controlPlaneListenersAdded, controlPlaneRequestChannel, controlPlaneProcessors, true)
Should we check here whether a control plane listener is already defined, since there should be only one controlPlaneListener?
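One possible shape for such a check, as a sketch only: SketchEndPoint and withAddedControlPlaneListener are hypothetical names, and failing with require is an assumption rather than what the PR does.

```scala
object ListenerCheckSketch {
  // Hypothetical stand-in for kafka.cluster.EndPoint.
  final case class SketchEndPoint(listenerName: String, port: Int)

  // Returns the control plane endpoint after applying the added listeners,
  // rejecting any attempt to end up with more than one.
  def withAddedControlPlaneListener(existing: Option[SketchEndPoint],
                                    added: Seq[SketchEndPoint]): Option[SketchEndPoint] = {
    require(added.size <= 1, s"At most one control plane listener may be added, got ${added.size}")
    added.headOption match {
      case Some(endpoint) =>
        require(existing.isEmpty, "A control plane listener is already defined")
        Some(endpoint)
      case None => existing
    }
  }
}
```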
var authorizer: Option[Authorizer] = None
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var dataRequestHandlerPool: KafkaRequestHandlerPool = null
Should we rename this to "dataPlaneRequestHandlerPool" to be consistent with "controlPlaneRequestHandlerPool"?
Thanks for the patch @gitlw. Left some initial comments.
@@ -65,8 +65,14 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
val requestChannel = new RequestChannel(maxQueuedRequests)
private val processors = new ConcurrentHashMap[Int, Processor]()
val dataRequestChannel = new RequestChannel(maxQueuedRequests, RequestChannel.RequestQueueSizeMetric)
nit: to be consistent with the variable name controlPlaneRequestChannel, could we name this dataPlaneRequestChannel?
@@ -40,6 +40,7 @@ object RequestChannel extends Logging {
private val requestLogger = Logger("kafka.request.logger")

val RequestQueueSizeMetric = "RequestQueueSize"
val ControlPlaneRequestQueueSizeMetric = "ControlPlaneRequestQueueSize"
nit: could we also rename the variable RequestQueueSizeMetric to DataPlaneRequestQueueSizeMetric, to be consistent with the variable name ControlPlaneRequestQueueSizeMetric? This could also reduce confusion with the variable requestQueueSizeMetric used in class RequestChannel(val queueSize: Int, val requestQueueSizeMetric: String).
val requestChannel = new RequestChannel(maxQueuedRequests)
private val processors = new ConcurrentHashMap[Int, Processor]()
val dataRequestChannel = new RequestChannel(maxQueuedRequests, RequestChannel.RequestQueueSizeMetric)
var controlPlaneRequestChannel: RequestChannel = null
In general, in Scala we prefer to use val, and to use None rather than null. Can we do the following:
val controlPlaneRequestChannel: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric))
if (config.controlPlaneListenerName.isDefined) {
controlPlaneRequestChannel = new RequestChannel(20, RequestChannel.ControlPlaneRequestQueueSizeMetric)
}
private val dataProcessors = new ConcurrentHashMap[Int, Processor]()
nit: can we name it dataPlaneProcessors?
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
createAcceptorAndProcessors(config.numNetworkThreads, config.dataListeners, dataRequestChannel, dataProcessors, false)
if (config.controlPlaneListener.isDefined)
createAcceptorAndProcessors(1, Seq(config.controlPlaneListener.get), controlPlaneRequestChannel, controlPlaneProcessors, true)
Can we simplify the code here to createAcceptorAndProcessors(1, config.controlPlaneListener.toSeq, controlPlaneRequestChannel, controlPlaneProcessors, true), without checking config.controlPlaneListener.isDefined? createAcceptorAndProcessors will do nothing if endpoints is an empty sequence.
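A small sketch of why the isDefined check becomes unnecessary: Option.toSeq yields an empty sequence for None, so a loop over the endpoints simply does not run. The names here (ToSeqSketch, SketchEndPoint, the counting createAcceptorAndProcessors) are hypothetical and only mirror the shape of the real method.

```scala
object ToSeqSketch {
  // Hypothetical stand-in for kafka.cluster.EndPoint.
  final case class SketchEndPoint(listenerName: String, port: Int)

  // Counts how many acceptors would be created; the real method starts threads instead.
  def createAcceptorAndProcessors(endpoints: Seq[SketchEndPoint]): Int = {
    var created = 0
    endpoints.foreach { _ => created += 1 }
    created
  }

  // No explicit isDefined check: None.toSeq is empty, Some(x).toSeq has one element.
  def startControlPlane(controlPlaneListener: Option[SketchEndPoint]): Int =
    createAcceptorAndProcessors(controlPlaneListener.toSeq)
}
```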
stoppedProcessingRequests = true
}
info("Stopped socket server request processors")
}

def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
val dataAcceptors = config.controlPlaneListenerName match {
case Some(controlPlaneListenerName) =>
acceptors.asScala.filter{ case (endpoint, _) => !endpoint.listenerName.value().equals(controlPlaneListenerName.value())}
In general, it is preferable to identify the type with e.g. an enum, or to use separate variables (e.g. dataPlaneAcceptor vs. controlPlaneAcceptor), rather than by matching the string value. Could you see if there is a way to improve it?
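One way to read this suggestion, sketched with hypothetical types: tag each acceptor with a sealed plane type when it is created, then filter on that tag instead of comparing listener name strings. Plane, DataPlane, ControlPlane and SketchAcceptor are illustrative names, not part of the PR.

```scala
object PlaneSketch {
  // A closed set of plane types, so the compiler can check pattern matches for exhaustiveness.
  sealed trait Plane
  case object DataPlane extends Plane
  case object ControlPlane extends Plane

  // Hypothetical stand-in for an acceptor that remembers which plane it serves.
  final case class SketchAcceptor(listenerName: String, plane: Plane)

  // Selecting the data plane acceptors no longer depends on any listener name.
  def dataPlaneAcceptors(acceptors: Seq[SketchAcceptor]): Seq[SketchAcceptor] =
    acceptors.filter(_.plane == DataPlane)
}
```

Keeping separate collections for the two planes, as the comment also mentions, would avoid the filter altogether.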
@@ -40,6 +40,8 @@ class KafkaRequestHandler(id: Int,
apis: KafkaApis,
time: Time) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
// the requestsHandled counter is defined only for testing
var requestsHandled: Long = 0L
We generally don't want to add state or variables specifically for unit tests, particularly if there is no existing code which does this.
Usually we can write the unit test using mockit to verify that something is triggered. Can you see if this is doable? Alternatively, maybe we can just read the value of the newly defined metrics to verify that the newly defined processor/acceptor has processed some requests.
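A rough sketch of the metrics-based alternative, under the assumption that the handler already reports to some meter as part of normal operation. RequestMeter, SketchHandler and CountingMeter are hypothetical; the real code would read the existing Kafka metric or use a mock instead of a hand-written counter.

```scala
object MetricSketch {
  // Hypothetical metric interface that production code already depends on.
  trait RequestMeter { def mark(): Unit }

  // The handler carries no test-only state; it only reports to the meter it was given.
  final class SketchHandler(meter: RequestMeter) {
    def handle(request: String): Unit = { meter.mark() }
  }

  // Test-side implementation that records how many requests were marked.
  final class CountingMeter extends RequestMeter {
    private var marked: Long = 0L
    def mark(): Unit = { marked += 1 }
    def count: Long = marked
  }
}
```

A test can then assert on the meter's count, which leaves KafkaRequestHandler and Processor free of counters that exist only for testing.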
@@ -528,6 +567,8 @@ private[kafka] class Processor(val id: Int,
override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
}

// the receivesProcessed counter is defined only for testing
private[network] var receivesProcessed: Long = 0L
We generally don't want to define variables and state just for unit tests. Can you find a way to test the code without adding such variables?
@@ -96,21 +99,23 @@ class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
numThreads: Int,
requestHandlerAvgIdleMetric: String,
nit: requestHandlerAvgIdleMetric => requestHandlerAvgIdleMetricName seems better.
@@ -113,10 +113,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP

val brokerState: BrokerState = new BrokerState

var apis: KafkaApis = null
var dataPlaneApis: KafkaApis = null
var controlPlaneApis: KafkaApis = null
KafkaApis contains the logic to handle requests and should itself hold no state (other than the state in its constructor parameters), so it seems odd to have two KafkaApis instances. Currently KafkaApis only uses requestChannel together with RequestChannel.Request. One reasonable approach is to put the requestChannel in RequestChannel.Request, similar to the existing RequestChannel.Request.processor, so that we no longer need two KafkaApis instances. I am not sure this is the best alternative approach, though.
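To illustrate the idea of carrying the channel on the request, here is a minimal sketch with hypothetical stand-ins (SketchChannel, SketchRequest, SketchApis). It shows only the routing; it is not the real RequestChannel or KafkaApis API.

```scala
object SingleApisSketch {
  import scala.collection.mutable.ArrayBuffer

  // Hypothetical stand-in for RequestChannel that records the responses sent through it.
  final class SketchChannel(val name: String) {
    private val responses = ArrayBuffer.empty[String]
    def sendResponse(response: String): Unit = { responses += response; () }
    def sent: List[String] = responses.toList
  }

  // Each request remembers the channel it arrived on, much like it remembers its processor.
  final case class SketchRequest(payload: String, channel: SketchChannel)

  // A single stateless handler serves both planes by replying on the request's own channel.
  final class SketchApis {
    def handle(request: SketchRequest): Unit =
      request.channel.sendResponse(s"ack:${request.payload}")
  }
}
```

With this shape the handler needs no reference to a particular channel, so one instance can be shared by the data plane and control plane handler pools.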
Closing this PR as @MayureshGharat is taking over the ownership in #5921.
Separating controller connections and requests from the data plane (KIP-291)
control.plane.listener.name is set
Committer Checklist (excluded from commit message)