Skip to content

Commit

Permalink
#586 make kafka producer restart backoff configurable, move config fo…
Browse files Browse the repository at this point in the history
…r alpakka into separate config inside of consumer and producer, implemented config model for consumer and producer

Signed-off-by: Johannes Schneider <johannes.schneider@bosch.io>
  • Loading branch information
jokraehe committed Jun 16, 2021
1 parent 90e8a50 commit d927f65
Show file tree
Hide file tree
Showing 20 changed files with 655 additions and 317 deletions.
Expand Up @@ -12,12 +12,10 @@
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;

Expand All @@ -30,122 +28,63 @@
@Immutable
public final class DefaultKafkaConfig implements KafkaConfig {

private static final String CONFIG_PATH = "kafka";
private static final String KAFKA_PATH = "kafka";
private static final String CONSUMER_PATH = "consumer";
private static final String PRODUCER_PATH = "producer";
private static final String PRODUCER_QUEUE_SIZE = "queue-size";
private static final String PRODUCER_PARALLELISM = "parallelism";
private static final String PRODUCER_MIN_BACKOFF = "min-backoff";
private static final String PRODUCER_MAX_BACKOFF = "max-backoff";
private static final String PRODUCER_RANDOM_FACTOR = "random-factor";

private final Config consumerConfig;
private final ThrottlingConfig consumerThrottlingConfig;

private final Config producerConfig;
private final int producerQueueSize;
private final int producerParallelism;
private final Duration producerMinBackoff;
private final Duration producerMaxBackoff;
private final double producerRandomFactor;
private final KafkaConsumerConfig consumerConfig;
private final KafkaProducerConfig producerConfig;

private DefaultKafkaConfig(final ScopedConfig kafkaScopedConfig) {
consumerConfig = kafkaScopedConfig.getConfig(CONSUMER_PATH);
consumerThrottlingConfig = ThrottlingConfig.of(kafkaScopedConfig.hasPath(CONSUMER_PATH)
consumerConfig = KafkaConsumerConfig.of(kafkaScopedConfig.hasPath(CONSUMER_PATH)
? kafkaScopedConfig.getConfig(CONSUMER_PATH)
: ConfigFactory.empty());

producerConfig = kafkaScopedConfig.getConfig(PRODUCER_PATH);
producerQueueSize = producerConfig.getInt(PRODUCER_QUEUE_SIZE);
producerParallelism = producerConfig.getInt(PRODUCER_PARALLELISM);
producerMinBackoff = producerConfig.getDuration(PRODUCER_MIN_BACKOFF);
producerMaxBackoff = producerConfig.getDuration(PRODUCER_MAX_BACKOFF);
producerRandomFactor = producerConfig.getDouble(PRODUCER_RANDOM_FACTOR);
producerConfig = KafkaProducerConfig.of(kafkaScopedConfig.hasPath(PRODUCER_PATH)
? kafkaScopedConfig.getConfig(PRODUCER_PATH)
: ConfigFactory.empty());
}

/**
* Returns an instance of {@code DefaultKafkaConfig} based on the settings of the specified Config.
*
* @param config is supposed to provide the Kafka config setting at {@value #CONFIG_PATH}.
* @param config is supposed to provide the Kafka config setting at {@value #KAFKA_PATH}.
* @return the instance.
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultKafkaConfig of(final Config config) {
return new DefaultKafkaConfig(DefaultScopedConfig.newInstance(config, CONFIG_PATH));
return new DefaultKafkaConfig(DefaultScopedConfig.newInstance(config, KAFKA_PATH));
}

@Override
public Config getConsumerConfig() {
public KafkaConsumerConfig getConsumerConfig() {
return consumerConfig;
}

@Override
public ThrottlingConfig getConsumerThrottlingConfig() {
return consumerThrottlingConfig;
}

@Override
public Config getProducerConfig() {
public KafkaProducerConfig getProducerConfig() {
return producerConfig;
}

@Override
public int getProducerQueueSize() {
return producerQueueSize;
}

@Override
public int getProducerParallelism() {
return producerParallelism;
}

@Override
public Duration getProducerMinBackoff() {
return producerMinBackoff;
}

@Override
public Duration getProducerMaxBackoff() {
return producerMaxBackoff;
}

@Override
public double getProducerRandomFactor() {
return producerRandomFactor;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultKafkaConfig that = (DefaultKafkaConfig) o;
return Objects.equals(consumerConfig, that.consumerConfig) &&
Objects.equals(consumerThrottlingConfig, that.consumerThrottlingConfig) &&
Objects.equals(producerConfig, that.producerConfig) &&
Objects.equals(producerQueueSize, that.producerQueueSize) &&
Objects.equals(producerParallelism, that.producerParallelism) &&
Objects.equals(producerMinBackoff, that.producerMinBackoff) &&
Objects.equals(producerMaxBackoff, that.producerMaxBackoff) &&
Objects.equals(producerRandomFactor, that.producerRandomFactor);
Objects.equals(producerConfig, that.producerConfig);
}

@Override
public int hashCode() {
return Objects.hash(consumerConfig, consumerThrottlingConfig, producerConfig, producerQueueSize,
producerParallelism, producerMinBackoff, producerMaxBackoff, producerRandomFactor);
return Objects.hash(consumerConfig, producerConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"consumerConfig=" + consumerConfig +
", consumerThrottlingConfig=" + consumerThrottlingConfig +
", producerConfig=" + producerConfig +
", producerQueueSize=" + producerQueueSize +
", producerParallelism=" + producerParallelism +
", producerMinBackoff=" + producerMinBackoff +
", producerMaxBackoff=" + producerMaxBackoff +
", producerRandomFactor=" + producerRandomFactor +
"]";
}
}
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.config;

import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.service.config.ThrottlingConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
* This class is the default implementation of {@link KafkaConsumerConfig}.
*/
@Immutable
public final class DefaultKafkaConsumerConfig implements KafkaConsumerConfig {

private static final String ALPAKKA_PATH = "alpakka";

private final ThrottlingConfig throttlingConfig;
private final Config alpakkaConfig;

private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
throttlingConfig = ThrottlingConfig.of(kafkaConsumerScopedConfig.hasPath(ThrottlingConfig.CONFIG_PATH)
? kafkaConsumerScopedConfig.getConfig(ThrottlingConfig.CONFIG_PATH)
: ConfigFactory.empty());
alpakkaConfig = kafkaConsumerScopedConfig.getConfig(ALPAKKA_PATH);
}

/**
* Returns an instance of {@code DefaultKafkaConsumerConfig} based on the settings of the specified Config.
*
* @param config is supposed to provide the Kafka config setting.
* @return the instance.
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultKafkaConsumerConfig of(final Config config) {
return new DefaultKafkaConsumerConfig(config);
}

@Override
public ThrottlingConfig getThrottlingConfig() {
return throttlingConfig;
}

@Override
public Config getAlpakkaConfig() {
return alpakkaConfig;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultKafkaConsumerConfig that = (DefaultKafkaConsumerConfig) o;
return Objects.equals(throttlingConfig, that.throttlingConfig) &&
Objects.equals(alpakkaConfig, that.alpakkaConfig);
}

@Override
public int hashCode() {
return Objects.hash(throttlingConfig, alpakkaConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"throttlingConfig=" + throttlingConfig +
", alpakkaConfig=" + alpakkaConfig +
"]";
}
}
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.config;

import java.time.Duration;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import com.typesafe.config.Config;

/**
* This class is the default implementation of {@link KafkaProducerConfig}.
*/
@Immutable
public final class DefaultKafkaProducerConfig implements KafkaProducerConfig {

private static final String QUEUE_SIZE_PATH = "queue-size";
private static final String PARALLELISM_PATH = "parallelism";
private static final String MIN_BACKOFF_PATH = "min-backoff";
private static final String MAX_BACKOFF_PATH = "max-backoff";
private static final String RANDOM_FACTOR_PATH = "random-factor";
private static final String ALPAKKA_PATH = "alpakka";

private final int queueSize;
private final int parallelism;
private final Duration minBackoff;
private final Duration maxBackoff;
private final double randomFactor;
private final Config alpakkaConfig;

private DefaultKafkaProducerConfig(final Config kafkaProducerScopedConfig) {
queueSize = kafkaProducerScopedConfig.getInt(QUEUE_SIZE_PATH);
parallelism = kafkaProducerScopedConfig.getInt(PARALLELISM_PATH);
minBackoff = kafkaProducerScopedConfig.getDuration(MIN_BACKOFF_PATH);
maxBackoff = kafkaProducerScopedConfig.getDuration(MAX_BACKOFF_PATH);
randomFactor = kafkaProducerScopedConfig.getDouble(RANDOM_FACTOR_PATH);
alpakkaConfig = kafkaProducerScopedConfig.getConfig(ALPAKKA_PATH);
}

/**
* Returns an instance of {@code DefaultKafkaProducerConfig} based on the settings of the specified Config.
*
* @param config is supposed to provide the Kafka config setting.
* @return the instance.
* @throws org.eclipse.ditto.internal.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultKafkaProducerConfig of(final Config config) {
return new DefaultKafkaProducerConfig(config);
}

@Override
public int getQueueSize() {
return queueSize;
}

@Override
public int getParallelism() {
return parallelism;
}

@Override
public Duration getMinBackoff() {
return minBackoff;
}

@Override
public Duration getMaxBackoff() {
return maxBackoff;
}

@Override
public double getRandomFactor() {
return randomFactor;
}

@Override
public Config getAlpakkaConfig() {
return alpakkaConfig;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultKafkaProducerConfig that = (DefaultKafkaProducerConfig) o;
return Objects.equals(queueSize, that.queueSize) &&
Objects.equals(parallelism, that.parallelism) &&
Objects.equals(minBackoff, that.minBackoff) &&
Objects.equals(maxBackoff, that.maxBackoff) &&
Objects.equals(randomFactor, that.randomFactor) &&
Objects.equals(alpakkaConfig, that.alpakkaConfig);
}

@Override
public int hashCode() {
return Objects.hash(queueSize, parallelism, minBackoff, maxBackoff, randomFactor, alpakkaConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"queueSize=" + queueSize +
", parallelism=" + parallelism +
", minBackoff=" + minBackoff +
", maxBackoff=" + maxBackoff +
", randomFactor=" + randomFactor +
", alpakkaConfig=" + alpakkaConfig +
"]";
}
}

0 comments on commit d927f65

Please sign in to comment.