Skip to content

Commit

Permalink
fix: Streams overrides bugfix (#8514)
Browse files Browse the repository at this point in the history
This fix enforces that "ksql.streams." prefixed configs overwrite unprefixed ones in Kafka client configuration.
  • Loading branch information
piotrsmolinski committed Jan 19, 2022
1 parent 3a7ccf9 commit 366af1f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,33 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
}
// CHECKSTYLE_RULES.ON: MethodLength

public Map<String, Object> originalsWithPrefixOverride(final String prefix) {
final Map<String, Object> originals = originals();
final Map<String, Object> result = new HashMap<>();
// first we iterate over the originals and we add only the entries without the prefix
for (Map.Entry<String, ?> entry : originals.entrySet()) {
if (!isKeyPrefixed(entry.getKey(), prefix)) {
result.put(entry.getKey(), entry.getValue());
}
}
// then we add only prefixed entries with dropped prefix
for (Map.Entry<String, ?> entry : originals.entrySet()) {
if (isKeyPrefixed(entry.getKey(), prefix)) {
result.put(entry.getKey().substring(prefix.length()), entry.getValue());
}
}
// two iterations are necessary to avoid a situation where the unprefixed value
// is handled after the prefixed one, because we do not control the order in which
// the entries are presented from the originals map
return result;
}

private boolean isKeyPrefixed(final String key, final String prefix) {
Objects.requireNonNull(key);
Objects.requireNonNull(prefix);
return key.startsWith(prefix) && key.length() > prefix.length();
}

private static final class ConfigValue {
final ConfigItem configItem;
final String key;
Expand Down Expand Up @@ -1477,7 +1504,8 @@ private KsqlConfig(final ConfigGeneration generation, final Map<?, ?> props) {
config.name,
generation == ConfigGeneration.CURRENT
? config.defaultValueCurrent : config.defaultValueLegacy));
this.ksqlStreamConfigProps = buildStreamingConfig(streamsConfigDefaults, originals());
this.ksqlStreamConfigProps = buildStreamingConfig(streamsConfigDefaults,
originalsWithPrefixOverride(KSQL_STREAMS_PREFIX));
}

private static Set<String> streamTopicConfigNames() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ public void shouldSetPrefixedStreamsConfigProperties() {
is(nullValue()));
}

@Test
public void shouldOverrideStreamsConfigProperties() {
Map<String, Object> originals = new HashMap<>();
originals.put(KsqlConfig.KSQL_STREAMS_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
"kafka.jks");
originals.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
"https.jks");

final KsqlConfig ksqlConfig = new KsqlConfig(originals);

assertThat(ksqlConfig.getKsqlStreamConfigProps().
get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), equalTo("kafka.jks"));

}

@Test
public void shouldSetMonitoringInterceptorConfigProperties() {
final KsqlConfig ksqlConfig = new KsqlConfig(Collections.singletonMap(
Expand Down

0 comments on commit 366af1f

Please sign in to comment.