Skip to content

Commit

Permalink
#586 add configuration for kafka consumer, add migration notes for ch…
Browse files Browse the repository at this point in the history
…anged kafka configuration

Signed-off-by: Johannes Schneider <johannes.schneider@bosch.io>
  • Loading branch information
jokraehe committed May 21, 2021
1 parent 584b26b commit 1236d91
Show file tree
Hide file tree
Showing 22 changed files with 282 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
public final class DefaultKafkaConfig implements KafkaConfig {

private static final String CONFIG_PATH = "kafka";
private static final String CONSUMER_PATH = "consumer";
private static final String PRODUCER_PATH = "producer";

private final Config internalProducerConfig;
private final Config consumerConfig;
private final Config producerConfig;

private DefaultKafkaConfig(final ScopedConfig kafkaScopedConfig) {
internalProducerConfig = kafkaScopedConfig.getConfig("producer.internal");
consumerConfig = kafkaScopedConfig.getConfig(CONSUMER_PATH);
producerConfig = kafkaScopedConfig.getConfig(PRODUCER_PATH);
}

/**
Expand All @@ -47,8 +51,13 @@ public static DefaultKafkaConfig of(final Config config) {
}

@Override
public Config getInternalProducerConfig() {
return internalProducerConfig;
public Config getConsumerConfig() {
return consumerConfig;
}

@Override
public Config getProducerConfig() {
return producerConfig;
}

@Override
Expand All @@ -60,18 +69,19 @@ public boolean equals(final Object o) {
return false;
}
final DefaultKafkaConfig that = (DefaultKafkaConfig) o;
return Objects.equals(internalProducerConfig, that.internalProducerConfig);
return Objects.equals(producerConfig, that.producerConfig);
}

@Override
public int hashCode() {
return Objects.hash(internalProducerConfig);
return Objects.hash(producerConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"internalProducerConfig=" + internalProducerConfig +
"consumerConfig=" + consumerConfig +
"producerConfig=" + producerConfig +
"]";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
@Immutable
public interface KafkaConfig {

/**
* Returns the Config for consumers needed by the Kafka client.
*
* @return consumer configuration needed by the Kafka client.
*/
Config getConsumerConfig();

/**
* Returns the Config for producers needed by the Kafka client.
*
* @return internal producer configuration needed by the Kafka client.
* @return producer configuration needed by the Kafka client.
*/
Config getInternalProducerConfig();
Config getProducerConfig();

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.KafkaConfig;

/**
 * Default implementation of {@link org.eclipse.ditto.connectivity.service.messaging.kafka.KafkaProducerFactory}.
 * Holds the resolved Kafka client properties and creates String/String producers from them.
 */
final class DefaultKafkaProducerFactory implements KafkaProducerFactory {

// Keys and values are plain strings; the same StringSerializer instance is shared for both.
private static final Serializer<String> KEY_SERIALIZER = new StringSerializer();
private static final Serializer<String> VALUE_SERIALIZER = KEY_SERIALIZER;

// Kafka client configuration properties, resolved once by the PropertiesFactory at construction time.
private final Map<String, Object> producerProperties;

private DefaultKafkaProducerFactory(final Map<String, Object> producerProperties) {
this.producerProperties = producerProperties;
}

/**
 * Returns an instance of the default Kafka producer factory.
 *
 * @param propertiesFactory a factory to create kafka client configuration properties.
 * @return a Kafka producer factory backed by the properties of {@code propertiesFactory}.
 */
static DefaultKafkaProducerFactory getInstance(final PropertiesFactory propertiesFactory) {
final Map<String, Object> producerProperties = propertiesFactory.getProducerProperties();
return new DefaultKafkaProducerFactory(producerProperties);
}

/**
 * Creates a new {@link KafkaProducer} configured with this factory's producer properties.
 * A fresh producer instance is created on every call; callers are responsible for closing it.
 *
 * @return a new Kafka producer with String keys and String values.
 */
@Override
public Producer<String, String> newProducer() {
return new KafkaProducer<>(producerProperties, KEY_SERIALIZER, VALUE_SERIALIZER);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public String getActorName() {
}

@Override
public Props props(final Connection connection, final KafkaConnectionFactory factory, final boolean dryRun,
public Props props(final Connection connection, final KafkaProducerFactory factory, final boolean dryRun,
final String clientId) {
return KafkaPublisherActor.props(connection, factory, dryRun, clientId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ public static KafkaBootstrapServerSpecificConfig getInstance() {
return instance;
}

/**
 * Resolves the bootstrap servers for the given connection by merging the server taken from the
 * connection URI with any additional servers configured in the connection's specific config.
 *
 * @param connection the connection to resolve the bootstrap servers for.
 * @return the merged bootstrap servers, or only the URI-derived server if the configuration is invalid.
 */
public String getBootstrapServers(final Connection connection) {
    if (isValid(connection)) {
        return mergeAdditionalBootstrapServers(getBootstrapServerFromUri(connection),
                getBootstrapServersFromSpecificConfig(connection));
    }
    // This branch should be unreachable: a connection should always carry valid bootstrap servers.
    // It is kept purely as a fallback if something bad happens.
    LOG.warn(
            "Kafka connection <{}> contains invalid configuration for its bootstrap servers. Either they are empty," +
                    " or don't match the pattern <host:port[,host:port]>. This should never happen as the connection should" +
                    " not have been stored with the invalid pattern.", connection.getId());
    return getBootstrapServerFromUri(connection);
}

@Override
public boolean isApplicable(final Connection connection) {
// bootstrap servers have always to be part of the connection, so the config is always applicable.
Expand All @@ -81,21 +100,7 @@ public boolean isValid(final Connection connection) {

@Override
public void apply(final HashMap<String, Object> producerProperties, final Connection connection) {
final String mergedBootstrapServers;
if (isValid(connection)) {
final String bootstrapServerFromUri = getBootstrapServerFromUri(connection);
final String additionalBootstrapServers = getBootstrapServersFromSpecificConfig(connection);
mergedBootstrapServers =
mergeAdditionalBootstrapServers(bootstrapServerFromUri, additionalBootstrapServers);
} else {
// basically we should never end in this else-branch, since the connection should always contain bootstrap servers.
// so this is just a fallback if something bad happens.
LOG.warn(
"Kafka connection <{}> contains invalid configuration for its bootstrap servers. Either they are empty," +
" or don't match the pattern <host:port[,host:port]>. This should never happen as the connection should" +
" not have been stored with the invalid pattern.", connection.getId());
mergedBootstrapServers = getBootstrapServerFromUri(connection);
}
final String mergedBootstrapServers = getBootstrapServers(connection);
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mergedBootstrapServers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public final class KafkaClientActor extends BaseClientActor {

private final KafkaPublisherActorFactory publisherActorFactory;
private final Set<ActorRef> pendingStatusReportsFromStreams;
private final KafkaConnectionFactory connectionFactory;
private final PropertiesFactory propertiesFactory;
private final KafkaProducerFactory connectionFactory;

private CompletableFuture<Status.Status> testConnectionFuture = null;
private ActorRef kafkaPublisherActor;
Expand All @@ -68,8 +69,8 @@ private KafkaClientActor(final Connection connection,
kafkaConsumerActors = new ArrayList<>();
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
final KafkaConfig kafkaConfig = connectionConfig.getKafkaConfig();
connectionFactory =
DefaultKafkaConnectionFactory.getInstance(connection, kafkaConfig, getClientId(connection.getId()));
propertiesFactory = PropertiesFactory.newInstance(connection, kafkaConfig, getClientId(connection.getId()));
connectionFactory = DefaultKafkaProducerFactory.getInstance(propertiesFactory);
this.publisherActorFactory = publisherActorFactory;
pendingStatusReportsFromStreams = new HashSet<>();
}
Expand Down Expand Up @@ -166,9 +167,9 @@ private void connectClient(final boolean dryRun, final ConnectionId connectionId
}

private void startKafkaPublisher(final boolean dryRun, final ConnectionId connectionId,
@Nullable final CharSequence correlationid) {
@Nullable final CharSequence correlationId) {

logger.withCorrelationId(correlationid).withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId)
logger.withCorrelationId(correlationId).withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId)
.info("Starting Kafka publisher actor.");
// ensure no previous publisher stays in memory
stopPublisherActor();
Expand All @@ -180,10 +181,12 @@ private void startKafkaPublisher(final boolean dryRun, final ConnectionId connec

private void startKafkaConsumers(final boolean dryRun, final ConnectionId connectionId,
@Nullable final CharSequence correlationId) {

logger.withCorrelationId(correlationId).withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId)
.info("Starting Kafka consumer actor.");
// ensure no previous consumer stays in memory
stopConsumerActors();

// start consumer actors
connection().getSources().stream()
.flatMap(this::consumerDataFromSource)
Expand All @@ -200,7 +203,7 @@ private Stream<ConsumerData> consumerDataFromSource(final Source source) {

private void startKafkaConsumer(final ConsumerData consumerData, final boolean dryRun) {
final Props consumerActorProps =
KafkaConsumerActor.props(connection(), connectionFactory, consumerData.getAddress(),
KafkaConsumerActor.props(connection(), propertiesFactory, consumerData.getAddress(),
getInboundMappingProcessorActor(), consumerData.getSource(), dryRun);
final ActorRef consumerActor =
startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps);
Expand Down
Loading

0 comments on commit 1236d91

Please sign in to comment.