Skip to content

Commit

Permalink
Remove MessageSequentializer as it's not needed
Browse files Browse the repository at this point in the history
* mapAsync already guarantees that the elements will be passed downstream
  in the order they were received from upstream, no matter when the futures
  complete

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 26, 2021
1 parent b8f72b5 commit 5921444
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -32,12 +31,10 @@

import akka.Done;
import akka.NotUsed;
import akka.japi.function.Function;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
Expand Down Expand Up @@ -100,7 +97,6 @@ public void stop() {
private Sink<CommittableTransformationResult, ?> externalMessageSink(
final Sink<AcknowledgeableMessage, NotUsed> inboundMappingSink) {
return Flow.fromFunction(this::toAcknowledgeableMessage)
.statefulMapConcat(MessageSequentializer::new)
.alsoTo(committerSink())
.map(KafkaAcknowledgableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink);
Expand Down Expand Up @@ -173,23 +169,4 @@ private Sink<CommittableTransformationResult, CompletionStage<Done>> unexpectedM
));
}

private static final class MessageSequentializer implements
Function<KafkaAcknowledgableMessage, Iterable<KafkaAcknowledgableMessage>> {

private transient CompletableFuture<ConsumerMessage.CommittableOffset> last;

private MessageSequentializer() {
last = new CompletableFuture<>();
last.complete(null);
}

@Override
public Iterable<KafkaAcknowledgableMessage> apply(final KafkaAcknowledgableMessage kafkaAcknowledgableMessage) {
final KafkaAcknowledgableMessage sequentialized = kafkaAcknowledgableMessage.commitAfter(last);
last = sequentialized.getAcknowledgementFuture();
return List.of(sequentialized);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ final class KafkaAcknowledgableMessage {
});
}

private KafkaAcknowledgableMessage(final AcknowledgeableMessage acknowledgeableMessage,
final CompletableFuture<ConsumerMessage.CommittableOffset> acknowledgementFuture) {
this.acknowledgementFuture = acknowledgementFuture;
this.acknowledgeableMessage = acknowledgeableMessage;
}

AcknowledgeableMessage getAcknowledgeableMessage() {
return acknowledgeableMessage;
}
Expand All @@ -52,10 +46,4 @@ CompletableFuture<ConsumerMessage.CommittableOffset> getAcknowledgementFuture()
return acknowledgementFuture;
}

KafkaAcknowledgableMessage commitAfter(final CompletableFuture<ConsumerMessage.CommittableOffset> precedingFuture) {
final CompletableFuture<ConsumerMessage.CommittableOffset> chainedFuture =
precedingFuture.thenCompose(result -> acknowledgementFuture);
return new KafkaAcknowledgableMessage(acknowledgeableMessage, chainedFuture);
}

}

0 comments on commit 5921444

Please sign in to comment.