KAFKA-8855; Collect and Expose Client's Name and Version in the Brokers (KIP-511 Part 2) #7398
Conversation
Force-pushed from fb03f4f to 34b831b
```scala
var connection = connectionRegistry.get(connectionId)
if (connection == null) {
  connection = connectionRegistry.register(connectionId, header.clientId(), channel.clientSoftwareName(),
    channel.clientSoftwareVersion(), listenerName, securityProtocol, channel.socketAddress(), channel.principal())
}
```
👀 This is the critical part of the PR.
Force-pushed from 34b831b to 44aab56
Force-pushed from 732bc9c to a9a0439
retest this please
Failed test: JDK 8 and Scala 2.11
It does not seem related to this PR.
clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
```java
 * should rarely happen in practice because the metadata is only updated by the ApiVersionsRequest
 * and there are normally no concurrent requests within the connection.
 */
public ConnectionMetadata get(String connectionId) {
```
This is wrong. Unsynchronized access to a map could cause more than just "stale or inconsistent" data: it could cause null pointer exceptions or other issues. We cannot access this without synchronization.
```java
/**
 * Maintains metadata about each active connection and exposes various metrics about the connections
 * and the clients.
```
I don't think it's necessary to keep a central registry of all this information for all connections. We really just need the metrics, most of which can just be simple counters. If we need more information about a connection, we can look at the request context of that connection. But it doesn't have to be stored here.
Let me explain why I went down this path.
- As you said, the request context can be used to get the information about the connection. The issue is that this information must come from somewhere. It could come directly from the KafkaChannel if SASL is used and an ApiVersionsRequest with the information is received during the SASL initialisation. Unfortunately, some clients, including the Java one, do not provide the information during the SASL initialisation but in a second ApiVersionsRequest, which is handled in the KafkaApis layer this time. This means that a place is required to store and update the name and the version.
- KIP-511 proposed adding a new Gauge<List<Map<String, String>>> metric, kafka.server:type=ClientMetrics,name=Connections, which lists all the active connections with their metadata (client id, software name, software version, etc.). This requires a list of all the active connections to be maintained somewhere to back the gauge. One could argue that this could be replaced by counters for each combination; I preferred the gauge to limit the number of metrics.
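To make the gauge idea concrete, here is a minimal, hedged sketch of a registry whose snapshot has the Gauge<List<Map<String, String>>> shape proposed by KIP-511. All class and method names here are hypothetical simplifications for illustration, not the actual Kafka broker code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only: a registry whose snapshot is shaped like the
// Gauge<List<Map<String, String>>> value proposed by KIP-511.
final class ClientConnectionsGauge {
    // connectionId -> {clientId, clientSoftwareName, clientSoftwareVersion}
    private final ConcurrentMap<String, Map<String, String>> connections = new ConcurrentHashMap<>();

    void register(String connectionId, String clientId, String name, String version) {
        Map<String, String> meta = new LinkedHashMap<>();
        meta.put("clientId", clientId);
        meta.put("clientSoftwareName", name);
        meta.put("clientSoftwareVersion", version);
        connections.put(connectionId, meta);
    }

    void unregister(String connectionId) {
        connections.remove(connectionId);
    }

    // What the gauge would return: a point-in-time copy of every live connection.
    List<Map<String, String>> value() {
        return new ArrayList<>(connections.values());
    }
}
```

The snapshot is a copy, so JMX readers never observe the live map while processors mutate it.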
I have considered the following alternative to the registry but did not choose it. You may prefer it.
- As the Selector already maintains a list of the active KafkaChannels (equivalent to connections), we could add all the required metadata as attributes in the KafkaChannel and use the Selector as the source of truth. This would require adding the clientId to the channel, for instance.
- To avoid having to pass the KafkaChannel up to the KafkaApis layer to update the name and the version when the ApiVersionsRequest is received, we could partially or completely move the handling of the ApiVersionsRequest into the processor to update the KafkaChannel. The latter would impact the throttling, the metrics and the request logs.
- As many Selectors are run (one per processor, if I'm not mistaken), we could have counters per Selector to limit the locking required to update them. For the gauge which lists all the connections, we could also have one per Selector, but that is less convenient, so for this one I would propose having a way to list all the KafkaChannels across all the Selectors.
The major advantage of this approach is that the "per selector/processor" counters do not require any locking.
The downsides:
- Moving the handling of the ApiVersionsRequest into the processor has a wide impact on the throttling, the metrics of the ApiVersionsRequest and the request log, which is based on the requests sent to the KafkaApis layer.
- It makes exposing the metadata via other APIs a little harder. For instance, one could think of adding an Admin API to list all the active connections of a broker (we have already received such a request from colleagues).
I was not comfortable with moving the handling of the ApiVersionsRequest into the processor when I looked at the options, so I went with the current approach.
What do you think?
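The per-Selector counter alternative described above can be sketched with a lock-free accumulator per processor, summed on demand for a broker-wide value. This is a hedged illustration under assumed names, not code from the PR.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Hedged sketch of the per-selector idea: each processor/Selector keeps its
// own counter, so the connection open/close hot path never takes a lock,
// and a metrics reader sums across selectors on demand. Names are illustrative.
final class PerSelectorConnectionCounters {
    private final LongAdder activeConnections = new LongAdder();

    void onConnectionOpened() { activeConnections.increment(); }
    void onConnectionClosed() { activeConnections.decrement(); }
    long value() { return activeConnections.sum(); }

    // What a broker-wide metric would do: aggregate over all selectors.
    static long total(List<PerSelectorConnectionCounters> selectors) {
        long sum = 0;
        for (PerSelectorConnectionCounters c : selectors) sum += c.value();
        return sum;
    }
}
```

LongAdder is preferable to AtomicLong here because many processor threads increment independently and reads are rare.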
Thanks for the explanation, and thanks for fixing the synchronization. As an optimization, I think we should use something like ConcurrentMap here rather than synchronized blocks, so that we can minimize the amount of time threads spend waiting. This will be a bit more tricky to use, but more scalable.
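The ConcurrentMap suggestion above can be sketched as follows: ConcurrentHashMap.computeIfAbsent makes the get-or-register step atomic without a coarse synchronized block. ConnectionMetadata here is a simplified, hypothetical stand-in for the class in the PR, not the actual Kafka code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hedged sketch: replacing synchronized get/register with an atomic
// computeIfAbsent on a ConcurrentHashMap.
final class LockFreeConnectionRegistry {

    static final class ConnectionMetadata {
        final String clientId;
        final String softwareName;
        final String softwareVersion;

        ConnectionMetadata(String clientId, String softwareName, String softwareVersion) {
            this.clientId = clientId;
            this.softwareName = softwareName;
            this.softwareVersion = softwareVersion;
        }
    }

    private final ConcurrentMap<String, ConnectionMetadata> connections = new ConcurrentHashMap<>();

    // Atomic get-or-register: the mapping function runs at most once per
    // missing key, so concurrent callers cannot double-register a connection.
    ConnectionMetadata getOrRegister(String connectionId, String clientId,
                                     String softwareName, String softwareVersion) {
        return connections.computeIfAbsent(connectionId,
            id -> new ConnectionMetadata(clientId, softwareName, softwareVersion));
    }

    void remove(String connectionId) {
        connections.remove(connectionId);
    }
}
```

Compared with the check-then-act pattern in the diff (get, then register if null), computeIfAbsent closes the race window between the two calls.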
I guess when I thought about KIP-511, I thought of it in terms of metrics and perhaps occasional samples of connections (similar to how we do request sampling by logging a few selected requests). I don't see why we'd ever need a full snapshot of all existing connections. The snapshot would probably be out of date by the time it had been returned, since new connections are closed and opened all the time.
Reading the KIP more carefully, I see that KIP-511 does specify a metric which essentially requires each connection to register itself. Considering we don't even have a way to visualize or graph this metric, I'm not sure this belongs in JMX. I have to think about this more...
I've noticed that, as a client, the client must pessimistically assume that the broker it is talking to can only handle a maximum ApiVersionsRequest version of 2. Otherwise, with flexible versions, if the client assumes the latest version, the request will be written in the compact format, and an old broker will not understand the request and will close the connection. Compact requests can only be written if the client knows the broker is at least 2.4.0. This seems to break the spirit of ApiVersions, which historically could be handled even if the broker did not know the version. When requesting with flexible ApiVersions v3 and a short client name / version:
My question is: should the client behavior going forward be to
If yes, I think the KIP should be updated to describe this new required behavior. If no, then I do not think ApiVersions request v3 should be flexible, as well as versions going forward. If not that, then I am not sure about this flexible dilemma.
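The downgrade behavior under discussion could be sketched as below: the client first tries its latest ApiVersionsRequest version and, if the broker rejects it, retries with v0, whose wire format every broker can parse. The types and constants here are simplified stand-ins for illustration, not the real Kafka client API.

```java
// Hedged sketch of a client-side ApiVersions version fallback.
final class ApiVersionsNegotiator {
    static final short LATEST_VERSION = 3;   // flexible, compact encoding
    static final short FALLBACK_VERSION = 0; // understood by every broker

    interface Broker {
        // Returns true if the broker accepted the request at this version.
        boolean handleApiVersionsRequest(short version);
    }

    // Returns the version the handshake finally succeeded with, or -1.
    static short negotiate(Broker broker) {
        if (broker.handleApiVersionsRequest(LATEST_VERSION)) {
            return LATEST_VERSION;
        }
        // Old broker: retry once with the lowest, universally parsable version.
        if (broker.handleApiVersionsRequest(FALLBACK_VERSION)) {
            return FALLBACK_VERSION;
        }
        return -1;
    }
}
```

The cost of this scheme is one extra round trip against old brokers; the benefit is that new clients are not pinned to v2 forever.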
Ah, that's yet another reasonable fix! That makes sense. Sorry I missed that; I followed the links in KAFKA-8855!
I have opened a new PR which uses a different approach: #7749. Closing this one.
This PR implements the second part of KIP-511:
Committer Checklist (excluded from commit message)