Skip to content

Commit

Permalink
By doing the validation whether the kafka message was successfully se…
Browse files Browse the repository at this point in the history
…nd in the prepare commit, it happened after storing the token. Which made it very likely to miss events, this has now been fixed.
  • Loading branch information
gklijs committed Oct 16, 2023
1 parent 0eff0f9 commit 6e8994e
Showing 1 changed file with 9 additions and 12 deletions.
Expand Up @@ -28,7 +28,6 @@
import org.axonframework.lifecycle.Phase;
import org.axonframework.messaging.EventPublicationFailedException;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.MessageMonitor.MonitorCallback;
import org.axonframework.monitoring.NoOpMessageMonitor;
Expand Down Expand Up @@ -137,7 +136,6 @@ public <T extends EventMessage<?>> void send(T event) {
logger.debug("Skip publishing event for [{}] since topicFunction returned empty.", event.getPayloadType());
return;
}
UnitOfWork<?> uow = CurrentUnitOfWork.get();

MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event);
Producer<K, V> producer = producerFactory.createProducer();
Expand All @@ -150,21 +148,20 @@ public <T extends EventMessage<?>> void send(T event) {
// Sends event messages to Kafka and receive a future indicating the status.
Future<RecordMetadata> publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic.get()));

uow.onPrepareCommit(u -> {
if (confirmationMode.isTransactional()) {
tryCommit(producer, monitorCallback);
} else if (confirmationMode.isWaitForAck()) {
waitForPublishAck(publishStatus, monitorCallback);
}
tryClose(producer);
});

uow.onRollback(u -> {
CurrentUnitOfWork.get().onRollback(u -> {
//provides a way to prevent duplicate messages on Kafka, in case there is a problem with the token store
if (confirmationMode.isTransactional()) {
tryRollback(producer);
}
tryClose(producer);
});

if (confirmationMode.isTransactional()) {
tryCommit(producer, monitorCallback);
} else if (confirmationMode.isWaitForAck()) {
waitForPublishAck(publishStatus, monitorCallback);
}
tryClose(producer);
}

private void tryBeginTxn(Producer<?, ?> producer) {
Expand Down

0 comments on commit 6e8994e

Please sign in to comment.