Extract Kafka topic, key and timestamp from a received record and add them to the headers to make them available in payload and header mappings; implement AbstractConsumerActorTest for Kafka; extract the source supplier from KafkaConsumerActor for better testability.

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
dguggemos committed Jul 20, 2021
1 parent 4a5a89c commit 5d8e357
Showing 14 changed files with 508 additions and 152 deletions.
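
The net effect: every record consumed from Kafka now contributes extra message headers that payload and header mappings can reference. A purely illustrative sketch of the resulting entries, not part of this commit (values made up; the header names are defined by the new KafkaHeader enum below):

    // Illustrative only: the extra headers an inbound Kafka message
    // carries after this change (example values assumed).
    final Map<String, String> kafkaHeaders = Map.of(
            "kafka.topic", "device.telemetry",    // topic the record was consumed from
            "kafka.key", "sensor-1",              // record key; omitted when the record has no key
            "kafka.timestamp", "1626775200000");  // record timestamp, epoch milliseconds as a string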
DefaultKafkaConsumerSourceSupplier.java
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.Source;

/**
 * Default implementation of {@code KafkaConsumerSourceSupplier}.
 */
public class DefaultKafkaConsumerSourceSupplier implements KafkaConsumerSourceSupplier {

    final PropertiesFactory propertiesFactory;
    final String sourceAddress;
    final boolean dryRun;

    DefaultKafkaConsumerSourceSupplier(
            final PropertiesFactory propertiesFactory, final String sourceAddress, final boolean dryRun) {
        this.propertiesFactory = propertiesFactory;
        this.sourceAddress = sourceAddress;
        this.dryRun = dryRun;
    }

    @Override
    public Source<ConsumerRecord<String, String>, Consumer.Control> get() {
        final ConsumerSettings<String, String> consumerSettings = propertiesFactory.getConsumerSettings(dryRun);
        final AutoSubscription subscription = Subscriptions.topics(sourceAddress);
        return Consumer.plainSource(consumerSettings, subscription);
    }
}
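
Consumer.plainSource materializes to a Consumer.Control handle that can later stop the consumer. A hedged usage sketch, not part of this commit (propertiesFactory and an Akka materializer are assumed to be in scope; Sink is akka.stream.javadsl.Sink):

    // Sketch: materialize the supplied source and keep the control handle.
    final DefaultKafkaConsumerSourceSupplier supplier =
            new DefaultKafkaConsumerSourceSupplier(propertiesFactory, "my-topic", false);
    final Consumer.Control control = supplier.get()
            .to(Sink.foreach(consumerRecord -> System.out.println(consumerRecord.value())))
            .run(materializer);
    // control.shutdown() stops consumption, e.g. when the connection closes.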
KafkaClientActor.java
@@ -209,8 +209,10 @@ private Stream<ConsumerData> consumerDataFromSource(final Source source) {
    }

    private void startKafkaConsumer(final ConsumerData consumerData, final boolean dryRun) {
+       final DefaultKafkaConsumerSourceSupplier sourceSupplier =
+               new DefaultKafkaConsumerSourceSupplier(propertiesFactory, consumerData.getAddress(), dryRun);
        final Props consumerActorProps =
-               KafkaConsumerActor.props(connection(), kafkaConfig.getConsumerConfig(), propertiesFactory,
+               KafkaConsumerActor.props(connection(), kafkaConfig.getConsumerConfig(), sourceSupplier,
                        consumerData.getAddress(), getInboundMappingSink(), consumerData.getSource(), dryRun);
        final ActorRef consumerActor =
                startChildActorConflictFree(consumerData.getActorNamePrefix(), consumerActorProps);
KafkaConsumerActor.java
@@ -50,8 +50,6 @@
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.kafka.ConsumerSettings;
-import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
@@ -72,30 +70,30 @@ final class KafkaConsumerActor extends BaseConsumerActor {

@SuppressWarnings("unused")
private KafkaConsumerActor(final Connection connection,
final KafkaConsumerConfig kafkaConfig, final PropertiesFactory propertiesFactory,
final KafkaConsumerConfig kafkaConfig,
final KafkaConsumerSourceSupplier sourceSupplier,
final String sourceAddress, final Sink<Object, NotUsed> inboundMappingSink,
final Source source, final boolean dryRun) {
super(connection, sourceAddress, inboundMappingSink, source);

log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

final ConsumerSettings<String, String> consumerSettings = propertiesFactory.getConsumerSettings(dryRun);
final Enforcement enforcement = source.getEnforcement().orElse(null);
final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory =
enforcement != null
? newEnforcementFilterFactory(enforcement, newHeadersPlaceholder())
: input -> null;
final KafkaMessageTransformer kafkaMessageTransformer =
new KafkaMessageTransformer(source, sourceAddress, headerEnforcementFilterFactory, inboundMonitor);
kafkaStream = new KafkaConsumerStream(kafkaConfig, consumerSettings, kafkaMessageTransformer, dryRun,
kafkaStream = new KafkaConsumerStream(kafkaConfig, sourceSupplier, kafkaMessageTransformer, dryRun,
Materializer.createMaterializer(this::getContext));
}

static Props props(final Connection connection, final KafkaConsumerConfig kafkaConfig,
final PropertiesFactory factory,
final String sourceAddress, final Sink<Object, NotUsed> inboundMappingSink, final Source source,
final KafkaConsumerSourceSupplier sourceSupplier, final String sourceAddress,
final Sink<Object, NotUsed> inboundMappingSink, final Source source,
final boolean dryRun) {
return Props.create(KafkaConsumerActor.class, connection, kafkaConfig, factory, sourceAddress,
return Props.create(KafkaConsumerActor.class, connection, kafkaConfig, sourceSupplier, sourceAddress,
inboundMappingSink, source, dryRun);
}

@@ -156,13 +154,13 @@ private final class KafkaConsumerStream {

        private KafkaConsumerStream(
                final KafkaConsumerConfig kafkaConfig,
-               final ConsumerSettings<String, String> consumerSettings,
+               final KafkaConsumerSourceSupplier sourceSupplier,
                final KafkaMessageTransformer kafkaMessageTransformer,
                final boolean dryRun,
                final Materializer materializer) {

            this.materializer = materializer;
-           runnableKafkaStream = Consumer.plainSource(consumerSettings, Subscriptions.topics(sourceAddress))
+           runnableKafkaStream = sourceSupplier.get()
                    .throttle(kafkaConfig.getThrottlingConfig().getLimit(),
                            kafkaConfig.getThrottlingConfig().getInterval())
                    .filter(consumerRecord -> isNotDryRun(consumerRecord, dryRun))
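
The throttle above is the plain Akka Streams throttle operator, fed from the consumer's ThrottlingConfig. A minimal sketch of the same shape with literal values (assumed, for illustration only; not part of this commit):

    // Sketch: let at most 100 records per second pass through the consumer stream.
    final Source<ConsumerRecord<String, String>, Consumer.Control> throttled =
            sourceSupplier.get().throttle(100, java.time.Duration.ofSeconds(1));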
KafkaConsumerSourceSupplier.java
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;


import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.Source;

/**
 * Supplier of a {@code Source<ConsumerRecord<String, String>, Consumer.Control>} used by {@code KafkaConsumerActor}
 * to consume messages from a Kafka topic.
 */
@FunctionalInterface
public interface KafkaConsumerSourceSupplier
        extends Supplier<Source<ConsumerRecord<String, String>, Consumer.Control>> {
}
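
Because the supplier is a @FunctionalInterface, tests can inject a stub as a lambda and feed the actor canned records without a running broker. A sketch assuming Mockito is available to mock the otherwise-unused Consumer.Control (names and values are illustrative, not from this commit):

    // Sketch: stub supplier emitting a single canned record.
    final KafkaConsumerSourceSupplier stub = () ->
            Source.single(new ConsumerRecord<>("test.topic", 0, 0L, "test-key", "{\"hello\":\"world\"}"))
                    .mapMaterializedValue(notUsed -> Mockito.mock(Consumer.Control.class));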
KafkaHeader.java
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import static java.util.Optional.ofNullable;

import java.util.Optional;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* Defines headers that are extracted from a consumed {@code ConsumerRecord} and made available to payload and/or
* header mappings.
*/
enum KafkaHeader implements Function<ConsumerRecord<String, String>, Optional<String>> {

    /**
     * The topic the record was received from.
     */
    KAFKA_TOPIC("kafka.topic", ConsumerRecord::topic),
    /**
     * The timestamp of the received record.
     */
    KAFKA_TIMESTAMP("kafka.timestamp", record -> Long.toString(record.timestamp())),
    /**
     * The key of the received record (empty if the record has no key).
     */
    KAFKA_KEY("kafka.key", ConsumerRecord::key);

    private final String name;
    private final Function<ConsumerRecord<String, String>, String> extractor;

    /**
     * @param name the header name to be used in source header mappings
     * @param extractor extracts the header value from a consumer record; may return null
     */
    KafkaHeader(final String name,
            final Function<ConsumerRecord<String, String>, String> extractor) {
        this.name = name;
        this.extractor = extractor;
    }

    /**
     * @return the header name
     */
    public String getName() {
        return name;
    }

    @Override
    public Optional<String> apply(final ConsumerRecord<String, String> consumerRecord) {
        return ofNullable(extractor.apply(consumerRecord));
    }
}
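
Each constant doubles as an extractor: applying it to a record yields an Optional that is empty when the underlying field is null, which can only happen for the key. A small sketch, not part of this commit (record values assumed for illustration):

    // Sketch: applying the enum constants to a consumed record.
    final ConsumerRecord<String, String> consumerRecord =
            new ConsumerRecord<>("device.telemetry", 0, 42L, "sensor-1", "{\"temp\":23}");
    KafkaHeader.KAFKA_TOPIC.apply(consumerRecord);      // Optional[device.telemetry]
    KafkaHeader.KAFKA_KEY.apply(consumerRecord);        // Optional[sensor-1]
    KafkaHeader.KAFKA_TIMESTAMP.apply(consumerRecord);  // Optional[-1]: this short constructor
                                                        // leaves the timestamp at NO_TIMESTAMP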
KafkaMessageTransformer.java
@@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -124,6 +125,12 @@ private Map<String, String> extractMessageHeaders(final ConsumerRecord<String, String> consumerRecord) {
        if (!messageHeaders.containsKey(DittoHeaderDefinition.CORRELATION_ID.getKey())) {
            messageHeaders.put(DittoHeaderDefinition.CORRELATION_ID.getKey(), UUID.randomUUID().toString());
        }
+
+       // add properties from consumer record to headers to make them available in payload/header mappings
+       Arrays.stream(KafkaHeader.values())
+               .forEach(kafkaHeader -> kafkaHeader.apply(consumerRecord)
+                       .ifPresent(property -> messageHeaders.put(kafkaHeader.getName(), property)));
+
        return messageHeaders;
    }
