Restructured AtLeastOnceConsumerStream
* All consumed records are now forwarded to the committer sink
* Set stop-timeout for consumers to 0s because we're now using the
  DrainingControl to stop the stream. The documentation states:
  "The ConsumerSettings stop-timeout delays stopping the Kafka Consumer
  and the stream, but when using drainAndShutdown that delay is not
  required and can be set to zero."

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
Yannic92 committed Sep 2, 2021
1 parent f0735f8 commit 72892f5
Showing 5 changed files with 95 additions and 80 deletions.
connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java
@@ -28,12 +28,14 @@
import akka.Done;
import akka.NotUsed;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerMessage.CommittableOffset;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

/**
* Kafka consumer stream with "at least once" (QoS 1) semantics.
@@ -42,17 +44,17 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {

private static final Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(AtLeastOnceConsumerStream.class);

private final akka.stream.javadsl.Source<CommittableTransformationResult, Consumer.Control> runnableKafkaStream;
private final RunnableGraph<Consumer.DrainingControl<Done>> runnableKafkaStream;
private final ConnectionMonitor inboundMonitor;
private final Materializer materializer;
private final CommitterSettings committerSettings;
private final int consumerMaxInflight;
@Nullable private Consumer.Control consumerControl;
private final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink;
private final Sink<DittoRuntimeException, ?> dreSink;
@Nullable private Consumer.DrainingControl<Done> consumerControl;

AtLeastOnceConsumerStream(
final AtLeastOnceKafkaConsumerSourceSupplier sourceSupplier,
final CommitterSettings committerSettings,
final int consumerMaxInInflight,
final int consumerMaxInflight,
final KafkaMessageTransformer kafkaMessageTransformer,
final boolean dryRun,
final Materializer materializer,
@@ -61,60 +63,76 @@ final class AtLeastOnceConsumerStream implements KafkaConsumerStream {
final Sink<DittoRuntimeException, ?> dreSink) {

this.inboundMonitor = inboundMonitor;
this.inboundMappingSink = inboundMappingSink;
this.dreSink = dreSink;
this.materializer = materializer;
this.committerSettings = committerSettings;
this.consumerMaxInflight = consumerMaxInInflight;
runnableKafkaStream = sourceSupplier.get()
.filter(committableMessage -> isNotDryRun(committableMessage.record(), dryRun))
.filter(committableMessage -> committableMessage.record().value() != null)
.filter(committableMessage -> KafkaMessageTransformer.isNotExpired(committableMessage.record()))
.filter(committableMessage -> KafkaConsumerStream.isNotExpired(committableMessage.record()))
.map(kafkaMessageTransformer::transform)
.divertTo(this.externalMessageSink(inboundMappingSink), this::isExternalMessage)
.divertTo(this.dittoRuntimeExceptionSink(dreSink), this::isDittoRuntimeException);
.flatMapConcat(this::processTransformationResult)
.mapAsync(consumerMaxInflight, x -> x)
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl);
}

@Override
public CompletionStage<Done> start() throws IllegalStateException {
if (consumerControl != null) {
stop();
}
return runnableKafkaStream
.mapMaterializedValue(cc -> {
consumerControl = cc;
return cc;
})
.runWith(unexpectedMessageSink(), materializer);
consumerControl = runnableKafkaStream.run(materializer);
return consumerControl.streamCompletion();
}

@Override
public void stop() {
if (consumerControl != null) {
consumerControl.drainAndShutdown(new CompletableFuture<>(), materializer.executionContext());
consumerControl.drainAndShutdown(materializer.executionContext());
consumerControl = null;
}
}

private Sink<CommittableTransformationResult, ?> externalMessageSink(
final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink) {
return Flow.fromFunction(this::toAcknowledgeableMessage)
.alsoTo(committerSink())
.map(KafkaAcknowledgableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink);
private Source<CompletableFuture<CommittableOffset>, NotUsed> processTransformationResult(
final CommittableTransformationResult result) {
if (isExternalMessage(result)) {
return Source.single(result)
.map(this::toAcknowledgeableMessage)
.alsoTo(this.externalMessageSink())
.map(KafkaAcknowledgableMessage::getAcknowledgementFuture);
}
/*
* For all other cases a retry for consuming this message makes no sense, so we want to commit these offsets.
* Therefore, we return an already completed future holding the offset to commit. No reject needed.
*/
final CompletableFuture<CommittableOffset> offsetFuture =
CompletableFuture.completedFuture(result.getCommittableOffset());
if (isDittoRuntimeException(result)) {
return Source.single(result)
.map(this::extractDittoRuntimeException)
.alsoTo(dreSink)
.map(dre -> offsetFuture);
}
return Source.single(result)
.alsoTo(unexpectedMessageSink())
.map(unexpected -> offsetFuture);
}

private Sink<KafkaAcknowledgableMessage, NotUsed> committerSink() {
private Sink<KafkaAcknowledgableMessage, NotUsed> externalMessageSink() {
return Flow.of(KafkaAcknowledgableMessage.class)
.mapAsync(consumerMaxInflight, KafkaAcknowledgableMessage::getAcknowledgementFuture)
.to(Committer.sink(committerSettings));
.map(KafkaAcknowledgableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink);
}

private boolean isExternalMessage(final CommittableTransformationResult transformationResult) {
return transformationResult.getTransformationResult().getExternalMessage().isPresent();
}

private KafkaAcknowledgableMessage toAcknowledgeableMessage(final CommittableTransformationResult value) {
final ExternalMessage externalMessage = value.getTransformationResult().getExternalMessage().orElseThrow(); // at this point, the ExternalMessage is present
final ConsumerMessage.CommittableOffset committableOffset = value.getCommittableOffset();
final ExternalMessage externalMessage = value.getTransformationResult()
.getExternalMessage()
.orElseThrow(); // at this point, the ExternalMessage is present
final CommittableOffset committableOffset = value.getCommittableOffset();
return new KafkaAcknowledgableMessage(externalMessage, committableOffset);
}

@@ -126,18 +144,14 @@ private boolean isNotDryRun(final ConsumerRecord<String, String> cRecord, final
return !dryRun;
}

private Sink<CommittableTransformationResult, ?> dittoRuntimeExceptionSink(
final Sink<DittoRuntimeException, ?> dreSink) {
return Flow.fromFunction(this::extractDittoRuntimeException)
.to(dreSink);
}

private boolean isDittoRuntimeException(final CommittableTransformationResult value) {
return value.getTransformationResult().getDittoRuntimeException().isPresent();
}

private DittoRuntimeException extractDittoRuntimeException(final CommittableTransformationResult value) {
return value.getTransformationResult().getDittoRuntimeException().orElseThrow(); // at this point, the DRE is present
return value.getTransformationResult()
.getDittoRuntimeException()
.orElseThrow(); // at this point, the DRE is present
}

private Sink<CommittableTransformationResult, CompletionStage<Done>> unexpectedMessageSink() {
connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtMostOnceConsumerStream.java
@@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;
@@ -30,6 +29,7 @@
import akka.kafka.javadsl.Consumer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;

/**
@@ -39,10 +39,10 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {

private static final Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(AtMostOnceConsumerStream.class);

private final akka.stream.javadsl.Source<TransformationResult, Consumer.Control> runnableKafkaStream;
private final RunnableGraph<Consumer.DrainingControl<Done>> runnableKafkaStream;
private final ConnectionMonitor inboundMonitor;
private final Materializer materializer;
@Nullable private Consumer.Control consumerControl;
@Nullable private Consumer.DrainingControl<Done> consumerControl;

AtMostOnceConsumerStream(
final AtMostOnceKafkaConsumerSourceSupplier sourceSupplier,
@@ -58,30 +58,27 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
runnableKafkaStream = sourceSupplier.get()
.filter(consumerRecord -> isNotDryRun(consumerRecord, dryRun))
.filter(consumerRecord -> consumerRecord.value() != null)
.filter(KafkaMessageTransformer::isNotExpired)
.filter(KafkaConsumerStream::isNotExpired)
.map(kafkaMessageTransformer::transform)
.divertTo(this.externalMessageSink(externalMessageSink), this::isExternalMessage)
.divertTo(this.dittoRuntimeExceptionSink(dreSink), this::isDittoRuntimeException);
.divertTo(this.dittoRuntimeExceptionSink(dreSink), this::isDittoRuntimeException)
.toMat(unexpectedMessageSink(), Consumer::createDrainingControl);
}

@Override
public CompletionStage<Done> start() throws IllegalStateException {
if (consumerControl != null) {
stop();
}
return runnableKafkaStream
.mapMaterializedValue(cc -> {
consumerControl = cc;
return cc;
})
.runWith(unexpectedMessageSink(), materializer);
consumerControl = runnableKafkaStream.run(materializer);
return consumerControl.streamCompletion();
}


@Override
public void stop() {
if (consumerControl != null) {
consumerControl.drainAndShutdown(new CompletableFuture<>(), materializer.executionContext());
consumerControl.drainAndShutdown(materializer.executionContext());
consumerControl = null;
}
}
connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerStream.java
@@ -12,8 +12,14 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import akka.Done;

/**
@@ -33,4 +39,33 @@ interface KafkaConsumerStream {
*/
void stop();

/**
* Checks, based on the Kafka headers {@code "creation-time"} and {@code "ttl"} (time to live), whether the
* record should be treated as an expired message and therefore no longer be processed.
*
* @param consumerRecord the Kafka record whose headers are checked for expiry.
* @return {@code true} if the record/message is not expired, {@code false} otherwise.
*/
static boolean isNotExpired(final ConsumerRecord<String, String> consumerRecord) {
final Headers headers = consumerRecord.headers();
final long now = Instant.now().toEpochMilli();
try {
final Optional<Long> creationTimeOptional = Optional.ofNullable(headers.lastHeader("creation-time"))
.map(Header::value)
.map(String::new)
.map(Long::parseLong);
final Optional<Long> ttlOptional = Optional.ofNullable(headers.lastHeader("ttl"))
.map(Header::value)
.map(String::new)
.map(Long::parseLong);
if (creationTimeOptional.isPresent() && ttlOptional.isPresent()) {
return now - creationTimeOptional.get() < ttlOptional.get();
}
return true;
} catch (final Exception e) {
// Errors during reading/parsing headers should not cause the message to be dropped.
return true;
}
}

}
connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java
@@ -12,19 +12,16 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
@@ -49,35 +46,6 @@ final class KafkaMessageTransformer {

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(KafkaMessageTransformer.class);

/**
* Checks, based on the Kafka headers {@code "creation-time"} and {@code "ttl"} (time to live), whether the
* record should be treated as an expired message and therefore no longer be processed.
*
* @param consumerRecord the Kafka record whose headers are checked for expiry.
* @return {@code true} if the record/message is not expired, {@code false} otherwise.
*/
static boolean isNotExpired(final ConsumerRecord<String, String> consumerRecord) {
final Headers headers = consumerRecord.headers();
final long now = Instant.now().toEpochMilli();
try {
final Optional<Long> creationTimeOptional = Optional.ofNullable(headers.lastHeader("creation-time"))
.map(Header::value)
.map(String::new)
.map(Long::parseLong);
final Optional<Long> ttlOptional = Optional.ofNullable(headers.lastHeader("ttl"))
.map(Header::value)
.map(String::new)
.map(Long::parseLong);
if (creationTimeOptional.isPresent() && ttlOptional.isPresent()) {
return now - creationTimeOptional.get() < ttlOptional.get();
}
return true;
} catch (final Exception e) {
// Errors during reading/parsing headers should not cause the message to be dropped.
return true;
}
}

private final Source source;
private final String sourceAddress;
private final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory;
1 change: 1 addition & 0 deletions connectivity/service/src/main/resources/connectivity.conf
@@ -331,6 +331,7 @@ ditto {
alpakka = {
use-dispatcher = "kafka-consumer-dispatcher"
poll-interval = 50ms
stop-timeout = 0s

connection-checker {
# Flag to turn on connection checker
