-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5735: KIP-190: Handle client-ids consistently #3906
Conversation
@ijuma @rajinisivaram Can you take a look ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimaison @edoardocomar Thank you for the PR. I have left some minor comments.
@@ -298,7 +299,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso | |||
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true); | |||
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, | |||
MetricsReporter.class); | |||
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); | |||
Map<String, String> metricTags = Collections.singletonMap("client-id", Sanitizer.sanitize(clientId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there are two calls to sanitize
here, perhaps worth doing once and reusing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here to avoid the 2nd call, we'd have to pass sanitizedClientId as an argument to the private constructor. Is it what you prefer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, yes, I prefer one call, but I leave it up to you.
@@ -657,7 +658,7 @@ private KafkaConsumer(ConsumerConfig config, | |||
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); | |||
this.time = Time.SYSTEM; | |||
|
|||
Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId); | |||
Map<String, String> metricsTags = Collections.singletonMap("client-id", Sanitizer.sanitize(this.clientId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as before, do once and reuse?
this.ioThread = new KafkaThread(ioThreadName, this.sender, true); | ||
this.ioThread.start(); | ||
this.errors = this.metrics.sensor("errors"); | ||
config.logUnused(); | ||
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); | ||
AppInfoParser.registerAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As before, do once and reuse?
|
||
public class Sanitizer { | ||
|
||
public static String sanitize(String name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a class comment?
for (int i = 0; i < encoded.length(); i++) { | ||
char c = encoded.charAt(i); | ||
if (c == '*') { | ||
builder.append("%2A"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? there was a comment in the original code
if (c == '*') { | ||
builder.append("%2A"); | ||
} else if (c == '+') { | ||
builder.append("%20"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again comment removed from original code
@@ -234,7 +234,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { | |||
quotaType.toString, | |||
"Tracking throttle-time per user/client-id", | |||
"user", quotaId.sanitizedUser.getOrElse(""), | |||
"client-id", quotaId.clientId.getOrElse("")) | |||
"client-id", quotaId.sanitizedClientId.getOrElse("")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change one of the client-ids in this test to contain special characters?
} | ||
|
||
private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) { | ||
AdminUtils.changeUserOrUserClientIdConfig(zkUtils, QuotaId.sanitize(userPrincipal) + "/clients/" + clientId, properties) | ||
AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + clientId, properties) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update the equivalent code in ClientIdQuotaTest
to do sanitize as well for consistency? Also, AdminUtils.changeClientIdConfig
now uses sanitized client-ids, so the parameters and javadoc need updating.
@@ -387,14 +360,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, | |||
* First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor | |||
*/ | |||
def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = { | |||
val clientQuotaEntity = quotaEntity(sanitizedUser, clientId) | |||
val sanitizedClientId = Sanitizer.sanitize(clientId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we cache sanitized client-ids somewhere so that we dont have to sanitize for every request? Obviously they could change, but perhaps check and re-sanitize if required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also move sanitizedUser
from Session
to KafkaChannel
perhaps since Session
objects are now created for every request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rajinisivaram I think the optimization of moving the sanitisedUser should be done in a separate PR, maybe for a point release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently once a connection is established , Sanitize is called once for the user and - with this PR - once per client-id.
So I'd say about the suggested optimization of caching the sanitize(client-id) in CilentQuotaManager.getOrCreate... that as well should go in a separate PR.
At that time we could measure if caching is worth doing !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@edoardocomar Hmm.. We used to sanitize user principal once per connection, but there was a recent PR (not this one) which changed it to once per request. That should be easy to fix though, but a separate PR is fine. For client-id caching, there is a bit more work involved since client-ids can change within a connection. Again I am ok with doing that later.
@rajinisivaram how do you think about the status of @mimaison's patch? And is the KIP itself low risk for introducing regression? The feature freeze date has passed, so if you are not confident to merge it in the next day or two, I'll have to push it to 1.1.0 release. |
This fixes an important bug with potential security implications, so we should get this in. It's close and low risk. @mimaison, will you be able to address the comments today? |
Yes I should be able to update the PR in the afternoon. |
Should I add a line in the upgrade notes ? Do we want to mention the quotas issues ? |
Yes, please add a line in the upgrade notes. |
I believe we addressed all comments so far. The few larger optimizations suggested will be done later in another PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, merging to trunk
Developed with @edoardocomar