Skip to content

Commit

Permalink
WIP: switch to akka-stream-kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed May 18, 2021
1 parent 7e1dc07 commit 584b26b
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 122 deletions.
6 changes: 6 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<akka-persistence-mongo.version>3.0.6</akka-persistence-mongo.version>
<akka-http-bom.version>10.2.4</akka-http-bom.version>
<akka-management.version>1.0.10</akka-management.version>
<akka-stream-kafka.version>2.1.0</akka-stream-kafka.version>
<hivemq-mqtt-client.version>1.2.2</hivemq-mqtt-client.version>
<kafka.version>2.8.0</kafka.version>
<sshd.version>2.6.0</sshd.version>
Expand Down Expand Up @@ -134,6 +135,11 @@
<version>${akka-bom.version}</version>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_${scala.version}</artifactId>
<version>${akka-stream-kafka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.version}</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions connectivity/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.mozilla</groupId>
<artifactId>rhino-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,8 @@

import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
Expand All @@ -41,6 +34,12 @@

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import scala.util.Either;

final class KafkaConsumerActor extends BaseConsumerActor {
Expand All @@ -64,10 +63,10 @@ private KafkaConsumerActor(final Connection connection,
enforcement != null
? newEnforcementFilterFactory(enforcement, newHeadersPlaceholder())
: input -> null;
final Supplier<KafkaMessageTransformer> kafkaMessageTransformerFactory =
() -> new KafkaMessageTransformer(source, sourceAddress, headerEnforcementFilterFactory,
inboundMonitor);
kafkaStream = new KafkaConsumerStream(factory, kafkaMessageTransformerFactory, dryRun);
final KafkaMessageTransformer kafkaMessageTransformer =
new KafkaMessageTransformer(source, sourceAddress, headerEnforcementFilterFactory, inboundMonitor);
kafkaStream = new KafkaConsumerStream(factory, kafkaMessageTransformer, dryRun,
Materializer.createMaterializer(this::getContext));
}

static Props props(final Connection connection, final KafkaConnectionFactory factory, final String sourceAddress,
Expand Down Expand Up @@ -126,59 +125,40 @@ private GracefulStop() {

private class KafkaConsumerStream {

private static final String BRANCH_PREFIX = "TransformationResult-";
private static final String MESSAGE_TRANSFORMER = "messageTransformer";
private static final String ERROR_FORWARDER = "errorForwarder";
private static final String MESSAGE_FORWARDER = "messageForwarder";
private static final String MESSAGE_DROPPER = "messageDropper";
private final Duration closeTimeout = Duration.ofSeconds(10);
private final Supplier<KafkaStreams> kafkaStreamsSupplier;
private KafkaStreams kafkaStreams;

private final RunnableGraph<Consumer.Control> runnableKafkaStream;
private final Materializer materializer;
private Consumer.Control kafkaStream;

private KafkaConsumerStream(final KafkaConnectionFactory factory,
final Supplier<KafkaMessageTransformer> kafkaMessageTransformerFactory,
final boolean dryRun) {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
if (!dryRun) {
// TODO: kafka source - Implement rate limiting/throttling
final Map<String, KStream<String, Either<ExternalMessage, DittoRuntimeException>>> branches =
streamsBuilder.<String, String>stream(sourceAddress)
.filter((key, value) -> key != null && value != null)
.transform(kafkaMessageTransformerFactory::get, Named.as(MESSAGE_TRANSFORMER))
.split(Named.as(BRANCH_PREFIX))
.branch(this::isExternalMessage, Branched.as(MESSAGE_FORWARDER))
.branch(this::isDittoRuntimeException, Branched.as(ERROR_FORWARDER))
.defaultBranch(Branched.as(MESSAGE_DROPPER));

branches.get(BRANCH_PREFIX + MESSAGE_FORWARDER)
.map(this::extractExternalMessage)
.foreach(this::forwardExternalMessage);

branches.get(BRANCH_PREFIX + ERROR_FORWARDER)
.map(this::extractDittoRuntimeException)
.foreach(this::forwardDittoRuntimeException);

branches.get(BRANCH_PREFIX + MESSAGE_DROPPER).foreach((key, value) -> inboundMonitor.exception(
"Got unexpected message <{0}>. This is an internal error. Please contact the service team",
value
));
final KafkaMessageTransformer kafkaMessageTransformer,
final boolean dryRun, //TODO: handle dry run
final Materializer materializer) {

this.materializer = materializer;
runnableKafkaStream = Consumer.plainSource(null, Subscriptions.topics(
sourceAddress)) //TODO: replace "null" with actual ConsumerSettings. See https://akka.io/alpakka-samples/kafka-to-websocket-clients/example.html#subscribe-to-the-kafka-topic for an example
.throttle(100, Duration.ofSeconds(1)) //TODO: make this configurable
.filter((consumerRecord) -> consumerRecord.value() != null)
.map(kafkaMessageTransformer::transform) //TODO: ensure serialisation works correctly. The record is now of type <Object,Object> and no longer <String,String>
.divertTo(this.externalMessageSink(), this::isExternalMessage)
.divertTo(this.dittoRuntimeExceptionSink(), this::isDittoRuntimeException)
.to(this.unexpectedMessageSink());
}

}
kafkaStreamsSupplier = () -> new KafkaStreams(streamsBuilder.build(), factory.consumerStreamProperties());
/**
 * Builds the sink for successfully transformed records: unwraps the
 * {@link ExternalMessage} from the left side of the Either and hands it to
 * {@code forwardExternalMessage}.
 *
 * @return a sink consuming transformation results that hold an ExternalMessage.
 */
private Sink<Either<ExternalMessage, DittoRuntimeException>, ?> externalMessageSink() {
    final Flow<Either<ExternalMessage, DittoRuntimeException>, ExternalMessage, ?> unwrapLeft =
            Flow.fromFunction(this::extractExternalMessage);
    return unwrapLeft.to(Sink.foreach(this::forwardExternalMessage));
}

private boolean isExternalMessage(final String key,
final Either<ExternalMessage, DittoRuntimeException> value) {
/**
 * Predicate telling whether the transformation produced an {@link ExternalMessage}
 * (i.e. the Either is a Left).
 *
 * @param transformationResult the result of the Kafka message transformation.
 * @return {@code true} if the result carries an ExternalMessage.
 */
private boolean isExternalMessage(final Either<ExternalMessage, DittoRuntimeException> transformationResult) {
    return transformationResult.isLeft();
}

private KeyValue<String, ExternalMessage> extractExternalMessage(final String key,
final Either<ExternalMessage, DittoRuntimeException> value) {
return new KeyValue<>(key, value.left().get());
/**
 * Unwraps the {@link ExternalMessage} from a transformation result.
 * Callers must ensure the Either is a Left (see {@code isExternalMessage}).
 *
 * @param transformationResult a Left-holding transformation result.
 * @return the contained ExternalMessage.
 */
private ExternalMessage extractExternalMessage(
        final Either<ExternalMessage, DittoRuntimeException> transformationResult) {
    final ExternalMessage externalMessage = transformationResult.left().get();
    return externalMessage;
}

private void forwardExternalMessage(final String key, final ExternalMessage value) {
private void forwardExternalMessage(final ExternalMessage value) {
inboundMonitor.success(value);
forwardToMappingActor(value,
() -> {
Expand All @@ -189,34 +169,43 @@ private void forwardExternalMessage(final String key, final ExternalMessage valu
});
}

private boolean isDittoRuntimeException(final String key,
final Either<ExternalMessage, DittoRuntimeException> value) {
/**
 * Builds the sink for failed transformations: unwraps the
 * {@link DittoRuntimeException} from the right side of the Either and hands it
 * to {@code forwardDittoRuntimeException}.
 *
 * @return a sink consuming transformation results that hold a DittoRuntimeException.
 */
private Sink<Either<ExternalMessage, DittoRuntimeException>, ?> dittoRuntimeExceptionSink() {
    final Flow<Either<ExternalMessage, DittoRuntimeException>, DittoRuntimeException, ?> unwrapRight =
            Flow.fromFunction(this::extractDittoRuntimeException);
    return unwrapRight.to(Sink.foreach(this::forwardDittoRuntimeException));
}

/**
 * Predicate telling whether the transformation failed with a
 * {@link DittoRuntimeException} (i.e. the Either is a Right).
 *
 * @param transformationResult the result of the Kafka message transformation.
 * @return {@code true} if the result carries a DittoRuntimeException.
 */
private boolean isDittoRuntimeException(final Either<ExternalMessage, DittoRuntimeException> transformationResult) {
    return transformationResult.isRight();
}

private KeyValue<String, DittoRuntimeException> extractDittoRuntimeException(final String key,
private DittoRuntimeException extractDittoRuntimeException(
final Either<ExternalMessage, DittoRuntimeException> value) {
return new KeyValue<>(key, value.right().get());
return value.right().get();
}

private void forwardDittoRuntimeException(final String key, final DittoRuntimeException value) {
/**
 * Records the failure on the inbound monitor, then passes the exception on to
 * the mapping actor so the error can be surfaced to the originator.
 *
 * @param dittoRuntimeException the exception produced by the transformation.
 */
private void forwardDittoRuntimeException(final DittoRuntimeException dittoRuntimeException) {
    inboundMonitor.failure(dittoRuntimeException.getDittoHeaders(), dittoRuntimeException);
    forwardToMappingActor(dittoRuntimeException);
}

/**
 * Fallback sink for transformation results that matched neither the success nor
 * the failure branch. Such results indicate an internal error, so they are only
 * reported on the inbound monitor instead of failing the stream.
 *
 * @return a sink that reports unexpected transformation results.
 */
private Sink<Either<ExternalMessage, DittoRuntimeException>, ?> unexpectedMessageSink() {
    return Sink.foreach(this::reportUnexpectedTransformationResult);
}

// Reports an unexpected transformation result as an internal error on the inbound monitor.
private void reportUnexpectedTransformationResult(
        final Either<ExternalMessage, DittoRuntimeException> either) {
    inboundMonitor.exception(
            "Got unexpected transformation result <{0}>. This is an internal error. " +
                    "Please contact the service team", either);
}

private void start() throws IllegalStateException, StreamsException {
if (kafkaStreams != null) {
if (kafkaStream != null) {
stop();
}
kafkaStreams = kafkaStreamsSupplier.get();
kafkaStreams.start();
kafkaStream = runnableKafkaStream.run(materializer);
}

private void stop() {
if (kafkaStreams != null) {
kafkaStreams.close(closeTimeout);
kafkaStreams.cleanUp();
kafkaStreams = null;
if (kafkaStream != null) {
kafkaStream.shutdown();
kafkaStream = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -41,19 +40,15 @@
import scala.util.Left;
import scala.util.Right;


@NotThreadSafe
final class KafkaMessageTransformer
implements Transformer<String, String, KeyValue<String, Either<ExternalMessage, DittoRuntimeException>>> {
@Immutable
final class KafkaMessageTransformer {

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(KafkaMessageTransformer.class);
private final Source source;
private final String sourceAddress;
private final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory;
private final ConnectionMonitor inboundMonitor;

private ProcessorContext context;

public KafkaMessageTransformer(final Source source, final String sourceAddress,
final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory,
final ConnectionMonitor inboundMonitor) {
Expand All @@ -63,25 +58,22 @@ public KafkaMessageTransformer(final Source source, final String sourceAddress,
this.inboundMonitor = inboundMonitor;
}

@Override
public void init(final ProcessorContext context) {
this.context = context;
}

/**
* Takes incoming kafka messages and transforms them to an {@link ExternalMessage}.
* Takes incoming kafka record and transforms the value to an {@link ExternalMessage}.
*
* @param key the key of the kafka record.
* @param value the value (the message) of the kafka record.
* @return A {@link KeyValue} holding a String key and a value that is either an {@link ExternalMessage} in case the
* @param record the kafka record.
* @return A value that is either an {@link ExternalMessage} in case the
* transformation succeeded, or a {@link DittoRuntimeException} if it failed. Could also be null if an unexpected
* Exception occurred which should result in the message being dropped as automated recovery is expected.
*/
@Override
@Nullable
public KeyValue<String, Either<ExternalMessage, DittoRuntimeException>> transform(String key, String value) {
public Either<ExternalMessage, DittoRuntimeException> transform(ConsumerRecord<Object, Object> record) {

final Map<String, String> messageHeaders = getHeadersFromContext();
final String key = record.key().toString();
final String value = record.value().toString();
final Map<String, String> messageHeaders = StreamSupport.stream(record.headers().spliterator(), false)
.collect(Collectors.toMap(Header::key, header -> new String(header.value())));
final String correlationId = messageHeaders
.getOrDefault(DittoHeaderDefinition.CORRELATION_ID.getKey(), UUID.randomUUID().toString());
try {
Expand All @@ -105,12 +97,13 @@ public KeyValue<String, Either<ExternalMessage, DittoRuntimeException>> transfor
.build();

inboundMonitor.success(externalMessage);
return KeyValue.pair(key, new Left<>(externalMessage));

return new Left<>(externalMessage);
} catch (final DittoRuntimeException e) {
LOGGER.withCorrelationId(e)
.info("Got DittoRuntimeException '{}' when command was parsed: {}", e.getErrorCode(),
e.getMessage());
return KeyValue.pair(key, new Right<>(e.setDittoHeaders(DittoHeaders.of(messageHeaders))));
return new Right<>(e.setDittoHeaders(DittoHeaders.of(messageHeaders)));
} catch (final Exception e) {
inboundMonitor.exception(messageHeaders, e);
LOGGER.withCorrelationId(correlationId)
Expand All @@ -119,17 +112,4 @@ public KeyValue<String, Either<ExternalMessage, DittoRuntimeException>> transfor
}
}

private Map<String, String> getHeadersFromContext() {
return Optional.ofNullable(context)
.map(ProcessorContext::headers)
.map(headers -> StreamSupport.stream(headers.spliterator(), false)
.collect(Collectors.toMap(Header::key, header -> new String(header.value()))))
.orElseGet(Map::of);
}

@Override
public void close() {
this.context = null;
}

}
Loading

0 comments on commit 584b26b

Please sign in to comment.