From 6e8994efa42170a959a070298a801cbe78f4970b Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Sat, 14 Oct 2023 01:25:47 +0200 Subject: [PATCH] By doing the validation whether the kafka message was successfully send in the prepare commit, it happened after storing the token. Which made it very likely to miss events, this has now been fixed. --- .../producer/KafkaPublisher.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java index 007e7497..ffd7253e 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java @@ -28,7 +28,6 @@ import org.axonframework.lifecycle.Phase; import org.axonframework.messaging.EventPublicationFailedException; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; -import org.axonframework.messaging.unitofwork.UnitOfWork; import org.axonframework.monitoring.MessageMonitor; import org.axonframework.monitoring.MessageMonitor.MonitorCallback; import org.axonframework.monitoring.NoOpMessageMonitor; @@ -137,7 +136,6 @@ public > void send(T event) { logger.debug("Skip publishing event for [{}] since topicFunction returned empty.", event.getPayloadType()); return; } - UnitOfWork uow = CurrentUnitOfWork.get(); MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event); Producer producer = producerFactory.createProducer(); @@ -150,21 +148,20 @@ public > void send(T event) { // Sends event messages to Kafka and receive a future indicating the status. Future publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic.get())); - uow.onPrepareCommit(u -> { - if (confirmationMode.isTransactional()) { - tryCommit(producer, monitorCallback); - } else if (confirmationMode.isWaitForAck()) { - waitForPublishAck(publishStatus, monitorCallback); - } - tryClose(producer); - }); - - uow.onRollback(u -> { + CurrentUnitOfWork.get().onRollback(u -> { + //provides a way to prevent duplicate messages on Kafka, in case there is a problem with the token store if (confirmationMode.isTransactional()) { tryRollback(producer); } tryClose(producer); }); + + if (confirmationMode.isTransactional()) { + tryCommit(producer, monitorCallback); + } else if (confirmationMode.isWaitForAck()) { + waitForPublishAck(publishStatus, monitorCallback); + } + tryClose(producer); } private void tryBeginTxn(Producer producer) {