Skip to content

Commit

Permalink
Implement basic structure for kafka consumers
Browse files Browse the repository at this point in the history
* All TODOs are prefixed with 'TODO: kafka source'

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed May 17, 2021
1 parent 740abd2 commit f1eb522
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 23 deletions.
9 changes: 7 additions & 2 deletions bom/pom.xml
Expand Up @@ -42,7 +42,7 @@
<akka-http-bom.version>10.2.4</akka-http-bom.version>
<akka-management.version>1.0.10</akka-management.version>
<hivemq-mqtt-client.version>1.2.2</hivemq-mqtt-client.version>
<kafka-clients.version>2.5.1</kafka-clients.version>
<kafka.version>2.8.0</kafka.version>
<sshd.version>2.6.0</sshd.version>
<eddsa.version>0.3.0</eddsa.version>

Expand Down Expand Up @@ -323,7 +323,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
Expand Down
4 changes: 4 additions & 0 deletions connectivity/service/pom.xml
Expand Up @@ -79,6 +79,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.mozilla</groupId>
<artifactId>rhino-runtime</artifactId>
Expand Down
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import org.eclipse.ditto.connectivity.model.Source;

/**
* This is class holds the data to identify a single address of a source of a kafka connection.
*/
final class ConsumerData {

private final Source source;
private final String address;
private final String addressWithIndex;

ConsumerData(final Source source, final String address, final String addressWithIndex) {
this.source = source;
this.address = address;
this.addressWithIndex = addressWithIndex;
}

Source getSource() {
return source;
}

String getAddress() {
return address;
}

String getActorNamePrefix() {
return KafkaConsumerActor.ACTOR_NAME_PREFIX + source.getIndex() + "-" + addressWithIndex;
}

}
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.eclipse.ditto.connectivity.model.Connection;

final class ConsumerPropertiesFactory {

private final Connection connection;

private ConsumerPropertiesFactory(final Connection connection) {
this.connection = checkNotNull(connection, "connection");
}

static ConsumerPropertiesFactory getInstance(final Connection connection) {
return new ConsumerPropertiesFactory(connection);
}

Properties getConsumerProperties() {
// TODO: kafka source - Adjust configuration according to kafka documentation
Properties properties = new Properties();
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, connection.getUri());
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, connection.getId().toString());
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return properties;
}

}
Expand Up @@ -13,12 +13,14 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.service.config.KafkaConfig;

/**
Expand All @@ -30,12 +32,15 @@ final class DefaultKafkaConnectionFactory implements KafkaConnectionFactory {
private static final Serializer<String> VALUE_SERIALIZER = KEY_SERIALIZER;

private final Connection connection;
private final Map<String, Object> properties;
private final Map<String, Object> producerProperties;
private final Properties consumerStreamProperties;

private DefaultKafkaConnectionFactory(final Connection connection, final Map<String, Object> producerProperties) {
private DefaultKafkaConnectionFactory(final Connection connection, final Map<String, Object> producerProperties,
final Properties consumerStreamProperties) {

this.connection = connection;
properties = producerProperties;
this.producerProperties = producerProperties;
this.consumerStreamProperties = consumerStreamProperties;
}

/**
Expand All @@ -48,20 +53,27 @@ private DefaultKafkaConnectionFactory(final Connection connection, final Map<Str
*/
static DefaultKafkaConnectionFactory getInstance(final Connection connection, final KafkaConfig kafkaConfig,
final String clientId) {
final ProducerPropertiesFactory settingsFactory =
final ProducerPropertiesFactory producerPropertiesFactory =
ProducerPropertiesFactory.getInstance(connection, kafkaConfig, clientId);
final ConsumerPropertiesFactory consumerPropertiesFactory = ConsumerPropertiesFactory.getInstance(connection);

return new DefaultKafkaConnectionFactory(connection, settingsFactory.getProducerProperties());
return new DefaultKafkaConnectionFactory(connection, producerPropertiesFactory.getProducerProperties(),
consumerPropertiesFactory.getConsumerProperties());
}

@Override
public EntityId connectionId() {
public ConnectionId connectionId() {
return connection.getId();
}

@Override
public org.apache.kafka.clients.producer.Producer<String, String> newProducer() {
return new KafkaProducer<>(properties, KEY_SERIALIZER, VALUE_SERIALIZER);
public Producer<String, String> newProducer() {
return new KafkaProducer<>(producerProperties, KEY_SERIALIZER, VALUE_SERIALIZER);
}

@Override
public Properties consumerStreamProperties() {
return consumerStreamProperties;
}

}
Expand Up @@ -12,17 +12,24 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.config.KafkaConfig;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor;
Expand All @@ -32,8 +39,6 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;

import akka.actor.ActorRef;
import akka.actor.Props;
Expand All @@ -51,19 +56,21 @@ public final class KafkaClientActor extends BaseClientActor {

private CompletableFuture<Status.Status> testConnectionFuture = null;
private ActorRef kafkaPublisherActor;
private final List<ActorRef> kafkaConsumerActors;

@SuppressWarnings("unused") // used by `props` via reflection
private KafkaClientActor(final Connection connection,
@Nullable final ActorRef proxyActor,
final ActorRef connectionActor,
final KafkaPublisherActorFactory factory) {
final KafkaPublisherActorFactory publisherActorFactory) {

super(connection, proxyActor, connectionActor);
kafkaConsumerActors = new ArrayList<>();
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
final KafkaConfig kafkaConfig = connectionConfig.getKafkaConfig();
connectionFactory =
DefaultKafkaConnectionFactory.getInstance(connection, kafkaConfig, getClientId(connection.getId()));
publisherActorFactory = factory;
this.publisherActorFactory = publisherActorFactory;
pendingStatusReportsFromStreams = new HashSet<>();
}

Expand Down Expand Up @@ -154,7 +161,8 @@ private void connectClient(final boolean dryRun, final ConnectionId connectionId

// start publisher
startKafkaPublisher(dryRun, connectionId, correlationId);
// no command consumers as we don't support consuming from sources yet
// start consumers
startKafkaConsumers(dryRun, connectionId, correlationId);
}

private void startKafkaPublisher(final boolean dryRun, final ConnectionId connectionId,
Expand All @@ -170,10 +178,49 @@ private void startKafkaPublisher(final boolean dryRun, final ConnectionId connec
pendingStatusReportsFromStreams.add(kafkaPublisherActor);
}

private void startKafkaConsumers(final boolean dryRun, final ConnectionId connectionId,
@Nullable final CharSequence correlationId) {
logger.withCorrelationId(correlationId).withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId)
.info("Starting Kafka consumer actor.");
// ensure no previous consumer stays in memory
stopConsumerActors();
// start consumer actors
connection().getSources().stream()
.flatMap(this::consumerDataFromSource)
.forEach(consumerData -> this.startKafkaConsumer(consumerData, dryRun));
}

private Stream<ConsumerData> consumerDataFromSource(final Source source) {
return source.getAddresses().stream()
.flatMap(sourceAddress ->
IntStream.range(0, source.getConsumerCount())
.mapToObj(i -> sourceAddress + "-" + i)
.map(addressWithIndex -> new ConsumerData(source, sourceAddress, addressWithIndex)));
}

private void startKafkaConsumer(final ConsumerData consumerData, final boolean dryRun) {
final Props consumerActorProps =
KafkaConsumerActor.props(connection(), connectionFactory, consumerData.getAddress(),
getInboundMappingProcessorActor(), consumerData.getSource(), dryRun);
final ActorRef consumerActor =
startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps);
kafkaConsumerActors.add(consumerActor);
}

private void stopConsumerActors() {
kafkaConsumerActors.forEach(consumerActor -> {
logger.debug("Stopping child actor <{}>.", consumerActor.path());
// shutdown using a message, so the actor can clean up first
consumerActor.tell(KafkaConsumerActor.GracefulStop.INSTANCE, getSelf());
});
kafkaConsumerActors.clear();
}

@Override
protected void cleanupResourcesForConnection() {
pendingStatusReportsFromStreams.clear();
stopPublisherActor();
stopConsumerActors();
}

@Override
Expand Down
Expand Up @@ -12,8 +12,10 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.connectivity.model.ConnectionId;

/**
* Creates Kafka sinks.
Expand All @@ -25,12 +27,20 @@ interface KafkaConnectionFactory {
*
* @return the ID.
*/
EntityId connectionId();
ConnectionId connectionId();

/**
* Create a producer of Kafka messages.
*
* @return the producer.
*/
Producer<String, String> newProducer();

/**
* Returns the consumer properties to configure a Kafka stream.
*
* @return the consumer properties.
*/
Properties consumerStreamProperties();

}

0 comments on commit f1eb522

Please sign in to comment.