fix: Streams overrides bugfix #8514
Conversation
…fixed in Kafka client configuration
Thanks for the PR @piotrsmolinski -- it's unclear to me what the bug is, though. Can you briefly describe the issue/misbehavior that this PR addresses?
Hi Matthias,
The problem is that the value ssl.keystore.location (and all similar properties) is used to configure two features. Normally it defines the server certificate for HTTPS, but when the Kafka cluster uses mTLS, the same properties configure the Kafka connection certificates. Frequently these certificates are the same, but not always. We have a cluster where a separate certificate is used for the ksqlDB server (it identifies the HTTPS endpoint) and for the Kafka cluster (its CN provides the connecting user's identity). Normally we could use the ksql.streams. prefix to override the subset of properties that should apply to the Kafka settings only.
The bug is that the effective configuration is built from a list of key/value pairs in which a key can be reduced by dropping the known prefix. The result depends on the order in which the input key/values are processed. If ssl.keystore.location comes first, followed by ksql.streams.ssl.keystore.location, then Kafka clients and Kafka Streams use the latter. If ksql.streams.ssl.keystore.location comes first in the loop, its value is ignored. The order depends on the internal state of the HashMap, i.e. depending on how the values were inserted, the prefixed value may or may not come first.
The test case exercises ssl.truststore.location, because in the limited test configuration ssl.keystore.location happened to be iterated before ksql.streams.ssl.keystore.location, which already gave the desired output (contrary to the actual in-situ configuration). The fix forces the prefixed values to override the non-prefixed ones whenever they are provided, yielding a deterministic configuration.
Piotr
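The order-dependence described above can be reproduced with a minimal sketch (the property names are taken from this discussion, but the class and method names here are illustrative, not the actual KsqlConfig code). LinkedHashMap is used only to make the two iteration orders explicit; with the real HashMap the order is an implementation detail:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OrderDemo {
    static final String PREFIX = "ksql.streams.";

    // Simplified model of the pre-fix behavior: every key has its prefix
    // stripped and is written into the result in iteration order, so the
    // last entry seen wins.
    static Map<String, Object> orderDependent(Map<String, ?> originals) {
        final Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, ?> e : originals.entrySet()) {
            final String key = e.getKey().startsWith(PREFIX)
                ? e.getKey().substring(PREFIX.length())
                : e.getKey();
            result.put(key, e.getValue()); // last writer wins
        }
        return result;
    }

    public static void main(String[] args) {
        // The same logical configuration, inserted in two different orders.
        Map<String, Object> plainFirst = new LinkedHashMap<>();
        plainFirst.put("ssl.keystore.location", "https.jks");
        plainFirst.put("ksql.streams.ssl.keystore.location", "kafka.jks");

        Map<String, Object> prefixedFirst = new LinkedHashMap<>();
        prefixedFirst.put("ksql.streams.ssl.keystore.location", "kafka.jks");
        prefixedFirst.put("ssl.keystore.location", "https.jks");

        // The effective value flips with iteration order:
        System.out.println(orderDependent(plainFirst).get("ssl.keystore.location"));    // kafka.jks
        System.out.println(orderDependent(prefixedFirst).get("ssl.keystore.location")); // https.jks
    }
}
```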
Thanks for the explanation. Makes a lot of sense. I will update the commit message accordingly when merging.
One minor question on the PR.
for (Map.Entry<String, ?> entry : originals.entrySet()) {
  if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
    final String keyWithNoPrefix = entry.getKey().substring(prefix.length());
    result.put(keyWithNoPrefix, entry.getValue());
Nit: should we also result.remove(entry.getKey())
to avoid a double entry? In the end, Kafka Streams won't understand the prefixed key and, IIRC, would log a warning about an unknown config?
What if, instead of calling remove, you avoid populating the hashmap during initialization, and just add all of the entries in the for loop, removing the prefix from those where it is found? It would save the time of adding entries during initialization and then deleting them in the loop.
i.e.
// size the initial capacity to save time on internal resizing
final Map<String, Object> result = new HashMap<>(originals.size());
for (Map.Entry<String, ?> entry : originals.entrySet()) {
  if (prefixed) {
    // ... add with the prefix removed
  } else {
    // ... add the non-prefixed value
  }
}
Nevermind. I understand the code now. The originals are added first, and then the non-prefixed values are overridden by the prefixed values.
Regarding the question about calling remove to avoid duplication: if the duplicated value will be ignored later anyway, then we can save time by not calling remove here, right?
I don't think it would make much of a difference from a perf point of view if we delete them or not. -- But if we can avoid unnecessary WARN logs, it might be better to delete them?
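The variant being discussed, in which the prefixed key is dropped so that no "unknown config" WARN is logged downstream, could look roughly like this (a sketch with illustrative names, not the actual patch):

```java
import java.util.HashMap;
import java.util.Map;

public class DropPrefixedDemo {
    // Illustrative variant of the loop discussed above: a prefixed entry
    // overrides the stripped key AND is removed from the result, so the
    // downstream client never sees the unrecognized prefixed config.
    static Map<String, Object> overrideAndDrop(
        final String prefix, final Map<String, ?> originals) {
        final Map<String, Object> result = new HashMap<>(originals);
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            if (entry.getKey().startsWith(prefix)
                && entry.getKey().length() > prefix.length()) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
                result.remove(entry.getKey()); // drop the duplicate that would trigger a WARN
            }
        }
        return result;
    }
}
```

Note that the loop iterates over originals while mutating only result, so there is no concurrent-modification hazard.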
It absolutely makes sense. I was even considering it in the beginning. I was wondering whether or not we should retain the prefixed values.
This logic anyway should be in the base config classes.
This logic anyway should be in the base config classes.
What logic are you referring to? To drop unknown configs? It would not be correct to do that in the base config class, because we have an architecture of nesting -- thus, it must be possible to add any custom config, and it's expected that it's just forwarded. (We still log a WARN for this case to guard against typos.)
I mean the logic to get the originals with prefix overrides.
We already have the logic for values:
https://kafka.apache.org/30/javadoc/org/apache/kafka/common/config/AbstractConfig.html#originalsWithPrefix(java.lang.String,boolean)
This, however, includes filtering the entries down to the available keys only. The code I added is a counterpart for the case where the configuration keys cannot be determined at query time.
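The distinction between the two helpers can be sketched without any Kafka dependency (all names here are illustrative): a values-style accessor restricts the merged result to keys the config definition knows about, while an originals-style counterpart passes every key through:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class FilterDemo {
    // Illustrative: apply prefix overrides, then optionally restrict the
    // result to a known key set (which is what a values-style accessor
    // does); pass null to keep every key (originals-style).
    static Map<String, Object> withPrefixOverride(
        final String prefix, final Map<String, ?> originals, final Set<String> definedKeys) {
        final Map<String, Object> result = new HashMap<>(originals);
        for (Map.Entry<String, ?> e : originals.entrySet()) {
            if (e.getKey().startsWith(prefix) && e.getKey().length() > prefix.length()) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        if (definedKeys != null) {
            result.keySet().retainAll(definedKeys); // values-style: drop unknown keys
        }
        return result;
    }
}
```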
@piotrsmolinski The change looks good. I see that, with or without the prefix, KsqlConfig always returns stream configs without the prefix, but, as you pointed out, depending on the order of the values, ksql would return the latest value added to the map. With your change, the prefixed value now takes precedence.
As per Matthias's comment, if a prefixed value is provided, the entry should not show up twice, both prefixed and with the prefix stripped.
Merged. Thanks for the PR!
Description
The bugfix introduces a helper method, originalsWithPrefixOverride, similar to the existing valuesWithPrefixOverride. Contrary to the latter, it does not limit the result to defined configuration entries. The new method is used instead of originals to provide the values for the Kafka configuration in KsqlConfig.
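The description above corresponds roughly to the following shape (a sketch only; the real method lives in KsqlConfig and its exact signature may differ). All originals are copied first; prefixed entries then overwrite the corresponding stripped keys, so the result no longer depends on map iteration order:

```java
import java.util.HashMap;
import java.util.Map;

public class KsqlConfigSketch {
    // Sketch of the helper: non-prefixed entries are the baseline, and any
    // prefixed entry deterministically overrides its stripped counterpart.
    static Map<String, Object> originalsWithPrefixOverride(
        final String prefix, final Map<String, ?> originals) {
        final Map<String, Object> result = new HashMap<>(originals);
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            if (entry.getKey().startsWith(prefix)
                && entry.getKey().length() > prefix.length()) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```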
Testing done
A unit test was added to KsqlConfigTest; it fails against the unmodified class.