Skip to content

Commit

Permalink
remove streams clients metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelli321 committed Aug 14, 2023
1 parent 2c849dc commit cfa4457
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
Expand Up @@ -74,7 +74,7 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_DEPLOYMENT_TYPE_CONFIG =
KSQL_CONFIG_PROPERTY_PREFIX + "deployment.type";

public static enum DeploymentType {
public enum DeploymentType {
selfManaged,
confluent
}
Expand Down Expand Up @@ -886,7 +886,7 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
.define(
KSQL_DEPLOYMENT_TYPE_CONFIG,
ConfigDef.Type.STRING,
DeploymentType.confluent.name(),
DeploymentType.selfManaged.name(),
ConfigDef.LambdaValidator.with(
(name, value) -> parseDeploymentType(value),
() -> Arrays.asList(DeploymentType.values()).toString()
Expand Down Expand Up @@ -1586,7 +1586,7 @@ public Map<String, Object> originalsWithPrefixOverride(final String prefix) {
return result;
}

private boolean isKeyPrefixed(final String key, final String prefix) {
private static boolean isKeyPrefixed(final String key, final String prefix) {
Objects.requireNonNull(key);
Objects.requireNonNull(prefix);
return key.startsWith(prefix) && key.length() > prefix.length();
Expand Down Expand Up @@ -1644,6 +1644,12 @@ private static Map<String, ConfigValue> buildStreamingConfig(
final Map<String, ConfigValue> streamConfigProps = new HashMap<>();
applyStreamsConfig(baseStreamConfig, streamConfigProps);
applyStreamsConfig(overrides, streamConfigProps);

// Streams client metrics aren't used in Confluent deployment
if (streamConfigProps.get(KSQL_DEPLOYMENT_TYPE_CONFIG).value.equals(DeploymentType.confluent.name())) {
streamConfigProps.entrySet().stream().filter(e -> isKeyPrefixed(e.getKey(), TELEMETRY_PREFIX)).forEach(streamConfigProps::remove);
}

return ImmutableMap.copyOf(streamConfigProps);
}

Expand Down
Expand Up @@ -531,6 +531,7 @@ public static Map<String, Object> buildStreamsProperties(
) {
Map<String, Object> newStreamsProperties = new HashMap<>(config.getKsqlStreamConfigProps(applicationId));

// Streams client metrics aren't used in Confluent deployment
if (!KsqlConfig.DeploymentType.confluent.toString().equals(config.getString(KSQL_DEPLOYMENT_TYPE_CONFIG))) {
// Reassign here to make unmodifiable map modifiable
newStreamsProperties = new HashMap<>(newStreamsProperties);
Expand Down

0 comments on commit cfa4457

Please sign in to comment.