Skip to content

Commit

Permalink
Refactor AtMostOnceConsumerStream to not materialize 1 external messa…
Browse files Browse the repository at this point in the history
…ge sink per message.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 19, 2021
1 parent 1c06d05 commit 3ed9783
Showing 1 changed file with 53 additions and 26 deletions.
Expand Up @@ -32,7 +32,9 @@
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

/**
* Kafka consumer stream with "at most once" (QoS 0) semantics.
Expand Down Expand Up @@ -66,7 +68,7 @@ final class AtMostOnceConsumerStream implements KafkaConsumerStream {
.filter(consumerRecord -> isNotDryRun(consumerRecord, dryRun))
.map(kafkaMessageTransformer::transform)
.filter(result -> !result.isExpired())
.flatMapConcat(this::processTransformationResult)
.via(processTransformationResult())
.mapAsync(consumerMaxInflight, x -> x)
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(materializer);
Expand All @@ -82,34 +84,59 @@ public CompletionStage<Done> stop() {
return consumerControl.drainAndShutdown(materializer.executionContext());
}

private Sink<KafkaCompletableMessage, NotUsed> externalMessageSink() {
return Flow.of(KafkaCompletableMessage.class)
.map(KafkaCompletableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink);
private Flow<TransformationResult, Either<TransformationResult, CompletableFuture<Done>>, NotUsed>
externalMessageFlow() {
return Flow.of(TransformationResult.class)
.<Either<TransformationResult, KafkaCompletableMessage>>map(transformationResult ->
isExternalMessage(transformationResult)
? new Right<>(toAcknowledgeableMessage(transformationResult))
: new Left<>(transformationResult))
.alsoTo(Flow.<Either<TransformationResult, KafkaCompletableMessage>>create()
.filter(Either::isRight)
.map(either -> either.right().get())
.map(KafkaCompletableMessage::getAcknowledgeableMessage)
.to(inboundMappingSink))
.map(either -> either.right().map(KafkaCompletableMessage::getAcknowledgementFuture));
}

private Source<CompletableFuture<Done>, NotUsed> processTransformationResult(
final TransformationResult result) {

if (isExternalMessage(result)) {
return Source.single(result)
.map(this::toAcknowledgeableMessage)
.alsoTo(this.externalMessageSink())
.map(KafkaCompletableMessage::getAcknowledgementFuture);
}

final CompletableFuture<Done> offsetFuture = CompletableFuture.completedFuture(Done.getInstance());
private Flow<Either<TransformationResult, CompletableFuture<Done>>, Either<TransformationResult, CompletableFuture<Done>>, NotUsed>
dittoRuntimeExceptionFlow() {
return Flow.<Either<TransformationResult, CompletableFuture<Done>>>create()
.alsoTo(Flow.<Either<TransformationResult, CompletableFuture<Done>>>create()
.filter(either -> either.isLeft() && isDittoRuntimeException(either.left().get()))
.map(either -> either.left().get())
.map(AtMostOnceConsumerStream::extractDittoRuntimeException)
.to(dreSink))
.map(either -> {
if (either.isLeft() && isDittoRuntimeException(either.left().get())) {
return new Right<>(CompletableFuture.completedFuture(Done.getInstance()));
} else {
return either;
}
});
}

if (isDittoRuntimeException(result)) {
return Source.single(result)
.map(AtMostOnceConsumerStream::extractDittoRuntimeException)
.alsoTo(dreSink)
.map(dre -> offsetFuture);
}
private Flow<Either<TransformationResult, CompletableFuture<Done>>, CompletableFuture<Done>, NotUsed>
unexpectedMessageFlow() {
return Flow.<Either<TransformationResult, CompletableFuture<Done>>>create()
.alsoTo(Flow.<Either<TransformationResult, CompletableFuture<Done>>>create()
.filter(Either::isLeft)
.map(either -> either.left().get())
.to(unexpectedMessageSink()))
.map(either -> {
if (either.isLeft()) {
return CompletableFuture.completedFuture(Done.getInstance());
} else {
return either.right().get();
}
});
}

return Source.single(result)
.alsoTo(unexpectedMessageSink())
.map(unexpected -> offsetFuture);
private Flow<TransformationResult, CompletableFuture<Done>, NotUsed> processTransformationResult() {
return Flow.<TransformationResult>create()
.via(externalMessageFlow())
.via(dittoRuntimeExceptionFlow())
.via(unexpectedMessageFlow());
}

private KafkaCompletableMessage toAcknowledgeableMessage(final TransformationResult value) {
Expand Down

0 comments on commit 3ed9783

Please sign in to comment.