Commit c83aec1

Switch to ByteArraySerializer and ByteArrayDeserializer for Kafka Consumer and Publisher

* This allows us to ensure that bytes will not be modified based on charset

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
Yannic92 committed Nov 23, 2021
1 parent 727087b commit c83aec1
Showing 17 changed files with 132 additions and 94 deletions.
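The rationale above is easy to demonstrate: routing a binary payload through a String (de)serializer decodes and re-encodes it, and any byte sequence that is invalid in the chosen charset is silently replaced. A minimal, self-contained sketch of the failure mode this commit prevents (illustration only, not part of the commit):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public final class CharsetRoundTrip {

        public static void main(final String[] args) {
            // A binary payload (e.g. a CBOR or protobuf frame) that is not valid UTF-8.
            final byte[] original = {(byte) 0x89, (byte) 0xA3, 0x01, 0x02};

            // What a StringDeserializer/StringSerializer pair effectively does:
            final String decoded = new String(original, StandardCharsets.UTF_8);
            final byte[] roundTripped = decoded.getBytes(StandardCharsets.UTF_8);

            // Invalid sequences were replaced with U+FFFD, so the bytes changed.
            System.out.println(Arrays.equals(original, roundTripped)); // prints: false
        }
    }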
@@ -162,7 +162,7 @@ private KafkaAcknowledgableMessage toAcknowledgeableMessage(final CommittableTra
         return new KafkaAcknowledgableMessage(externalMessage, committableOffset, ackMonitor);
     }
 
-    private static boolean isNotDryRun(final ConsumerRecord<String, String> cRecord, final boolean dryRun) {
+    private static boolean isNotDryRun(final ConsumerRecord<String, byte[]> cRecord, final boolean dryRun) {
         if (dryRun && LOGGER.isDebugEnabled()) {
             LOGGER.debug("Dropping record (key: {}, topic: {}, partition: {}, offset: {}) in dry run mode.",
                     cRecord.key(), cRecord.topic(), cRecord.partition(), cRecord.offset());
@@ -28,7 +28,7 @@
  * {@link akka.kafka.javadsl.Consumer.Control} in order to be able to shutdown/terminate Kafka consumption.
  */
 class AtLeastOnceKafkaConsumerSourceSupplier
-        implements Supplier<Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>> {
+        implements Supplier<Source<ConsumerMessage.CommittableMessage<String, byte[]>, Consumer.Control>> {
 
     private final PropertiesFactory propertiesFactory;
     private final String sourceAddress;
@@ -42,8 +42,8 @@ class AtLeastOnceKafkaConsumerSourceSupplier
     }
 
     @Override
-    public Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> get() {
-        final ConsumerSettings<String, String> consumerSettings = propertiesFactory.getConsumerSettings(dryRun)
+    public Source<ConsumerMessage.CommittableMessage<String, byte[]>, Consumer.Control> get() {
+        final ConsumerSettings<String, byte[]> consumerSettings = propertiesFactory.getConsumerSettings(dryRun)
                 .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         final AutoSubscription subscription = Subscriptions.topics(sourceAddress);
         return Consumer.committableSource(consumerSettings, subscription);
@@ -143,7 +143,7 @@ private static boolean isExternalMessage(final TransformationResult value) {
         return value.getExternalMessage().isPresent();
     }
 
-    private static boolean isNotDryRun(final ConsumerRecord<String, String> cRecord, final boolean dryRun) {
+    private static boolean isNotDryRun(final ConsumerRecord<String, byte[]> cRecord, final boolean dryRun) {
         if (dryRun && LOGGER.isDebugEnabled()) {
             LOGGER.debug("Dropping record (key: {}, topic: {}, partition: {}, offset: {}) in dry run mode.",
                     cRecord.key(), cRecord.topic(), cRecord.partition(), cRecord.offset());
@@ -27,7 +27,7 @@
  * {@link akka.kafka.javadsl.Consumer.Control} in order to be able to shutdown/terminate Kafka consumption.
  */
 class AtMostOnceKafkaConsumerSourceSupplier
-        implements Supplier<Source<ConsumerRecord<String, String>, Consumer.Control>> {
+        implements Supplier<Source<ConsumerRecord<String, byte[]>, Consumer.Control>> {
 
     private final PropertiesFactory propertiesFactory;
     private final String sourceAddress;
@@ -41,8 +41,8 @@ class AtMostOnceKafkaConsumerSourceSupplier
     }
 
     @Override
-    public Source<ConsumerRecord<String, String>, Consumer.Control> get() {
-        final ConsumerSettings<String, String> consumerSettings = propertiesFactory.getConsumerSettings(dryRun);
+    public Source<ConsumerRecord<String, byte[]>, Consumer.Control> get() {
+        final ConsumerSettings<String, byte[]> consumerSettings = propertiesFactory.getConsumerSettings(dryRun);
         final AutoSubscription subscription = Subscriptions.topics(sourceAddress);
         return Consumer.plainSource(consumerSettings, subscription);
     }
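The two consumer-source suppliers differ only in which Alpakka source they build, and that choice is what yields the delivery semantics in their names. A compile-only sketch (settings and subscription assumed to be provided by the caller):

    import akka.kafka.AutoSubscription;
    import akka.kafka.ConsumerMessage;
    import akka.kafka.ConsumerSettings;
    import akka.kafka.javadsl.Consumer;
    import akka.stream.javadsl.Source;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    final class DeliverySemanticsSketch {

        // At-most-once: plainSource does not expose offsets to the stream, so commits
        // (e.g. auto-commit) may happen before processing completes and a crash can lose records.
        static Source<ConsumerRecord<String, byte[]>, Consumer.Control> atMostOnce(
                final ConsumerSettings<String, byte[]> settings, final AutoSubscription subscription) {
            return Consumer.plainSource(settings, subscription);
        }

        // At-least-once: committableSource hands each offset to the application, which
        // commits only after processing, so a crash causes redelivery instead of loss.
        static Source<ConsumerMessage.CommittableMessage<String, byte[]>, Consumer.Control> atLeastOnce(
                final ConsumerSettings<String, byte[]> settings, final AutoSubscription subscription) {
            return Consumer.committableSource(settings, subscription);
        }
    }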
@@ -21,10 +21,10 @@
  */
 final class DefaultSendProducerFactory implements SendProducerFactory {
 
-    private final ProducerSettings<String, String> producerSettings;
+    private final ProducerSettings<String, byte[]> producerSettings;
     private final ActorSystem actorSystem;
 
-    private DefaultSendProducerFactory(final ProducerSettings<String, String> producerSettings,
+    private DefaultSendProducerFactory(final ProducerSettings<String, byte[]> producerSettings,
             final ActorSystem actorSystem) {
 
         this.producerSettings = producerSettings;
@@ -38,14 +38,14 @@ private DefaultSendProducerFactory(final ProducerSettings<String, String> produc
      * @param actorSystem the actor system
      * @return a Kafka SendProducerFactory.
      */
-    static DefaultSendProducerFactory getInstance(final ProducerSettings<String, String> producerSettings,
+    static DefaultSendProducerFactory getInstance(final ProducerSettings<String, byte[]> producerSettings,
             final ActorSystem actorSystem) {
 
         return new DefaultSendProducerFactory(producerSettings, actorSystem);
     }
 
     @Override
-    public SendProducer<String, String> newSendProducer() {
+    public SendProducer<String, byte[]> newSendProducer() {
         return new SendProducer<>(producerSettings, actorSystem);
     }
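For context, a producer obtained from this factory is used like any Alpakka SendProducer; only the value type changes. A short usage sketch (topic name and payload are made up; SendProducer#send is the Alpakka API):

    // Given: SendProducerFactory factory = DefaultSendProducerFactory.getInstance(settings, system);
    final SendProducer<String, byte[]> producer = factory.newSendProducer();
    producer.send(new ProducerRecord<>("my-topic", "my-key", new byte[]{1, 2, 3}))
            .thenAccept(metadata -> System.out.println("written at offset " + metadata.offset()));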
@@ -23,7 +23,7 @@
  * Defines headers that are extracted from a consumed {@code ConsumerRecord} and made available to payload and/or
  * header mappings.
  */
-enum KafkaHeader implements Function<ConsumerRecord<String, String>, Optional<String>> {
+enum KafkaHeader implements Function<ConsumerRecord<String, byte[]>, Optional<String>> {
 
     /**
      * The topic the record is received from.
@@ -39,13 +39,13 @@ enum KafkaHeader implements Function<ConsumerRecord<String, St
     KAFKA_KEY("kafka.key", ConsumerRecord::key);
 
     private final String name;
-    private final Function<ConsumerRecord<String, String>, String> extractor;
+    private final Function<ConsumerRecord<String, byte[]>, String> extractor;
 
     /**
      * @param name the header name to be used in source header mappings
      */
     KafkaHeader(final String name,
-            final Function<ConsumerRecord<String, String>, String> extractor) {
+            final Function<ConsumerRecord<String, byte[]>, String> extractor) {
         this.name = name;
         this.extractor = extractor;
     }
@@ -58,7 +58,7 @@ public String getName() {
     }
 
     @Override
-    public Optional<String> apply(final ConsumerRecord<String, String> consumerRecord) {
+    public Optional<String> apply(final ConsumerRecord<String, byte[]> consumerRecord) {
         return ofNullable(extractor.apply(consumerRecord));
     }
 }
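Since the extracted values (such as topic and key) are record metadata rather than payload, only the enum's signatures needed updating for the byte[] value type. A hypothetical usage of the enum (record constructed directly for illustration):

    // ConsumerRecord(topic, partition, offset, key, value)
    final ConsumerRecord<String, byte[]> record =
            new ConsumerRecord<>("my-topic", 0, 42L, "the-key", new byte[]{1, 2, 3});
    KafkaHeader.KAFKA_KEY.apply(record)
            .ifPresent(key -> System.out.println(key)); // prints: the-key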
@@ -12,6 +12,8 @@
  */
 package org.eclipse.ditto.connectivity.service.messaging.kafka;
 
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -72,7 +74,7 @@ final class KafkaMessageTransformer {
      */
     @Nullable
     public CommittableTransformationResult transform(
-            final ConsumerMessage.CommittableMessage<String, String> committableMessage) {
+            final ConsumerMessage.CommittableMessage<String, byte[]> committableMessage) {
 
         final TransformationResult result = transform(committableMessage.record());
         if (result == null) {
@@ -92,7 +94,7 @@ public CommittableTransformationResult transform(
      * automated recovery is expected.
      */
     @Nullable
-    public TransformationResult transform(final ConsumerRecord<String, String> consumerRecord) {
+    public TransformationResult transform(final ConsumerRecord<String, byte[]> consumerRecord) {
 
         LOGGER.trace("Received record from kafka: {}", consumerRecord);
 
@@ -105,15 +107,15 @@ public TransformationResult transform(final ConsumerRecord<String, String> consu
 
         try {
             final String key = consumerRecord.key();
-            final String value = consumerRecord.value();
+            final byte[] value = consumerRecord.value();
             final DittoLogger correlationIdScopedLogger = LOGGER.withCorrelationId(correlationId);
             correlationIdScopedLogger.debug(
                     "Transforming incoming kafka message <{}> with headers <{}> and key <{}>.",
                     value, messageHeaders, key
             );
 
             final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(messageHeaders)
-                    .withTextAndBytes(value, value == null ? null : value.getBytes())
+                    .withTextAndBytes(value == null ? null : new String(value, StandardCharsets.UTF_8), value)
                     .withAuthorizationContext(source.getAuthorizationContext())
                     .withEnforcement(headerEnforcementFilterFactory.getFilter(messageHeaders))
                     .withHeaderMapping(source.getHeaderMapping())
@@ -144,7 +146,7 @@ public TransformationResult transform(final ConsumerRecord<String, String> consu
 
     }
 
-    private Map<String, String> extractMessageHeaders(final ConsumerRecord<String, String> consumerRecord) {
+    private Map<String, String> extractMessageHeaders(final ConsumerRecord<String, byte[]> consumerRecord) {
         final Map<String, String> messageHeaders = new HashMap<>();
         for (final Header header : consumerRecord.headers()) {
             if (messageHeaders.put(header.key(), new String(header.value())) != null) {
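The interesting change is in withTextAndBytes: previously the StringDeserializer (UTF-8 by default) decoded the value and value.getBytes() re-encoded it with the platform default charset, so the byte view could differ from what was on the wire; now the raw byte[] passes through unchanged and only the text view is derived. A sketch of the difference (illustration only, not part of the commit):

    import java.nio.charset.StandardCharsets;

    public final class OldVsNewByteView {

        public static void main(final String[] args) {
            // Suppose a producer wrote "Grüße" encoded as ISO-8859-1 (5 bytes).
            final byte[] recordBytes = "Grüße".getBytes(StandardCharsets.ISO_8859_1);

            // Old pipeline: decode as UTF-8 (mangles the umlauts), then re-encode
            // with the platform default charset -- the byte view is corrupted.
            final String decoded = new String(recordBytes, StandardCharsets.UTF_8);
            final byte[] oldByteView = decoded.getBytes(); // platform-dependent

            // New pipeline: the byte view IS recordBytes; only the text view is derived.
            System.out.println(oldByteView.length + " bytes vs " + recordBytes.length); // e.g. 9 vs 5
        }
    }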
@@ -12,6 +12,8 @@
  */
 package org.eclipse.ditto.connectivity.service.messaging.kafka;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Objects;
@@ -73,7 +75,6 @@
 import akka.stream.javadsl.Sink;
 import akka.stream.javadsl.Source;
 import akka.stream.javadsl.SourceQueueWithComplete;
-import akka.util.ByteString;
 
 /**
  * Responsible for publishing {@link org.eclipse.ditto.connectivity.api.ExternalMessage}s into an Kafka
@@ -93,7 +94,7 @@ private KafkaPublisherActor(final Connection connection,
             final String clientId,
             final ConnectivityStatusResolver connectivityStatusResolver,
 
-                final ConnectivityConfig connectivityConfig) {
+            final ConnectivityConfig connectivityConfig) {
         super(connection, clientId, connectivityStatusResolver, connectivityConfig);
         this.dryRun = dryRun;
         final Materializer materializer = Materializer.createMaterializer(this::getContext);
@@ -311,20 +312,20 @@ final class KafkaProducerStream {
                         "b) The client count of this connection is not configured high enough.";
 
         private final KillSwitch killSwitch;
-        private final SourceQueueWithComplete<ProducerMessage.Envelope<String, String, CompletableFuture<RecordMetadata>>>
+        private final SourceQueueWithComplete<ProducerMessage.Envelope<String, byte[], CompletableFuture<RecordMetadata>>>
                 sourceQueue;
-        private final AtomicReference<SendProducer<String, String>> sendProducer = new AtomicReference<>();
+        private final AtomicReference<SendProducer<String, byte[]>> sendProducer = new AtomicReference<>();
 
         private KafkaProducerStream(final KafkaProducerConfig config, final Materializer materializer,
                 final SendProducerFactory producerFactory) {
 
             final RestartSettings restartSettings =
-                    RestartSettings.create(config.getMinBackoff(),config.getMaxBackoff(), config.getRandomFactor())
+                    RestartSettings.create(config.getMinBackoff(), config.getMaxBackoff(), config.getRandomFactor())
                             .withMaxRestarts(config.getMaxRestartsCount(), config.getMaxRestartsWithin());
 
-            final Pair<SourceQueueWithComplete<ProducerMessage.Envelope<String, String, CompletableFuture<RecordMetadata>>>, Source<ProducerMessage.Envelope<String, String, CompletableFuture<RecordMetadata>>, NotUsed>>
+            final Pair<SourceQueueWithComplete<ProducerMessage.Envelope<String, byte[], CompletableFuture<RecordMetadata>>>, Source<ProducerMessage.Envelope<String, byte[], CompletableFuture<RecordMetadata>>, NotUsed>>
                     sourcePair =
-                    Source.<ProducerMessage.Envelope<String, String, CompletableFuture<RecordMetadata>>>
+                    Source.<ProducerMessage.Envelope<String, byte[], CompletableFuture<RecordMetadata>>>
                             queue(config.getQueueSize(), OverflowStrategy.dropNew()).preMaterialize(materializer);
 
             sourceQueue = sourcePair.first();
@@ -345,12 +346,12 @@ private KafkaProducerStream(final KafkaProducerConfig config, final Materializer
         }
 
         private void handleSendResult(
-                @Nullable final ProducerMessage.Results<String, String, CompletableFuture<RecordMetadata>> results,
+                @Nullable final ProducerMessage.Results<String, byte[], CompletableFuture<RecordMetadata>> results,
                 @Nullable final Throwable exception, final CompletableFuture<RecordMetadata> resultFuture) {
             if (exception == null) {
                 if (results instanceof ProducerMessage.Result) {
-                    final ProducerMessage.Result<String, String, CompletableFuture<RecordMetadata>> result =
-                            (ProducerMessage.Result<String, String, CompletableFuture<RecordMetadata>>) results;
+                    final ProducerMessage.Result<String, byte[], CompletableFuture<RecordMetadata>> result =
+                            (ProducerMessage.Result<String, byte[], CompletableFuture<RecordMetadata>>) results;
                     resultFuture.complete(result.metadata());
                 } else {
                     // should never happen, we provide only ProducerMessage.single to the source
@@ -370,8 +371,8 @@ private CompletableFuture<RecordMetadata> publish(final KafkaPublishTarget publi
                 final ExternalMessage externalMessage) {
 
             final CompletableFuture<RecordMetadata> resultFuture = new CompletableFuture<>();
-            final ProducerRecord<String, String> producerRecord = getProducerRecord(publishTarget, externalMessage);
-            final ProducerMessage.Envelope<String, String, CompletableFuture<RecordMetadata>> envelope =
+            final ProducerRecord<String, byte[]> producerRecord = getProducerRecord(publishTarget, externalMessage);
+            final ProducerMessage.Envelope<String, byte[], CompletableFuture<RecordMetadata>> envelope =
                     ProducerMessage.single(producerRecord, resultFuture);
             if (null != sourceQueue) {
                 sourceQueue.offer(envelope).whenComplete(handleQueueOfferResult(externalMessage, resultFuture));
@@ -383,10 +384,10 @@ private CompletableFuture<RecordMetadata> publish(final KafkaPublishTarget publi
             return resultFuture;
         }
 
-        private ProducerRecord<String, String> getProducerRecord(final KafkaPublishTarget publishTarget,
+        private ProducerRecord<String, byte[]> getProducerRecord(final KafkaPublishTarget publishTarget,
                 final ExternalMessage externalMessage) {
 
-            final String payload = mapExternalMessagePayload(externalMessage);
+            final byte[] payload = mapExternalMessagePayload(externalMessage);
             final Iterable<Header> headers = mapExternalMessageHeaders(externalMessage);
 
             return new ProducerRecord<>(publishTarget.getTopic(),
@@ -404,16 +405,18 @@ private Iterable<Header> mapExternalMessageHeaders(final ExternalMessage externa
                     .collect(Collectors.toList());
         }
 
-        private String mapExternalMessagePayload(final ExternalMessage externalMessage) {
-            if (externalMessage.isTextMessage()) {
-                return externalMessage.getTextPayload().orElse("");
-            } else if (externalMessage.isBytesMessage()) {
+        private byte[] mapExternalMessagePayload(final ExternalMessage externalMessage) {
+            if (externalMessage.isBytesMessage()) {
                 return externalMessage.getBytePayload()
-                        .map(ByteString::fromByteBuffer)
-                        .map(ByteString::utf8String)
-                        .orElse("");
+                        .map(ByteBuffer::array)
+                        .orElseGet(() -> new byte[0]);
+            } else if (externalMessage.isTextMessage()) {
+                final Charset charset = getCharsetFromMessage(externalMessage);
+                return externalMessage.getTextPayload()
+                        .map(text -> text.getBytes(charset))
+                        .orElseGet(() -> new byte[0]);
            } else {
-                return "";
+                return new byte[0];
            }
        }
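The text branch now honours the message's charset via getCharsetFromMessage, which this diff does not show; presumably it is inherited from the publisher base class and resolves the charset parameter of the content-type header, falling back to UTF-8. A hypothetical stand-in to illustrate that resolution (names and behaviour assumed, not taken from Ditto):

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Optional;

    final class CharsetSketch {

        // Hypothetical helper: parse the "charset" parameter of a MIME content-type.
        static Charset charsetForContentType(final Optional<String> contentType) {
            return contentType
                    .filter(ct -> ct.contains(";"))
                    .map(ct -> ct.split(";", 2)[1].trim())
                    .filter(param -> param.regionMatches(true, 0, "charset=", 0, 8))
                    .map(param -> Charset.forName(param.substring("charset=".length()).trim()))
                    .orElse(StandardCharsets.UTF_8);
        }

        public static void main(final String[] args) {
            System.out.println(charsetForContentType(Optional.of("text/plain; charset=ISO-8859-1"))); // ISO-8859-1
            System.out.println(charsetForContentType(Optional.of("application/json")));               // UTF-8
        }
    }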
@@ -22,6 +22,8 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.ditto.connectivity.model.Connection;
@@ -83,12 +85,12 @@ static PropertiesFactory newInstance(final Connection connection, final KafkaCon
      *
      * @return the settings.
      */
-    ConsumerSettings<String, String> getConsumerSettings(final boolean dryRun) {
+    ConsumerSettings<String, byte[]> getConsumerSettings(final boolean dryRun) {
         final Config alpakkaConfig = this.config.getConsumerConfig().getAlpakkaConfig();
         final ConnectionCheckerSettings connectionCheckerSettings =
                 ConnectionCheckerSettings.apply(alpakkaConfig.getConfig("connection-checker"));
-        final ConsumerSettings<String, String> consumerSettings =
-                ConsumerSettings.apply(alpakkaConfig, new StringDeserializer(), new StringDeserializer())
+        final ConsumerSettings<String, byte[]> consumerSettings =
+                ConsumerSettings.apply(alpakkaConfig, new StringDeserializer(), new ByteArrayDeserializer())
                         .withBootstrapServers(bootstrapServers)
                         .withGroupId(connection.getId().toString())
                         .withClientId(clientId + "-consumer")
@@ -106,9 +108,9 @@ CommitterSettings getCommitterSettings() {
         return CommitterSettings.apply(committerConfig);
     }
 
-    ProducerSettings<String, String> getProducerSettings() {
+    ProducerSettings<String, byte[]> getProducerSettings() {
         final Config alpakkaConfig = this.config.getProducerConfig().getAlpakkaConfig();
-        return ProducerSettings.apply(alpakkaConfig, new StringSerializer(), new StringSerializer())
+        return ProducerSettings.apply(alpakkaConfig, new StringSerializer(), new ByteArraySerializer())
                 .withBootstrapServers(bootstrapServers)
                 .withProperties(getClientIdProperties())
                 .withProperties(getProducerSpecificConfigProperties())
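The Alpakka settings above are a thin layer over the plain Kafka client configuration: keys stay human-readable Strings while values become opaque byte arrays. For comparison, the equivalent plain-client producer setup (bootstrap address made up):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
        // values written through this producer reach the broker byte-for-byte
    }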
@@ -25,6 +25,6 @@ interface SendProducerFactory {
      *
      * @return the producer.
      */
-    SendProducer<String, String> newSendProducer();
+    SendProducer<String, byte[]> newSendProducer();
 
 }