diff --git a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java index fad58ec8b6452..f391e0a6ac44d 100644 --- a/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java +++ b/components/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.azure.storage.queue; +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -111,10 +112,24 @@ public int processBatch(Queue exchanges) { // update pending number of exchanges pendingExchanges = total - index - 1; + // copy messageId, popReceipt, timeout for fix exchange override case + // azure storage blob can override this headers + final String messageId = exchange.getIn() + .getHeader(QueueConstants.MESSAGE_ID, String.class); + final String popReceipt = exchange.getIn() + .getHeader(QueueConstants.POP_RECEIPT, String.class); + final Duration timeout = exchange.getIn() + .getHeader(QueueConstants.TIMEOUT, Duration.class); + // add on completion to handle after work when the exchange is done exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { + // past messageId, popReceipt, timeout for fix exchange override case + exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, messageId); + exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, popReceipt); + exchange.getIn().setHeader(QueueConstants.TIMEOUT, timeout); + processCommit(exchange); }