alpakka to pekko-connectors migration
Signed-off-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.com>
alstanchev authored and Aleksandar Stanchev committed Aug 28, 2023
1 parent 75bf598 commit 78353f5
Showing 11 changed files with 51 additions and 51 deletions.
@@ -26,12 +26,12 @@
final class DefaultKafkaCommitterConfig implements KafkaCommitterConfig {

private static final String CONFIG_PATH = "committer";
-private static final String ALPAKKA_PATH = "alpakka";
+private static final String PEKKO_CONNECTORS_PATH = "pekko-connectors";

-private final Config alpakkaConfig;
+private final Config pekkoConnectorsConfig;

private DefaultKafkaCommitterConfig(final Config kafkaCommitterScopedConfig) {
-alpakkaConfig = getConfigOrEmpty(kafkaCommitterScopedConfig, ALPAKKA_PATH);
+pekkoConnectorsConfig = getConfigOrEmpty(kafkaCommitterScopedConfig, PEKKO_CONNECTORS_PATH);
}

/**
@@ -50,8 +50,8 @@ private static Config getConfigOrEmpty(final Config config, final String configK
}

@Override
-public Config getAlpakkaConfig() {
-return alpakkaConfig;
+public Config getPekkoConnectorsConfig() {
+return pekkoConnectorsConfig;
}

@Override
@@ -63,18 +63,18 @@ public boolean equals(final Object o) {
return false;
}
final DefaultKafkaCommitterConfig that = (DefaultKafkaCommitterConfig) o;
-return Objects.equals(alpakkaConfig, that.alpakkaConfig);
+return Objects.equals(pekkoConnectorsConfig, that.pekkoConnectorsConfig);
}

@Override
public int hashCode() {
-return Objects.hash(alpakkaConfig);
+return Objects.hash(pekkoConnectorsConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"alpakkaConfig=" + alpakkaConfig +
"pekkoConnectorsConfig=" + pekkoConnectorsConfig +
"]";
}

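
For illustration only (this class is hypothetical and not part of the commit): the renamed lookup resolves a "pekko-connectors" sub-section out of the committer-scoped config, shaped like the committer block in connectivity.conf further down. The sketch uses the Typesafe Config API and assumes getConfigOrEmpty falls back to an empty Config when the path is missing.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public final class PekkoConnectorsPathDemo {

    public static void main(final String[] args) {
        // Committer-scoped config shaped like the "committer" block in connectivity.conf.
        final Config committerScoped = ConfigFactory.parseString(
                "pekko-connectors { max-batch = 1000, max-interval = 10s }");

        // Roughly what getConfigOrEmpty(kafkaCommitterScopedConfig, PEKKO_CONNECTORS_PATH) yields,
        // assuming that helper falls back to an empty Config when the path is absent.
        final Config pekkoConnectorsConfig = committerScoped.hasPath("pekko-connectors")
                ? committerScoped.getConfig("pekko-connectors")
                : ConfigFactory.empty();

        System.out.println(pekkoConnectorsConfig.getInt("max-batch")); // prints 1000
    }
}
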
@@ -31,20 +31,20 @@
final class DefaultKafkaConsumerConfig implements KafkaConsumerConfig {

private static final String CONFIG_PATH = "consumer";
-private static final String ALPAKKA_PATH = "alpakka";
+private static final String PEKKO_CONNECTORS_PATH = "pekko-connectors";
private static final String RESTART_PATH = "restart";

private final ConnectionThrottlingConfig throttlingConfig;
private final ExponentialBackOffConfig restartBackOffConfig;
-private final Config alpakkaConfig;
+private final Config pekkoConnectorsConfig;
private final Duration metricCollectingInterval;
private final long initTimeoutSeconds;

private DefaultKafkaConsumerConfig(final Config kafkaConsumerScopedConfig) {
throttlingConfig = ConnectionThrottlingConfig.of(kafkaConsumerScopedConfig);
restartBackOffConfig =
DefaultExponentialBackOffConfig.of(getConfigOrEmpty(kafkaConsumerScopedConfig, RESTART_PATH));
-alpakkaConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, ALPAKKA_PATH);
+pekkoConnectorsConfig = getConfigOrEmpty(kafkaConsumerScopedConfig, PEKKO_CONNECTORS_PATH);
metricCollectingInterval =
kafkaConsumerScopedConfig.getDuration(ConfigValue.METRIC_COLLECTING_INTERVAL.getConfigPath());
if (metricCollectingInterval.isNegative() || metricCollectingInterval.isZero()) {
@@ -79,8 +79,8 @@ public ExponentialBackOffConfig getRestartBackOffConfig() {
}

@Override
-public Config getAlpakkaConfig() {
-return alpakkaConfig;
+public Config getPekkoConnectorsConfig() {
+return pekkoConnectorsConfig;
}

@Override
@@ -104,14 +104,14 @@ public boolean equals(final Object o) {
final DefaultKafkaConsumerConfig that = (DefaultKafkaConsumerConfig) o;
return Objects.equals(throttlingConfig, that.throttlingConfig) &&
Objects.equals(restartBackOffConfig, that.restartBackOffConfig) &&
-Objects.equals(alpakkaConfig, that.alpakkaConfig) &&
+Objects.equals(pekkoConnectorsConfig, that.pekkoConnectorsConfig) &&
Objects.equals(metricCollectingInterval, that.metricCollectingInterval) &&
Objects.equals(initTimeoutSeconds, that.initTimeoutSeconds);
}

@Override
public int hashCode() {
-return Objects.hash(throttlingConfig, restartBackOffConfig, alpakkaConfig, metricCollectingInterval,
+return Objects.hash(throttlingConfig, restartBackOffConfig, pekkoConnectorsConfig, metricCollectingInterval,
initTimeoutSeconds);
}

@@ -120,7 +120,7 @@ public String toString() {
return getClass().getSimpleName() + " [" +
"throttlingConfig=" + throttlingConfig +
", restartBackOffConfig=" + restartBackOffConfig +
-", alpakkaConfig=" + alpakkaConfig +
+", pekkoConnectorsConfig=" + pekkoConnectorsConfig +
", metricCollectingInterval=" + metricCollectingInterval +
", initTimeoutSeconds=" + initTimeoutSeconds +
"]";
@@ -28,7 +28,7 @@
final class DefaultKafkaProducerConfig implements KafkaProducerConfig {

private static final String CONFIG_PATH = "producer";
-private static final String ALPAKKA_PATH = "alpakka";
+private static final String PEKKO_CONNECTORS_PATH = "pekko-connectors";

private final int queueSize;
private final int parallelism;
@@ -37,7 +37,7 @@ final class DefaultKafkaProducerConfig implements KafkaProducerConfig {
private final double randomFactor;
private final int maxRestartsCount;
private final Duration maxRestartsWithin;
-private final Config alpakkaConfig;
+private final Config pekkoConnectorsConfig;
private final long initTimeoutSeconds;

private DefaultKafkaProducerConfig(final Config kafkaProducerScopedConfig) {
@@ -48,7 +48,7 @@ private DefaultKafkaProducerConfig(final Config kafkaProducerScopedConfig) {
randomFactor = kafkaProducerScopedConfig.getDouble(ConfigValue.RANDOM_FACTOR.getConfigPath());
maxRestartsCount = kafkaProducerScopedConfig.getInt(ConfigValue.MAX_RESTARTS_COUNT.getConfigPath());
maxRestartsWithin = kafkaProducerScopedConfig.getDuration(ConfigValue.MAX_RESTARTS_WITHIN.getConfigPath());
-alpakkaConfig = kafkaProducerScopedConfig.getConfig(ALPAKKA_PATH);
+pekkoConnectorsConfig = kafkaProducerScopedConfig.getConfig(PEKKO_CONNECTORS_PATH);
initTimeoutSeconds = kafkaProducerScopedConfig.getLong(ConfigValue.INIT_TIMEOUT_SECONDS.getConfigPath());
}

@@ -100,8 +100,8 @@ public Duration getMaxRestartsWithin() {
}

@Override
-public Config getAlpakkaConfig() {
-return alpakkaConfig;
+public Config getPekkoConnectorsConfig() {
+return pekkoConnectorsConfig;
}

@Override
@@ -121,14 +121,14 @@ public boolean equals(final Object o) {
Objects.equals(randomFactor, that.randomFactor) &&
Objects.equals(maxRestartsCount, that.maxRestartsCount) &&
Objects.equals(maxRestartsWithin, that.maxRestartsWithin) &&
-Objects.equals(alpakkaConfig, that.alpakkaConfig) &&
+Objects.equals(pekkoConnectorsConfig, that.pekkoConnectorsConfig) &&
Objects.equals(initTimeoutSeconds, that.initTimeoutSeconds);
}

@Override
public int hashCode() {
return Objects.hash(queueSize, parallelism, minBackoff, maxBackoff, maxRestartsCount, maxRestartsWithin,
-randomFactor, alpakkaConfig, initTimeoutSeconds);
+randomFactor, pekkoConnectorsConfig, initTimeoutSeconds);
}

@Override
@@ -141,7 +141,7 @@ public String toString() {
", randomFactor=" + randomFactor +
", maxRestartsCount=" + maxRestartsCount +
", maxRestartsWithin=" + maxRestartsWithin +
-", alpakkaConfig=" + alpakkaConfig +
+", pekkoConnectorsConfig=" + pekkoConnectorsConfig +
", initTimeoutSeconds=" + initTimeoutSeconds +
"]";
}
@@ -27,7 +27,7 @@ public interface KafkaCommitterConfig {
*
* @return consumer configuration needed by the Kafka client.
*/
-Config getAlpakkaConfig();
+Config getPekkoConnectorsConfig();

/**
* Returns an instance of {@code KafkaCommitterConfig} based on the settings of the specified Config.
@@ -47,7 +47,7 @@ public interface KafkaConsumerConfig {
*
* @return consumer configuration needed by the Kafka client.
*/
-Config getAlpakkaConfig();
+Config getPekkoConnectorsConfig();

/**
* Returns the interval in which metrics from the Apache Kafka client should be collected.
@@ -66,7 +66,7 @@ public interface KafkaProducerConfig {
*
* @return consumer configuration needed by the Kafka client.
*/
-Config getAlpakkaConfig();
+Config getPekkoConnectorsConfig();

/**
* @return timeout before the producer is initialized and considered "ready".
@@ -96,11 +96,11 @@ static PropertiesFactory newInstance(final Connection connection,
* @return the settings.
*/
ConsumerSettings<String, ByteBuffer> getConsumerSettings(final boolean dryRun) {
-final Config alpakkaConfig = config.getConsumerConfig().getAlpakkaConfig();
+final Config pekkoConnectorsConfig = config.getConsumerConfig().getPekkoConnectorsConfig();
final ConnectionCheckerSettings connectionCheckerSettings =
-ConnectionCheckerSettings.apply(alpakkaConfig.getConfig("connection-checker"));
+ConnectionCheckerSettings.apply(pekkoConnectorsConfig.getConfig("connection-checker"));
final ConsumerSettings<String, ByteBuffer> consumerSettings =
-ConsumerSettings.apply(alpakkaConfig, new StringDeserializer(), new ByteBufferDeserializer())
+ConsumerSettings.apply(pekkoConnectorsConfig, new StringDeserializer(), new ByteBufferDeserializer())
.withBootstrapServers(bootstrapServers)
.withGroupId(connection.getId().toString())
.withClientId(clientId + "-consumer")
@@ -115,13 +115,13 @@ ConsumerSettings<String, ByteBuffer> getConsumerSettings(final boolean dryRun) {
}

CommitterSettings getCommitterSettings() {
-final Config committerConfig = config.getCommitterConfig().getAlpakkaConfig();
+final Config committerConfig = config.getCommitterConfig().getPekkoConnectorsConfig();
return CommitterSettings.apply(committerConfig);
}

ProducerSettings<String, ByteBuffer> getProducerSettings() {
-final Config alpakkaConfig = config.getProducerConfig().getAlpakkaConfig();
-return ProducerSettings.apply(alpakkaConfig, new StringSerializer(), new ByteBufferSerializer())
+final Config pekkoConnectorsConfig = config.getProducerConfig().getPekkoConnectorsConfig();
+return ProducerSettings.apply(pekkoConnectorsConfig, new StringSerializer(), new ByteBufferSerializer())
.withBootstrapServers(bootstrapServers)
.withProperties(getClientIdProperties())
.withProperties(getTrustedSelfSignedCertificates())
@@ -194,7 +194,7 @@ private boolean isConnectionSecure() {

/**
* Convert a structured Config into flat map from config paths to values.
-* Replicates Alpakka Kafka client's interpretation of client Config.
+* Replicates Pekko Connectors Kafka client's interpretation of client Config.
*
* @param config the Config object.
* @return flat map from config paths to values.
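
The Javadoc above describes flattening a nested Config into dotted property paths, the shape the Pekko Connectors Kafka settings pass on to the Kafka client. The method body lies outside this hunk; the following standalone sketch (hypothetical class, not the Ditto implementation) shows one way such a flattening can look with the Typesafe Config API.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.HashMap;
import java.util.Map;

public final class ConfigFlattenerSketch {

    // Flattens leaf values into a map keyed by their fully qualified dotted paths.
    static Map<String, String> flatten(final Config config) {
        final Map<String, String> flattened = new HashMap<>();
        // Config#entrySet already reports leaf entries under their dotted paths.
        config.entrySet().forEach(entry ->
                flattened.put(entry.getKey(), entry.getValue().unwrapped().toString()));
        return flattened;
    }

    public static void main(final String[] args) {
        final Config kafkaClients = ConfigFactory.parseString(
                "connections.max.idle.ms = 540000\n" +
                        "reconnect.backoff.ms = 500");
        // Prints something like {connections.max.idle.ms=540000, reconnect.backoff.ms=500}
        System.out.println(flatten(kafkaClients));
    }
}
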
12 changes: 6 additions & 6 deletions connectivity/service/src/main/resources/connectivity.conf
@@ -555,8 +555,8 @@ ditto {
}
}

-alpakka = ${pekko.kafka.consumer} # resolve defaults from reference.conf
-alpakka = {
+pekko-connectors = ${pekko.kafka.consumer} # resolve defaults from reference.conf
+pekko-connectors = {
use-dispatcher = "kafka-consumer-dispatcher"
poll-interval = 50ms
stop-timeout = 0s
@@ -613,8 +613,8 @@ ditto {
}

committer {
-alpakka = ${pekko.kafka.committer} # resolve defaults from reference.conf
-alpakka = {
+pekko-connectors = ${pekko.kafka.committer} # resolve defaults from reference.conf
+pekko-connectors = {
# Maximum number of messages in a single commit batch
max-batch = 1000
max-batch = ${?KAFKA_COMMITTER_MAX_BATCH}
@@ -656,8 +656,8 @@ ditto {
init-timeout-seconds = 3
init-timeout-seconds = ${?KAFKA_PRODUCER_INIT_TIMEOUT_SECONDS}

-alpakka = ${pekko.kafka.producer} # resolve defaults from reference.conf
-alpakka {
+pekko-connectors = ${pekko.kafka.producer} # resolve defaults from reference.conf
+pekko-connectors {
use-dispatcher = "kafka-producer-dispatcher"

# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
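
In these blocks, `pekko-connectors = ${pekko.kafka.consumer}` (and the committer/producer equivalents) first pulls the connector defaults from the Pekko Kafka reference.conf, and the object assigned on the following line is merged on top, so only the listed keys are overridden. A self-contained sketch of that HOCON merge, with an inlined stand-in for the defaults instead of the real reference.conf (class name and values are illustrative only):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public final class HoconDefaultsMergeDemo {

    public static void main(final String[] args) {
        final String hocon =
                // Stand-in for the defaults that reference.conf would normally provide.
                "pekko.kafka.consumer { poll-interval = 250ms, poll-timeout = 5s }\n"
                        // Same pattern as connectivity.conf: resolve defaults, then override.
                        + "consumer.pekko-connectors = ${pekko.kafka.consumer}\n"
                        + "consumer.pekko-connectors = { poll-interval = 50ms }\n";

        final Config pekkoConnectors = ConfigFactory.parseString(hocon)
                .resolve()
                .getConfig("consumer.pekko-connectors");

        System.out.println(pekkoConnectors.getDuration("poll-interval")); // PT0.05S (overridden)
        System.out.println(pekkoConnectors.getDuration("poll-timeout"));  // PT5S (inherited default)
    }
}

The same merge behaviour is what the test below relies on: `poll-interval` still comes from the pekko.kafka.consumer defaults while `poll-timeout` is taken from the override in kafka-test.conf.
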
@@ -65,16 +65,16 @@ public void underTestReturnsValuesOfConfigFile() {
assertThat(underTest.getConsumerConfig().getThrottlingConfig().getInterval()).isEqualTo(Duration.ofSeconds(1));

assertThat(underTest.getConsumerConfig()
-.getAlpakkaConfig()
+.getPekkoConnectorsConfig()
.getDuration("poll-interval")) // from pekko.kafka.consumer
.isEqualTo(DEFAULT_POLL_INTERVAL);
-assertThat(underTest.getConsumerConfig().getAlpakkaConfig().getDuration("poll-timeout")) // from kafka-test.conf
+assertThat(underTest.getConsumerConfig().getPekkoConnectorsConfig().getDuration("poll-timeout")) // from kafka-test.conf
.isEqualTo(DEFAULT_POLL_TIMEOUT);

assertThat(underTest.getProducerConfig().getParallelism()) // from pekko.kafka.producer
.isEqualTo(DEFAULT_PARALLELISM);
assertThat(
-underTest.getProducerConfig().getAlpakkaConfig().getDuration("close-timeout")) // from kafka-test.conf
+underTest.getProducerConfig().getPekkoConnectorsConfig().getDuration("close-timeout")) // from kafka-test.conf
.isEqualTo(DEFAULT_CLOSE_TIMEOUT);

assertThat(underTest.getProducerConfig().getQueueSize()).isEqualTo(39);
8 changes: 4 additions & 4 deletions connectivity/service/src/test/resources/kafka-test.conf
@@ -5,8 +5,8 @@ kafka {
interval = 1s
limit = 100
}
-alpakka = ${pekko.kafka.consumer}
-alpakka {
+pekko-connectors = ${pekko.kafka.consumer}
+pekko-connectors {
poll-timeout = 100ms # overwrites default from reference.conf
kafka-clients {
default.key.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde" # default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
@@ -24,8 +24,8 @@ kafka {
random-factor = 0.2
init-timeout-seconds = 3

-alpakka = ${pekko.kafka.producer}
-alpakka {
+pekko-connectors = ${pekko.kafka.producer}
+pekko-connectors {
close-timeout = 10s # overwrites default from reference.conf
kafka-clients {
connections.max.idle.ms = 540000 # default: 540000 (9min)
8 changes: 4 additions & 4 deletions connectivity/service/src/test/resources/test.conf
@@ -146,8 +146,8 @@ ditto {

throttling.enabled = true

-alpakka = ${pekko.kafka.consumer}
-alpakka = {
+pekko-connectors = ${pekko.kafka.consumer}
+pekko-connectors = {
default.key.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde" # default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.value.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde" # default: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
kafka-clients {
@@ -170,8 +170,8 @@ ditto {
random-factor = 0.2
init-timeout-seconds = 3

-alpakka = ${pekko.kafka.producer}
-alpakka {
+pekko-connectors = ${pekko.kafka.producer}
+pekko-connectors {
kafka-clients {
connections.max.idle.ms = 543210
reconnect.backoff {
