Skip to content

Commit

Permalink
eric comments
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelli321 committed Aug 11, 2023
1 parent 65d10a9 commit 2c849dc
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 19 deletions.
Expand Up @@ -75,8 +75,8 @@ public class KsqlConfig extends AbstractConfig {
KSQL_CONFIG_PROPERTY_PREFIX + "deployment.type";

public static enum DeploymentType {
onprem,
cloud
selfManaged,
confluent
}

public static final String KSQL_DEPLOYMENT_TYPE_DOC =
Expand Down Expand Up @@ -886,7 +886,7 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
.define(
KSQL_DEPLOYMENT_TYPE_CONFIG,
ConfigDef.Type.STRING,
"on-prem",
DeploymentType.confluent.name(),
ConfigDef.LambdaValidator.with(
(name, value) -> parseDeploymentType(value),
() -> Arrays.asList(DeploymentType.values()).toString()
Expand Down Expand Up @@ -1718,18 +1718,14 @@ private KsqlConfig(final ConfigGeneration generation,
this.ksqlStreamConfigProps = ksqlStreamConfigProps;
}

public Map<String, Object> getKsqlStreamConfigProps(final String applicationId, boolean addConfluentMetricsContextConfigsKafka) {
public Map<String, Object> getKsqlStreamConfigProps(final String applicationId) {
final Map<String, Object> map = new HashMap<>(getKsqlStreamConfigProps());
map.put(
MetricCollectors.RESOURCE_LABEL_PREFIX
+ StreamsConfig.APPLICATION_ID_CONFIG,
applicationId
);

if (addConfluentMetricsContextConfigsKafka) {
map.putAll(addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
}

return Collections.unmodifiableMap(map);
}

Expand Down
Expand Up @@ -529,13 +529,12 @@ public static Map<String, Object> buildStreamsProperties(
final KsqlConfig config,
final ProcessingLogContext processingLogContext
) {
final Map<String, Object> newStreamsProperties;
Map<String, Object> newStreamsProperties = new HashMap<>(config.getKsqlStreamConfigProps(applicationId));

if (Objects.equals(config.getString(KSQL_DEPLOYMENT_TYPE_CONFIG),
KsqlConfig.DeploymentType.cloud.toString())) {
newStreamsProperties = new HashMap<>(config.getKsqlStreamConfigProps(applicationId, false));
} else {
newStreamsProperties = new HashMap<>(config.getKsqlStreamConfigProps(applicationId, true));
if (!KsqlConfig.DeploymentType.confluent.toString().equals(config.getString(KSQL_DEPLOYMENT_TYPE_CONFIG))) {
// Reassign here to make unmodifiable map modifiable
newStreamsProperties = new HashMap<>(newStreamsProperties);
newStreamsProperties.putAll(config.addConfluentMetricsContextConfigsKafka(Collections.emptyMap()));
}

newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
Expand Down
Expand Up @@ -224,7 +224,7 @@ public void setup() {
when(ksqlMaterializationFactory.create(any(), any(), any(), any())).thenReturn(materialization);
when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory);
when(processingLoggerFactory.getLogger(any(), anyMap())).thenReturn(processingLogger);
when(ksqlConfig.getKsqlStreamConfigProps(anyString(), anyBoolean())).thenReturn(Collections.emptyMap());
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(Collections.emptyMap());
when(ksqlConfig.getString(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)).thenReturn("");
when(ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG))
.thenReturn(PERSISTENT_PREFIX);
Expand Down Expand Up @@ -664,7 +664,7 @@ private void shouldUseProvidedOptimizationConfig(final Object value) {
// Given:
final Map<String, Object> properties =
Collections.singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, value);
when(ksqlConfig.getKsqlStreamConfigProps(anyString(), anyBoolean())).thenReturn(properties);
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(properties);

// When:
final PersistentQueryMetadata queryMetadata = buildPersistentQuery(
Expand Down Expand Up @@ -716,7 +716,7 @@ private void assertPropertiesContainDummyInterceptors() {
@Test
public void shouldAddMetricsInterceptorsToExistingList() {
// Given:
when(ksqlConfig.getKsqlStreamConfigProps(anyString(), anyBoolean())).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
ImmutableList.of(DummyConsumerInterceptor.class.getName()),
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
Expand All @@ -737,7 +737,7 @@ public void shouldAddMetricsInterceptorsToExistingList() {
@Test
public void shouldAddMetricsInterceptorsToExistingString() {
// When:
when(ksqlConfig.getKsqlStreamConfigProps(anyString(), anyBoolean())).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyConsumerInterceptor.class.getName(),
StreamsConfig.producerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
Expand All @@ -759,7 +759,7 @@ public void shouldAddMetricsInterceptorsToExistingString() {
@SuppressWarnings("unchecked")
public void shouldAddMetricsInterceptorsToExistingStringList() {
// When:
when(ksqlConfig.getKsqlStreamConfigProps(anyString(), anyBoolean())).thenReturn(ImmutableMap.of(
when(ksqlConfig.getKsqlStreamConfigProps(anyString())).thenReturn(ImmutableMap.of(
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
DummyConsumerInterceptor.class.getName()
+ ","
Expand Down

0 comments on commit 2c849dc

Please sign in to comment.