From 59214448c4aee579dda2c0ea676a365a682cf7b9 Mon Sep 17 00:00:00 2001 From: Yannic Klem Date: Thu, 26 Aug 2021 09:24:27 +0200 Subject: [PATCH] Remove MessageSequentializer as it's not needed * mapAsync already guarantees that the elements will be passed downstream in the order they were received from upstream, no matter when the futures complete Signed-off-by: Yannic Klem --- .../kafka/AtLeastOnceConsumerStream.java | 23 ------------------- .../kafka/KafkaAcknowledgableMessage.java | 12 ---------- 2 files changed, 35 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java index ab0eb1c1d6..5fc3c9be10 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/AtLeastOnceConsumerStream.java @@ -13,7 +13,6 @@ package org.eclipse.ditto.connectivity.service.messaging.kafka; import java.time.Instant; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -32,12 +31,10 @@ import akka.Done; import akka.NotUsed; -import akka.japi.function.Function; import akka.kafka.CommitterSettings; import akka.kafka.ConsumerMessage; import akka.kafka.javadsl.Committer; import akka.kafka.javadsl.Consumer; -import akka.stream.Attributes; import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; @@ -100,7 +97,6 @@ public void stop() { private Sink externalMessageSink( final Sink inboundMappingSink) { return Flow.fromFunction(this::toAcknowledgeableMessage) - .statefulMapConcat(MessageSequentializer::new) .alsoTo(committerSink()) .map(KafkaAcknowledgableMessage::getAcknowledgeableMessage) .to(inboundMappingSink); @@ -173,23 +169,4 @@ private Sink> unexpectedM )); } - private static final class MessageSequentializer implements - Function> { - - private transient CompletableFuture last; - - private MessageSequentializer() { - last = new CompletableFuture<>(); - last.complete(null); - } - - @Override - public Iterable apply(final KafkaAcknowledgableMessage kafkaAcknowledgableMessage) { - final KafkaAcknowledgableMessage sequentialized = kafkaAcknowledgableMessage.commitAfter(last); - last = sequentialized.getAcknowledgementFuture(); - return List.of(sequentialized); - } - - } - } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaAcknowledgableMessage.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaAcknowledgableMessage.java index 88791a4207..8af7ebc9ca 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaAcknowledgableMessage.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaAcknowledgableMessage.java @@ -38,12 +38,6 @@ final class KafkaAcknowledgableMessage { }); } - private KafkaAcknowledgableMessage(final AcknowledgeableMessage acknowledgeableMessage, - final CompletableFuture acknowledgementFuture) { - this.acknowledgementFuture = acknowledgementFuture; - this.acknowledgeableMessage = acknowledgeableMessage; - } - AcknowledgeableMessage getAcknowledgeableMessage() { return acknowledgeableMessage; } @@ -52,10 +46,4 @@ CompletableFuture getAcknowledgementFuture() return acknowledgementFuture; } - KafkaAcknowledgableMessage commitAfter(final CompletableFuture precedingFuture) { - final CompletableFuture chainedFuture = - precedingFuture.thenCompose(result -> acknowledgementFuture); - return new KafkaAcknowledgableMessage(acknowledgeableMessage, chainedFuture); - } - }