Skip to content

Commit

Permalink
Fix Kafka consumer and producer config
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 30, 2021
1 parent 7c9529a commit 8ce9ce9
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
* This class is the default implementation of {@link KafkaConfig}.
Expand All @@ -29,20 +28,13 @@
public final class DefaultKafkaConfig implements KafkaConfig {

private static final String KAFKA_PATH = "kafka";
private static final String CONSUMER_PATH = "consumer";
private static final String PRODUCER_PATH = "producer";

private final KafkaConsumerConfig consumerConfig;
private final KafkaProducerConfig producerConfig;

private DefaultKafkaConfig(final ScopedConfig kafkaScopedConfig) {
consumerConfig = KafkaConsumerConfig.of(kafkaScopedConfig.hasPath(CONSUMER_PATH)
? kafkaScopedConfig.getConfig(CONSUMER_PATH)
: ConfigFactory.empty());

producerConfig = KafkaProducerConfig.of(kafkaScopedConfig.hasPath(PRODUCER_PATH)
? kafkaScopedConfig.getConfig(PRODUCER_PATH)
: ConfigFactory.empty());
consumerConfig = KafkaConsumerConfig.of(kafkaScopedConfig);
producerConfig = KafkaProducerConfig.of(kafkaScopedConfig);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@
@Immutable
public final class DefaultKafkaConsumerConfig implements KafkaConsumerConfig {

private static final String CONFIG_PATH = "consumer";
private static final String ALPAKKA_PATH = "alpakka";

private final ThrottlingConfig throttlingConfig;
private final Config alpakkaConfig;

private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
throttlingConfig = ThrottlingConfig.of(kafkaConsumerScopedConfig.hasPath(ThrottlingConfig.CONFIG_PATH)
? kafkaConsumerScopedConfig.getConfig(ThrottlingConfig.CONFIG_PATH)
: ConfigFactory.empty());
alpakkaConfig = kafkaConsumerScopedConfig.getConfig(ALPAKKA_PATH);
throttlingConfig = ThrottlingConfig.of(kafkaConsumerScopedConfig);
alpakkaConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, ALPAKKA_PATH);
}

/**
Expand All @@ -47,7 +46,11 @@ private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultKafkaConsumerConfig of(final Config config) {
return new DefaultKafkaConsumerConfig(config);
return new DefaultKafkaConsumerConfig(getConfigOrEmpty(config, CONFIG_PATH));
}

private static Config getConfigOrEmpty(final Config config, final String configKey) {
return config.hasPath(configKey) ? config.getConfig(configKey) : ConfigFactory.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.ConfigWithFallback;

import com.typesafe.config.Config;

/**
Expand All @@ -25,11 +27,7 @@
@Immutable
public final class DefaultKafkaProducerConfig implements KafkaProducerConfig {

private static final String QUEUE_SIZE_PATH = "queue-size";
private static final String PARALLELISM_PATH = "parallelism";
private static final String MIN_BACKOFF_PATH = "min-backoff";
private static final String MAX_BACKOFF_PATH = "max-backoff";
private static final String RANDOM_FACTOR_PATH = "random-factor";
private static final String CONFIG_PATH = "producer";
private static final String ALPAKKA_PATH = "alpakka";

private final int queueSize;
Expand All @@ -40,11 +38,11 @@ public final class DefaultKafkaProducerConfig implements KafkaProducerConfig {
private final Config alpakkaConfig;

private DefaultKafkaProducerConfig(final Config kafkaProducerScopedConfig) {
queueSize = kafkaProducerScopedConfig.getInt(QUEUE_SIZE_PATH);
parallelism = kafkaProducerScopedConfig.getInt(PARALLELISM_PATH);
minBackoff = kafkaProducerScopedConfig.getDuration(MIN_BACKOFF_PATH);
maxBackoff = kafkaProducerScopedConfig.getDuration(MAX_BACKOFF_PATH);
randomFactor = kafkaProducerScopedConfig.getDouble(RANDOM_FACTOR_PATH);
queueSize = kafkaProducerScopedConfig.getInt(ConfigValue.QUEUE_SIZE.getConfigPath());
parallelism = kafkaProducerScopedConfig.getInt(ConfigValue.PARALLELISM.getConfigPath());
minBackoff = kafkaProducerScopedConfig.getDuration(ConfigValue.MIN_BACKOFF.getConfigPath());
maxBackoff = kafkaProducerScopedConfig.getDuration(ConfigValue.MAX_BACKOFF.getConfigPath());
randomFactor = kafkaProducerScopedConfig.getDouble(ConfigValue.RANDOM_FACTOR.getConfigPath());
alpakkaConfig = kafkaProducerScopedConfig.getConfig(ALPAKKA_PATH);
}

Expand All @@ -56,7 +54,9 @@ private DefaultKafkaProducerConfig(final Config kafkaProducerScopedConfig) {
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultKafkaProducerConfig of(final Config config) {
return new DefaultKafkaProducerConfig(config);
return new DefaultKafkaProducerConfig(
ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

import com.typesafe.config.Config;

/**
Expand Down Expand Up @@ -67,4 +69,36 @@ static KafkaProducerConfig of(final Config config) {
return DefaultKafkaProducerConfig.of(config);
}

enum ConfigValue implements KnownConfigValue {

QUEUE_SIZE("queue-size", 100),

PARALLELISM("parallelism", 10),

MIN_BACKOFF("min-backoff", Duration.ofSeconds(3)),

MAX_BACKOFF("max-backoff", Duration.ofSeconds(30)),

RANDOM_FACTOR("random-factor", 0.2);

private final String path;
private final Object defaultValue;

ConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}


@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}
}

}

0 comments on commit 8ce9ce9

Please sign in to comment.