-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5763: Refactor NetworkClient to use LogContext #3761
Conversation
This PR lets logging client id in every log line in NetworkClient
Thanks for the PR. It would be good to receive the LogContext from the calling client (Consumer, AdminClient, Producer). Also, the log messages that include a client id explicitly should be changed. |
@ijuma If we pass LogContext to constructor, then NetworkClient will log with the prefix of the LogContext, which could be |
Yes, the idea is that one retains the context of the client within the networking classes. The log statement will include the actual logging class separately. |
Should we consider pushing the log context into |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji, yes, I was thinking the same thing regarding the Selector. A couple more comments inline.
@@ -327,21 +329,22 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso | |||
static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata, Time time) { | |||
Metrics metrics = null; | |||
String clientId = generateClientId(config); | |||
LogContext logContext = new LogContext("[KafkaAdminClient clientId=" + clientId + "] "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should extract a static method createLogContext
to avoid duplicating the logic. Also, AdminClient
(instead of KafkaAdminClient
) is shorter and provides enough context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -160,7 +161,8 @@ public NetworkClient(Selectable selector, | |||
this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, | |||
reconnectBackoffMs, reconnectBackoffMax, | |||
socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, | |||
discoverBrokerVersions, apiVersions, null); | |||
discoverBrokerVersions, apiVersions, null, | |||
new LogContext("[NetworkClient clientId=" + clientId + "] ")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can receive a LogContext
in this constructor as well. This constructor is typically used for inter-broker communication and the callers can provide a useful context too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -365,6 +365,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP | |||
|
|||
def doControlledShutdown(retries: Int): Boolean = { | |||
val metadataUpdater = new ManualMetadataUpdater() | |||
val logContext = new LogContext("[KafkaServer clientId=" + config.brokerId.toString + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use "borkerId=" here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought since NetworkClient uses it as clientId, it made sense to use clientId, but you are right it could be misleading
99a77c2
@@ -46,7 +44,7 @@ object TransactionMarkerChannelManager { | |||
txnStateManager: TransactionStateManager, | |||
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker], | |||
time: Time): TransactionMarkerChannelManager = { | |||
|
|||
val logContext = new LogContext("[TransactionMarkerChannelManager clientId=" + config.brokerId + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use "borkerId=" here?
@@ -108,6 +108,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf | |||
val messageQueue = new LinkedBlockingQueue[QueueItem] | |||
debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id)) | |||
val brokerNode = broker.getNode(config.interBrokerListenerName) | |||
val logContext = new LogContext("[ControllerChannelManager clientId=" + config.brokerId.toString + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use "borkerId=" here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, I think we're close. A few more comments.
@@ -114,26 +114,27 @@ | |||
|
|||
/** | |||
* Create a new nioSelector | |||
* | |||
* @param maxReceiveSize Max size in bytes of a single network receive (use {@link NetworkReceive#UNLIMITED} for no limit) | |||
* @param maxReceiveSize Max size in bytes of a single network receive (use {@link NetworkReceive#UNLIMITED} for no limit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was probably unintentional.
@@ -108,6 +108,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf | |||
val messageQueue = new LinkedBlockingQueue[QueueItem] | |||
debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id)) | |||
val brokerNode = broker.getNode(config.interBrokerListenerName) | |||
val logContext = new LogContext("[ControllerChannelManager brokerId=" + config.brokerId.toString + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this could be:
s"[ControllerChannelManager controllerId=${config.brokerId}, targetBrokerId=${brokerNode.idString}] "
It's a bit long though, we could potentially shorten ControllerChannelManager
to Controller
and replace controllerId
with id
. @hachikuji, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'd vote to shorten both. Seems not much room for confusion.
@@ -125,7 +126,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf | |||
"controller-channel", | |||
Map("broker-id" -> broker.id.toString).asJava, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should take the chance and replace broker.id.toString
with broker.idString
.
@@ -46,7 +44,7 @@ object TransactionMarkerChannelManager { | |||
txnStateManager: TransactionStateManager, | |||
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker], | |||
time: Time): TransactionMarkerChannelManager = { | |||
|
|||
val logContext = new LogContext("[TransactionMarkerChannelManager brokerId=" + config.brokerId + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could pass this to TransactionMarkerChannelManager
so that it can be used to set logIdent
(instead of duplicating the logic). We'd have to add a logPrefix
method to LogContext
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ijuma should I also replace kafka.utils.Logging
usage which uses logIdent
with LogContext
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest not doing that in this PR. We can open a separate JIRA about that to discuss the best way to do it. My suggestion was to use the same log prefix for logIdent
and logContext
and the easiest way to do that is to create the LogContext
first and then pass its value to logIdent
. Does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it makes total sense to move it to another ticket.
@@ -441,7 +441,8 @@ private[kafka] class Processor(val id: Int, | |||
false, | |||
true, | |||
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache), | |||
memoryPool) | |||
memoryPool, | |||
new LogContext()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In SocketServer
, we have:
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
We could set the logContext
as a field in SocketServer
, use it to set logIdent
and then pass it here. We can probably use [SocketServer brokerId=${config.brokerId}]
as the prefix.
@@ -365,6 +365,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP | |||
|
|||
def doControlledShutdown(retries: Int): Boolean = { | |||
val metadataUpdater = new ManualMetadataUpdater() | |||
val logContext = new LogContext("[KafkaServer brokerId=" + config.brokerId.toString + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, we may want to do this at the same time as logIdent
to keep things consistent.
@@ -49,6 +49,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, | |||
private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs | |||
|
|||
private val networkClient = { | |||
val logContext = new LogContext("[ReplicaFetcherBlockingSend clientId=" + clientId + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would pass this from ReplicaFetcherThread
. We can take the chance and set a reasonable logIdent
there too.
@ijuma I have fixed review comments, could you give it another look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, just a few more tweaks and I think we're done.
|
||
new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, time) | ||
val logContext = new LogContext("[Transaction Coordinator " + config.brokerId + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To maintain the idea of key/value pairs in a context, we can maybe change this to: new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
@@ -198,7 +200,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP | |||
/* generate brokerId */ | |||
val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs | |||
config.brokerId = brokerId | |||
this.logIdent = "[Kafka Server " + config.brokerId + "], " | |||
logContext = new LogContext("[Kafka Server " + config.brokerId + "] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, this can be changed to new LogContext("[KafkaServer id=${config.brokerId}] ")
@@ -54,8 +54,10 @@ class ReplicaFetcherThread(name: String, | |||
type REQ = FetchRequest | |||
type PD = PartitionData | |||
|
|||
private val logContext = new LogContext(s"[ReplicaFetcherThread brokerId=${brokerConfig.brokerId}] ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be s"[ReplicaFetcher brokerId=${brokerConfig.brokerId}, sourceBrokerId=${sourceBroker.id}, fetcherId=$fetcherId] ")
and we can then remove mentions of the source broker from individual log statements.
@ijuma i have addressed your comments |
@ijuma ping |
@adyach No need to ping within such a short period of time. We have a lot of things going on in parallel. Can you please review and, if you agree, integrate the following changes into your PR? Thanks! |
Thanks for integrating the change. Looks like there's a minor conflict in SocketServer. Can you resolve that, please? |
@ijuma resolved the conflicts, thank you for the help |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates and for your patience. LGTM. This should make it easier to debug issues when they happen. :) Merging to trunk.
This PR lets logging client id in every log line in NetworkClient